当前位置: 首页 > news >正文

扬州市工程信息网seo自学教程seo免费教程

扬州市工程信息网,seo自学教程seo免费教程,做网站交易,小白wordpress必装插件目录 一、消费者(手动提交 offset)的概述1.1、手动提交offset的两种方式1.2、手动提交offset两种方式的区别1.3、手动提交offset的图解 二、消费者(手动提交 offset)的代码示例2.1、手动提交 offset(采用同步提交的方式…

目录

    • 一、消费者(手动提交 offset)的概述
      • 1.1、手动提交offset的两种方式
      • 1.2、手动提交offset两种方式的区别
      • 1.3、手动提交offset的图解
    • 二、消费者(手动提交 offset)的代码示例
      • 2.1、手动提交 offset(采用同步提交的方式)代码
      • 2.1、手动提交 offset(采用异步提交的方式)代码

一、消费者(手动提交 offset)的概述

1.1、手动提交offset的两种方式

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

1.2、手动提交offset两种方式的区别

  • 相同点:都会将本次提交的一批数据最高的偏移量提交。
  • 不同点是:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

1.3、手动提交offset的图解

在这里插入图片描述

二、消费者(手动提交 offset)的代码示例

2.1、手动提交 offset(采用同步提交的方式)代码

  • 同步提交代码
    由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。

     // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    // 手动提交offset(同步提交)
    kafkaConsumer.commitSync();
    
  • 同步提交完整代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offset(同步提交)kafkaConsumer.commitSync();}}
    }
    

2.1、手动提交 offset(采用异步提交的方式)代码

  • 异步提交代码
    虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset的方式。

     // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    // 手动提交offset(异步提交)
    kafkaConsumer.commitAsync();
    
  • 异步提交完整代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offset(异步提交)kafkaConsumer.commitAsync();}}
    }
    
http://www.dinnco.com/news/3750.html

相关文章:

  • 园区官方网站建设小程序开发软件
  • 珠海网站建设策划网站优化员seo招聘
  • 网站建设专家论证会重庆网站制作系统
  • 信用卡申请网站建设实事新闻热点
  • 服装网站建设比较好厦门seo全网营销
  • 品牌建设规划方案青岛谷歌seo
  • 旅游网站建设的课题研究的主要内容项目网
  • django 做网站 原理网络营销推广培训机构
  • 建设外贸网站公司简介aso优化服务
  • 原子艺术做的网站怎么样子上海网站seo外包
  • 网站开发应该怎么做百度电话号码
  • 中国黄页网廊坊首页霸屏优化
  • 汇泽网站建设交友网站有哪些
  • 网页设计模板html代码五四主题西安关键词优化软件
  • 建设监督网站首页网站模板设计
  • 网页设计搭建网站网络推广员的日常工作
  • 山东定制版网站建设公司网站宣传费用
  • 外包客服平台seo的搜索排名影响因素有哪些
  • 广西建设职业学院官网网站口碑营销的前提及好处有哪些?
  • java产品展示网站源码国内免费推广产品的网站
  • 石家庄网站开发培训菏泽地网站seo
  • 58同城建网站怎么做新网域名注册
  • 用ipad写wordpressseo狂人
  • 企业网站功能对比分析迅速上排名网站优化
  • 深圳网站开发电话百度app下载官方免费最新版
  • 青岛商城网站建设设计百度贴吧网页版登录
  • 公司网站开发详细流程百度手机助手下载2022新版
  • wordpress域名绑定费用青岛网站优化公司
  • wordpress自定义分享免费推广seo
  • 如何利用网站策划做好网站建设怎么有自己的网站