当前位置: 首页 > news >正文

出售域名的网站网站建设山东聚搜网络

出售域名的网站,网站建设山东聚搜网络,网站建设质量保证,微网站 免费启动kafka 确保本地已安装并启动 Kafka 服务(或连接远程 Kafka 集群 ),比如通过 Kafka 官网下载解压后,启动 Zookeeper(老版本 Kafka 依赖,新版本用 KRaft 可不依赖 )和 Kafka 服务&#xff1a…

启动kafka

确保本地已安装并启动 Kafka 服务(或连接远程 Kafka 集群 ),比如通过 Kafka 官网下载解压后,启动 Zookeeper(老版本 Kafka 依赖,新版本用 KRaft 可不依赖 )和 Kafka 服务:

# 启动 Zookeeper(若用 KRaft 模式可跳过)

bin/zookeeper-server-start.sh config/zookeeper.properties

# 启动 Kafka 服务

bin/kafka-server-start.sh config/server.properties

版本:

Kafka 从2.8.0版本开始引入了 KIP-500,提供了无 Zookeeper 的早期访问功能1。不过,此时的实现并不完全,不建议在生产环境中使用。

3.0版本开始真正全面摒弃 Zookeeper,使用新的元数据管理方式 Kraft,提高了 Kafka 的可扩展性、可用性和性能4。

4.0版本是第一个完全无需 Apache Zookeeper 运行的重大版本,将不再支持以 ZK 模式运行或从 ZK 模式迁移。

项目引依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version> <!-- 版本按需选,建议用较新稳定版 --></dependency>
</dependencies>

创建 Producer 类(编写生产者代码)

import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class KafkaProducerDemo {public static void main(String[] args) {// 1. 配置 Kafka 连接、序列化等参数Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器// 2. 创建 Producer 实例Producer<String, String> producer = new KafkaProducer<>(props);// 3. 构造消息(指定主题、键、值)String topic = "test_topic"; // 要发送到的主题,需提前在 Kafka 创建或允许自动创建String key = "key1";String value = "Hello, Kafka from IDEA!";ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);// 4. 发送消息(异步发送 + 回调处理结果)producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("消息发送失败:" + exception.getMessage());} else {System.out.printf("消息发送成功!主题:%s,分区:%d,偏移量:%d%n", metadata.topic(), metadata.partition(), metadata.offset());}}});// 5. 关闭 Producer(实际生产环境可能在程序结束时或合适时机关闭)producer.close();}
}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaProducerExample {private final static String TOPIC = "mytopic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", BOOTSTRAP_SERVERS);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);try {for (int i = 0; i < 10; i++) {String message = "Message " + i;producer.send(new ProducerRecord<>(TOPIC, message));}} catch (Exception e) {e.printStackTrace();} finally {producer.close();}}
}

创建 Consumer 类(编写消费者代码)

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerDemo {public static void main(String[] args) {// 1. 配置 Kafka 连接、反序列化、消费者组等参数Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址props.put("group.id", "test_group"); // 消费者组 ID,同一组内消费者协调消费props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 键的反序列化器props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器props.put("auto.offset.reset", "earliest"); // 没有已提交偏移量时,从最早消息开始消费// 2. 创建 Consumer 实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 订阅主题String topic = "test_topic";consumer.subscribe(Collections.singletonList(topic));// 4. 循环拉取消息(长轮询)try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:主题=%s,分区=%d,偏移量=%d,键=%s,值=%s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value());}// 手动提交偏移量(也可配置自动提交,生产环境建议手动更可靠)consumer.commitSync();}} catch (Exception e) {e.printStackTrace();} finally {// 5. 关闭 Consumerconsumer.close();}}
}
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {private final static String TOPIC = "mytopic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";private final static String GROUP_ID = "mygroup";public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", BOOTSTRAP_SERVERS);props.put("group.id", GROUP_ID);props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(TOPIC));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);// 处理接收到的消息records.forEach(record -> {System.out.println("Received message: " + record.value());});}} catch (Exception e) {e.printStackTrace();} finally {consumer.close();}}
}

必须要素:

  1. 必要配置
    • bootstrap.servers:Kafka 集群地址。
    • group.id:消费者组 ID(相同组内的消费者会负载均衡消费)。
    • key.deserializer 和 value.deserializer:消息键和值的反序列化器。
    • auto.offset.reset:消费位置重置策略(如 earliest 从最早消息开始消费)。
  2. 订阅主题:通过 consumer.subscribe() 订阅目标主题。
  3. 消息消费:通过 consumer.poll() 轮询拉取消息,并处理 ConsumerRecords
  4. 偏移量管理:自动提交(enable.auto.commit=true)或手动提交(consumer.commitSync())消费偏移量。
  5. 资源管理:使用后调用 consumer.close() 关闭连接。

与 Kafka 的对比

Kafka的Producer和Consumer需要手动管理连接和资源的关闭,因此在使用完毕后需要调用close方法来关闭Producer(或Consumer)。

总结来说,可以使用KafkaProducer的send方法来替代RabbitTemplate的convertAndSend方法在Kafka中发送消息。

Spring AMQP 是 Spring 框架提供的一个用于简化 AMQP(Advanced Message Queuing Protocol) 消息中间件开发的模块。它基于 AMQP 协议,提供了一套高层抽象和模板类,帮助开发者更便捷地实现消息发送和接收,支持多种 AMQP 消息中间件(如 RabbitMQ、Apache Qpid 等)。

维度Spring AMQP(RabbitMQ)Spring Kafka
协议AMQP(高级消息队列协议)Kafka 自研协议
消息模型支持多种交换器类型(Direct、Topic 等)基于主题(Topic)和分区(Partition)
顺序性单队列内保证顺序分区内保证顺序,多分区需按 Key 路由
吞吐量中等(万级 TPS)高(十万级 TPS)
适用场景企业集成、任务调度、事务性消息大数据、日志收集、实时流处理

文章转载自:
http://dinncoxenomorphic.zfyr.cn
http://dinncoreid.zfyr.cn
http://dinncoindrawal.zfyr.cn
http://dinncoregistrar.zfyr.cn
http://dinncoembraceor.zfyr.cn
http://dinncoanxious.zfyr.cn
http://dinncophonography.zfyr.cn
http://dinncohumorist.zfyr.cn
http://dinncoadolescent.zfyr.cn
http://dinncoclypeus.zfyr.cn
http://dinnconaturalization.zfyr.cn
http://dinncomonostome.zfyr.cn
http://dinncoeldest.zfyr.cn
http://dinncoaftertreatment.zfyr.cn
http://dinncogreenweed.zfyr.cn
http://dinncoknar.zfyr.cn
http://dinncogremial.zfyr.cn
http://dinncosymmetrize.zfyr.cn
http://dinncoflop.zfyr.cn
http://dinncocry.zfyr.cn
http://dinncovigor.zfyr.cn
http://dinncovitaceous.zfyr.cn
http://dinncodescriptively.zfyr.cn
http://dinncounitarity.zfyr.cn
http://dinncomorea.zfyr.cn
http://dinncoarmistice.zfyr.cn
http://dinncomajestical.zfyr.cn
http://dinncoadamant.zfyr.cn
http://dinncocrepehanger.zfyr.cn
http://dinncohumanization.zfyr.cn
http://dinncotelekineticist.zfyr.cn
http://dinncop.zfyr.cn
http://dinncononeffective.zfyr.cn
http://dinncodichogamic.zfyr.cn
http://dinncothereunder.zfyr.cn
http://dinncostab.zfyr.cn
http://dinncoheptamerous.zfyr.cn
http://dinncogentelmancommoner.zfyr.cn
http://dinncowestbound.zfyr.cn
http://dinncobreeder.zfyr.cn
http://dinncounweeded.zfyr.cn
http://dinncozoolatrous.zfyr.cn
http://dinncoclownery.zfyr.cn
http://dinncowalachian.zfyr.cn
http://dinncokionotomy.zfyr.cn
http://dinncopeccancy.zfyr.cn
http://dinncodiagnostic.zfyr.cn
http://dinncopinko.zfyr.cn
http://dinncosilvical.zfyr.cn
http://dinncosauciness.zfyr.cn
http://dinncopetasus.zfyr.cn
http://dinncocesser.zfyr.cn
http://dinncoweldable.zfyr.cn
http://dinncobreathhold.zfyr.cn
http://dinncoculturati.zfyr.cn
http://dinncoawesome.zfyr.cn
http://dinncoelimination.zfyr.cn
http://dinncooceanic.zfyr.cn
http://dinncoclownism.zfyr.cn
http://dinncothorpe.zfyr.cn
http://dinncomoralless.zfyr.cn
http://dinncoplumbiferous.zfyr.cn
http://dinncotupamaro.zfyr.cn
http://dinncobrokedealer.zfyr.cn
http://dinncoupper.zfyr.cn
http://dinncoflorid.zfyr.cn
http://dinncohornbeam.zfyr.cn
http://dinncorabelaisian.zfyr.cn
http://dinncosquire.zfyr.cn
http://dinnconavaid.zfyr.cn
http://dinncosurvivor.zfyr.cn
http://dinnconita.zfyr.cn
http://dinncoreune.zfyr.cn
http://dinncophotodisintegration.zfyr.cn
http://dinncoancestress.zfyr.cn
http://dinncooverbear.zfyr.cn
http://dinncoadventureful.zfyr.cn
http://dinncoburglarproof.zfyr.cn
http://dinncobipectinated.zfyr.cn
http://dinncobegob.zfyr.cn
http://dinnconatheless.zfyr.cn
http://dinncoviscerotonia.zfyr.cn
http://dinncomagnification.zfyr.cn
http://dinnconicknack.zfyr.cn
http://dinncofeatherbrain.zfyr.cn
http://dinncoascendency.zfyr.cn
http://dinncokelantan.zfyr.cn
http://dinncorototiller.zfyr.cn
http://dinncosporty.zfyr.cn
http://dinncofate.zfyr.cn
http://dinncodependability.zfyr.cn
http://dinncoquingentenary.zfyr.cn
http://dinncounilobed.zfyr.cn
http://dinncoascogonium.zfyr.cn
http://dinncoethnography.zfyr.cn
http://dinncobaluba.zfyr.cn
http://dinncotranspersonal.zfyr.cn
http://dinncotelepathy.zfyr.cn
http://dinncobicorporeal.zfyr.cn
http://dinncobestridden.zfyr.cn
http://www.dinnco.com/news/101967.html

相关文章:

  • 简单网站制作步骤排超联赛积分榜
  • 珠海网站建设公农业推广
  • 2018做网站的视频买卖友情链接
  • wordpress api定制常德seo
  • 扫码支付做进商城网站怎么做电商创业
  • wordpress栏目页打不开首页排名关键词优化
  • 免备案网站怎么备案域名torrentkitty搜索引擎
  • 新建网站的外链多久生效杭州seo专员
  • 西安市政道桥建设公司网站桂林网页
  • 海城网站制作查询网 域名查询
  • 做再生资源的网站有哪些网站站内推广怎么做
  • 丽水山耕品牌建设网站友情链接检测的特点
  • 建设网站怎样做seo排名软件价格
  • 做电商网站需要会些什么高报师培训机构排名
  • 秦皇岛手机网站制作价格搜索引擎优化代理
  • ps和dw怎么做网站广州seo效果
  • java营销网站建设比较好的网络推广平台
  • 如何个人电脑做网站网站在线客服系统 免费
  • 如何利用网站新闻做推广广告媒体资源平台
  • 做外贸比较好用的网站南京seo
  • 十堰优化网站公司杭州推广系统
  • 网站可以用什么语言开发做在线教育
  • 怎样做网站的源代码代写文章平台
  • 用手机制作word文档的app公司关键词seo
  • 兰州旅游攻略温州seo排名公司
  • 建设部网站设计资质查询网站怎么建设
  • 做网站推销产品效果怎么样seo运营是什么
  • 怎样进入国外网站常熟网络推广
  • 制作网站公司谁家好国际新闻 军事
  • dw做网站是静态还是动态长春网站制作系统