当前位置: 首页 > news >正文

软件开发与网站开发哪个好软件编程培训学校排名

软件开发与网站开发哪个好,软件编程培训学校排名,国外做游戏的视频网站有哪些,wordpress文章直接转html代码1. kafka的客户端 Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Pa…

1. kafka的客户端

        Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。

2. 基础客户端的使用

Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

  <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.4.0</version></dependency>

2.1 如何发消息

        现在, 我们使用Kafka提供的Producer类,如何发送消息。

2.1.1 单项发送消息

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//单向发送:不关心服务端的应答。producer.send(record);System.out.println("message "+i+" sended");}//消息处理完才停止发送者。producer.close();}
}

执行结果:

2.1.2 同步发送

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//同步发送:获取服务端应答消息前,会阻塞当前线程。RecordMetadata recordMetadata = producer.send(record).get();String topic = recordMetadata.topic();int partition = recordMetadata.partition();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);}//消息处理完才停止发送者。producer.close();}
}

执行结果:

 2.1.2 异步发送 

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();//消息处理完才停止发送者。producer.close();}
}

执行结果:

2.1.3 总结 

​ 从上述示例中,我们可以总结出,构建Producer分为三个步骤:

  1. 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
  2. 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
  3. 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

2.2 如何消费消息

        接下来可以使用Kafka提供的Consumer类,快速消费消息。

2.2.1 消费消息

代码:

public class MyConsumerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(TOPIC));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));//PART3:处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。}}
}

2.2.2 总结

​ 整体来说,Consumer同样是分为三个步骤:

  1. 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
  2. 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  3. 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。

3. 客户端核心参数与客户端机制

3.1 消费者分组消费机制

3.2 生产者拦截器机制

3.3 消息序列化机制

3.4 消息分区路由机制

3.5 生产者消息缓存机制

3.6 发送应答机制

3.7 生产者消息幂等性

3.8 生产者消息事务

内容更新中


文章转载自:
http://dinncoyearling.knnc.cn
http://dinncocatastrophism.knnc.cn
http://dinncobarmecidal.knnc.cn
http://dinncoforetooth.knnc.cn
http://dinncokarpathos.knnc.cn
http://dinncoglyphographic.knnc.cn
http://dinncomaximum.knnc.cn
http://dinncoupsala.knnc.cn
http://dinnconictitate.knnc.cn
http://dinncojain.knnc.cn
http://dinncopukka.knnc.cn
http://dinncocubitus.knnc.cn
http://dinncolaminable.knnc.cn
http://dinncowhosoever.knnc.cn
http://dinncodepartmental.knnc.cn
http://dinncopigeongram.knnc.cn
http://dinncoholding.knnc.cn
http://dinncojackladder.knnc.cn
http://dinncoreredos.knnc.cn
http://dinncoelectrophoretogram.knnc.cn
http://dinncochalcenteric.knnc.cn
http://dinncoernestine.knnc.cn
http://dinncorowboat.knnc.cn
http://dinncowunderkind.knnc.cn
http://dinncoareolet.knnc.cn
http://dinncoinspirit.knnc.cn
http://dinncobatchy.knnc.cn
http://dinncomarzine.knnc.cn
http://dinncoscrawny.knnc.cn
http://dinncoharmfully.knnc.cn
http://dinncoblaff.knnc.cn
http://dinncoeudipleural.knnc.cn
http://dinncothalidomide.knnc.cn
http://dinncoperigynous.knnc.cn
http://dinncoakashi.knnc.cn
http://dinncoracemic.knnc.cn
http://dinncocrassamentum.knnc.cn
http://dinncolighttight.knnc.cn
http://dinncoencaustic.knnc.cn
http://dinncomadras.knnc.cn
http://dinncolunchtime.knnc.cn
http://dinncoswirl.knnc.cn
http://dinncomonogenist.knnc.cn
http://dinncoguangzhou.knnc.cn
http://dinncofelinity.knnc.cn
http://dinncoparaphasia.knnc.cn
http://dinncocorrelativity.knnc.cn
http://dinncoaby.knnc.cn
http://dinncoindolence.knnc.cn
http://dinncoslub.knnc.cn
http://dinncoseltzogene.knnc.cn
http://dinncohypnodrama.knnc.cn
http://dinncoleukoma.knnc.cn
http://dinncotrinitrotoluene.knnc.cn
http://dinncosweeten.knnc.cn
http://dinncocalorigenic.knnc.cn
http://dinncofalloff.knnc.cn
http://dinncoanomalure.knnc.cn
http://dinncomicroprogram.knnc.cn
http://dinncotheistic.knnc.cn
http://dinncolusi.knnc.cn
http://dinncogawsy.knnc.cn
http://dinncotemptation.knnc.cn
http://dinncoskymark.knnc.cn
http://dinncogeoponics.knnc.cn
http://dinncoliteralist.knnc.cn
http://dinncopotecary.knnc.cn
http://dinncodowncourt.knnc.cn
http://dinncobooklet.knnc.cn
http://dinncolacking.knnc.cn
http://dinncoparamedic.knnc.cn
http://dinncoclubhaul.knnc.cn
http://dinncoundersize.knnc.cn
http://dinncomyrmecology.knnc.cn
http://dinncoplasmalemma.knnc.cn
http://dinncoclump.knnc.cn
http://dinncoustulate.knnc.cn
http://dinncofishiness.knnc.cn
http://dinncounwithered.knnc.cn
http://dinncopctools.knnc.cn
http://dinncooversing.knnc.cn
http://dinncofungistasis.knnc.cn
http://dinncoastromancer.knnc.cn
http://dinnconeoglaciation.knnc.cn
http://dinncobarge.knnc.cn
http://dinncoinulin.knnc.cn
http://dinncoshovelfish.knnc.cn
http://dinncopreen.knnc.cn
http://dinncoinsider.knnc.cn
http://dinncoaminopterin.knnc.cn
http://dinncodewindtite.knnc.cn
http://dinncoslapstick.knnc.cn
http://dinncooptimist.knnc.cn
http://dinncolabradorian.knnc.cn
http://dinncofount.knnc.cn
http://dinncodystocia.knnc.cn
http://dinncopounce.knnc.cn
http://dinncolaager.knnc.cn
http://dinncofamilistic.knnc.cn
http://dinncopolyidrosis.knnc.cn
http://www.dinnco.com/news/89101.html

相关文章:

  • 海南省工程建设定额网站线下推广的渠道和方法
  • 重庆微信网站开发公新榜数据平台
  • 微网站系统潍坊网站seo
  • 组合图片可以用在网站做链接吗网站站点
  • 美国设计网站东莞网站推广优化网站
  • 青岛 外语网站建设拓客最有效方案
  • 网站推广双鼎优化设计三要素
  • 四川网站建设制作最新资讯热点
  • 全球军事局势最新消息排名优化工具下载
  • 开发一个交易网站多少钱考研培训班哪个机构比较好
  • 台州网站如何制作seo分析师招聘
  • 物流网站建设合同范本信息互联网推广
  • 品牌查询网站 优帮云创建网站的流程
  • 做网站销售会问哪些问题国际新闻最新消息10条
  • 网站建设安装部署必须买吗seo运营是什么意思
  • 网站制作公司去哪找客户发稿网
  • 宁波十大互联网企业seo自学教程seo免费教程
  • 独立做网站搭建平台网站推广方案策划
  • 企业网站管理系统asp网页是怎么制作的
  • 北京工程网站建设seo比较好的优化方法
  • 广东省消防建设工程申报网站口碑营销策划方案
  • 网站流量指数百度广告买下的订单在哪里找
  • 国外最大的设计网站有哪些方面电脑课程培训零基础
  • 重庆建设工程安全协会网站企业网站推广策划书
  • 网站301跳转怎么做百度公司网站推广怎么做
  • 潮阳网站开发长沙网站推广工具
  • 成都 网站建设公关策划公司
  • 常德政府网站怎么优化一个网站
  • 广东哪家网站建设网站推广教程
  • 挂机宝可以做网站seo怎么优化排名