当前位置: 首页 > news >正文

保定网站制作企业成都百度推广电话号码是多少

保定网站制作企业,成都百度推广电话号码是多少,ssh架构jsp网站开发,工业和信息化部发短信是什么意思目录 1、消费者、消费组 2、心跳机制 3、消费者常见参数配置 4、订阅 5、反序列化 基本概念 自定义反序列化器 6、位移提交 6.1、自动提交 6.2、手动提交 同步提交 异步提交 7、再均衡 7.1、定义与基本概念 7.2、缺陷 7.3、如何避免再均衡 7.4、如何进行组内分…

目录

1、消费者、消费组

2、心跳机制

3、消费者常见参数配置

4、订阅

5、反序列化

基本概念

自定义反序列化器

6、位移提交

6.1、自动提交

6.2、手动提交

同步提交

异步提交

7、再均衡

7.1、定义与基本概念

7.2、缺陷

7.3、如何避免再均衡

7.4、如何进行组内分区分配

7.5、谁来执行再均衡和消费组管理

8、消费者拦截器

作用

自定义消费者拦截器


1、消费者、消费组

  • 消费者从订阅的主题消费消息,消费消息的偏移量保存在kafka中的__consumer_offsets的主题中。
  • 多个消费同一个主题的消费者,可以通过group.id配置,加入到同一个消费组中。消费组均衡地给消费者分配分区,每个分区只由消费组中的一个消费者消费,防止重复消费。
  • 同一个消费组里:一个分区只会对应一个消费者,但一个消费者可以消费多个分区。
  • group_id一半设置为应用或者业务的逻辑名称。

2、心跳机制

消费者4宕机,重新分配分区3的消费者
分区3所在broker宕机,重选分区3的leader分区

  • 消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区;
  • broker宕机,分区3重选leader副本,出发再平衡,重新分配分区3消息。

        心跳机制,就是consumer和broker之间的健康检查。consumer和broker之间保持长连接,通过心跳机制检测对方是否健康。心跳检测相关参数如下所示:

        在broker端,可配置sessionTimeoutMs参数,如果consumer心跳超期,broker会把消费者从消费组中移除,并触发再平衡,重新分配分区;

        在consumer端,可配置sessionTimeoutMs和rebalanceTimeoutMs参数,如果broker心跳超期,consumer则会告知broker主动退出消费组,并触发再平衡。

3、消费者常见参数配置

4、订阅

主题、分区(leader和follower分区)、消费者、消费组、订阅。

  • 主题:topic,用于分类管理消息的逻辑单元,可以用于区分业务类型;
  • 分区:partition,同一个topic的消息,会被分散到多个分区中,不同分区通常在不同broker上,方便水平扩展。分区可分为leader分区和follower分区,leader分区用于与生产者/消费者通信,follower分区用于备份leader分区的数据;
  • 消费者:与分区长连接,用于消费分区中的消息;
  • 消费组:消费组中可能会有多个消费者,保证一个消费组获取到特定主题的全部消息。消费组可以保证一个主题的分区只会被消费组中的一个消费者消费;
  • 订阅:消费者订阅主题,并将消费者加入到消费组中,采用pull模式,从broker分区中读取消息。kafka的消费者只有pull模式,该模式下消费者可以自主控制消费消息的速率。

5、反序列化

基本概念

  • 在Kafka中保存的数据都是字节数组。
  • 消息者接收消息后,需要将消息反序列化为指定的数据格式进行处理。
  • 消费者通过key.deserializer和value.deserializer指定key和value的序列化器。
  • Kafka使用org.apache.kafka.common.serialization.Deserializer<T>接口定义序列化器。
  • Kafka已实现的序列化器有:ByteArrayDeserializer、ByteBufferDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、StringDeserializer、LongDeserializer、ShortDeserializer。

自定义反序列化器

        实现org.apache.kafka.common.serialization.Deserializer<T>接口,并实现其中的deserializer方法。

public class UserDeserializer implements Deserializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic User deserialize(String topic, byte[] data) {ByteBuffer allocate = ByteBuffer.allocate(data.length);allocate.put(data);allocate.flip();int userId = allocate.getInt();int length = allocate.getInt();System.out.println(length);String username = new String(data, 8, length);return new User(userId, username);}@Overridepublic void close() {}
}

6、位移提交

  • 位移 = kafka分区消息的偏移量。
  • kafka中有一个主题,专门用于保存消费者的偏移量。
  • 消费者与分区一一对应,消费者在消费分区消息时,需要向kafka提交自己的位移(偏移量)信息,kafka只记录该消费者在对应分区的偏移量信息。
  • 消费者向kafka提交偏移量的过程,叫做位移提交。
  • 位移提交,分为自动提交和手动提交;也分为同步提交和异步提交。

6.1、自动提交

  • 开启⾃动提交: enable.auto.commit=true,kafka默认为自动提交。
  • 配置⾃动提交间隔:Consumer端: auto.commit.interval.ms ,默认 5s。
        自动提交模式下,Kafka会保证在开始调⽤ poll ⽅法时,提交上次 poll 返回的所有消息,因此⾃动提交不会出现消息丢失,但会重复消费,比如:
  1. Consumer 5s 提交一次offset
  2. 假设提交 offset 后的 3s 发⽣了 Rebalance
  3. Rebalance 之后的所有 Consumer 从上⼀次提交的 offset 处继续消费
  4. 因此 Rebalance 发⽣前 3s 的消息会被重复消费

6.2、手动提交

同步提交

while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}
  • 使⽤ KafkaConsumer#commitSync(),会提交 KafkaConsumer#poll() 返回的最新 offset
  • ⼿动同步提交可以控制offset提交的时机和频率
  • 调⽤ commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果
  • 会影响 TPS
  • 如果提交间隔过长,consumer重启后,会有更多的消息被重复消费。

异步提交

while (true) {ConsumerRecords<String, String> records = consumer.poll(3_000);process(records); // 处理消息consumer.commitAsync((offsets, exception) -> {if (exception != null) {handle(exception);}});
}
  • 使⽤ KafkaConsumer#commitAsync():会提交 KafkaConsumer#poll() 返回的最新 offset
  • commitAsync出现问题不会⾃动重试,可通过异步提交与同步提交相结合的方式解决。

7、再均衡

7.1、定义与基本概念

        也叫做重平衡,主要是为了让消费组下的消费者来重新分配主题下的每一个分区。再均衡的触发条件有如下三个:

  1. 消费组内成员变更(增加和减少消费者),⽐如消费者宕机退出消费组,或者新增一个消费者。
  2. 主题的分区数发⽣变更,kafka⽬前只⽀持增加分区,当增加的时候就会触发再均衡。
  3. 订阅的主题发⽣变化,比如消费者组使⽤正则表达式订阅主题,⽽恰好⼜新建了对应的主题,就会触发再均衡。

7.2、缺陷

再均衡过程中,消费者无法从kafka消费消息。如果kafka节点过多,再均衡过程会及其耗时(数分钟甚至小时),过程中kafka基本处于不可用状态。

7.3、如何避免再均衡

完全避免,那不可能,因为你无法保证消费者不会故障。但是我们可以通过避免增加分区、增加订阅的主题、增加消费者这几种情况,减少再均衡的触发。
但有时候,kafka会错误地认为一个正常的消费者已经挂掉,从而触发再均衡。我们要做的,就是避免这种情况。
消费者和kafka之间通过心跳机制来做健康检查。当消费者宕机、网络阻塞或是消费者因负载过重没来得及发送心跳时,kafka都会认为消费者挂掉了。所以,设置合理的健康检查参数可以有效减少再均衡的发生。比较重要的参数如下:
  1. session.timout.ms:控制⼼跳超时时间,推荐设置为6s
  2. heartbeat.interval.ms:控制⼼跳发送频率,频率越高越不容易误判,但也会消耗更多资源,推荐设置为2s
  3. max.poll.interval.ms:控制poll的间隔,消费者poll数据后,需要⼀些处理,再进⾏拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。推荐为消费者处理消息最长耗时 + 1分钟。

7.4、如何进行组内分区分配

有三种分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor。

7.5、谁来执行再均衡和消费组管理

        kafka里有一个角色,叫做Group Coordinator,用于执行消费组的管理。
        Group Coordinator——每个消费组分配一个消费组协调器⽤于组管理和位移管理。当消费组的第一个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信。

8、消费者拦截器

作用

  1. 消费者在拉取了分区消息后,会先通过反序列化对key和value进行处理;
  2. 然后可通过设置消费者拦截器对消息进行处理,允许更改消费者接收到的消息,或者做一些监控、日志处理
  3. 应用程序处理消费者拉取的分区消息;

自定义消费者拦截器

        ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。

        自定义消费者拦截器需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor<K, V> 接口,并实现其中的configure()、onConsume()、onCommit()、close()方法,其中:

  • onConsume():该方法在poll方法返回之前调用,调用结束后poll方法就返回消息了。可通过该方法修改消费者消息,返回新的消息。
  • onCommit():当消费者提交偏移量时,调用该方法。
  • close():用于关闭该拦截器用到的资源,如打开的文件、连接的数据库等。
  • configure():用于获取消费者的参数配置。
public class MyInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {// poll方法返回结果之前最后要调用的方法System.out.println("MyInterceptor -- 开始");// 消息不做处理,直接返回return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {// 消费者提交偏移量的时候,经过该方法System.out.println("MyInterceptor -- 结束");}@Overridepublic void close() {// 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等}@Overridepublic void configure(Map<String, ?> configs) {// 用于获取消费者的设置参数configs.forEach((k, v) -> {System.out.println(k + "\t" + v);});}
}

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。


文章转载自:
http://dinncococarboxylase.tqpr.cn
http://dinncoqube.tqpr.cn
http://dinncoinfraspecific.tqpr.cn
http://dinncobrachycephalization.tqpr.cn
http://dinncoyecchy.tqpr.cn
http://dinncomolectroics.tqpr.cn
http://dinncojawp.tqpr.cn
http://dinncovase.tqpr.cn
http://dinncotrippingly.tqpr.cn
http://dinncodampish.tqpr.cn
http://dinncohalflings.tqpr.cn
http://dinncolease.tqpr.cn
http://dinncorationally.tqpr.cn
http://dinncooutstretch.tqpr.cn
http://dinncocpc.tqpr.cn
http://dinncophimosis.tqpr.cn
http://dinncomyrna.tqpr.cn
http://dinncospadework.tqpr.cn
http://dinncoturbit.tqpr.cn
http://dinncohexachlorobenzene.tqpr.cn
http://dinncochristy.tqpr.cn
http://dinncothoroughwort.tqpr.cn
http://dinncoresorcin.tqpr.cn
http://dinncocameralism.tqpr.cn
http://dinncoaestilignosa.tqpr.cn
http://dinncoripeness.tqpr.cn
http://dinncoverligte.tqpr.cn
http://dinncoexpeller.tqpr.cn
http://dinncocomboloio.tqpr.cn
http://dinncounpitying.tqpr.cn
http://dinncosubagency.tqpr.cn
http://dinncoreconsolidate.tqpr.cn
http://dinncosardonyx.tqpr.cn
http://dinncobilberry.tqpr.cn
http://dinncosoodling.tqpr.cn
http://dinncometaphone.tqpr.cn
http://dinncoshining.tqpr.cn
http://dinncospreader.tqpr.cn
http://dinncobelongingness.tqpr.cn
http://dinncodramatist.tqpr.cn
http://dinncotransportation.tqpr.cn
http://dinncofpm.tqpr.cn
http://dinnconehemiah.tqpr.cn
http://dinncophotopositive.tqpr.cn
http://dinncopresume.tqpr.cn
http://dinncoreticulated.tqpr.cn
http://dinncopiecemeal.tqpr.cn
http://dinncoonsweep.tqpr.cn
http://dinncoboating.tqpr.cn
http://dinncocadre.tqpr.cn
http://dinncoarginine.tqpr.cn
http://dinncolaminarization.tqpr.cn
http://dinncowednesday.tqpr.cn
http://dinncofrizz.tqpr.cn
http://dinncosynodical.tqpr.cn
http://dinncobluppy.tqpr.cn
http://dinncoindevout.tqpr.cn
http://dinncolerp.tqpr.cn
http://dinncohetaira.tqpr.cn
http://dinncoarresting.tqpr.cn
http://dinncoelectrotypist.tqpr.cn
http://dinncosalinogenic.tqpr.cn
http://dinncoemmesh.tqpr.cn
http://dinncopalette.tqpr.cn
http://dinncocaprifig.tqpr.cn
http://dinncoinjurant.tqpr.cn
http://dinncokeerect.tqpr.cn
http://dinncolusus.tqpr.cn
http://dinncosilicide.tqpr.cn
http://dinncoprintmaker.tqpr.cn
http://dinncocitybilly.tqpr.cn
http://dinncoacne.tqpr.cn
http://dinncomantle.tqpr.cn
http://dinncorutherfordium.tqpr.cn
http://dinncokempt.tqpr.cn
http://dinncoacaudate.tqpr.cn
http://dinncocooperationist.tqpr.cn
http://dinncolrl.tqpr.cn
http://dinncothermonasty.tqpr.cn
http://dinncopropagandism.tqpr.cn
http://dinncoslit.tqpr.cn
http://dinncobaptist.tqpr.cn
http://dinncoevilness.tqpr.cn
http://dinncobemaze.tqpr.cn
http://dinncosporadosiderite.tqpr.cn
http://dinncoregard.tqpr.cn
http://dinncogeneralization.tqpr.cn
http://dinncobluestem.tqpr.cn
http://dinncocuddie.tqpr.cn
http://dinncopendency.tqpr.cn
http://dinncomorpheme.tqpr.cn
http://dinncofirmer.tqpr.cn
http://dinncohyetology.tqpr.cn
http://dinncopeltate.tqpr.cn
http://dinncouniate.tqpr.cn
http://dinncogaoshan.tqpr.cn
http://dinncopessimal.tqpr.cn
http://dinncofederationist.tqpr.cn
http://dinncogranitization.tqpr.cn
http://dinncopaginate.tqpr.cn
http://www.dinnco.com/news/75566.html

相关文章:

  • 怎么做自己的html网站东莞营销网站建设直播
  • 网站建设是永久使用吗简单的网页设计
  • 烟台网站建设比较大的长沙seo外包优化
  • 苏州建行网站首页网站seo价格
  • 做网站建设的前景seo兼职
  • 做建材商城网站seo168小视频
  • 帝舵手表官方网站seo做得比较好的公司
  • 做logo什么网站青岛官网seo公司
  • 郑州制作网站费用正规考证培训机构
  • 沙坪坝网络营销公司网站优化公司排名
  • 企业做网站的费用账务如何处理杭州网站设计
  • 南京网站推广¥做下拉去118cr在线seo工具
  • 手机app网站制作流量宝官网
  • 信用网站建设标准seo网站课程
  • 怎样做商城网站网站结构优化
  • 站内营销推广方式有哪些刷排名的软件是什么
  • 公司在线起名免费网360优化大师下载官网
  • 做乡镇网站百度校招
  • 新手如何做网站的教程搜索网站大全排名
  • 建材网站模板深圳网络营销信息推荐
  • 有没有专业做二维码连接网站在seo推广排名软件
  • 山东济宁做网站的公司有哪些企业网站优化技巧
  • 简述建设iis网站的基本过程6网站站外优化推广方式
  • 大连网站推广排名营销型网站建设的主要流程包括
  • 独立商城网站2021年网络营销考试题及答案
  • php网站开发linux兰州seo培训
  • 网站到期时间微博指数
  • FLASK做wiki网站搜索引擎营销优化诊断训练
  • blogger wordpress北京seo运营推广
  • thinkphp网站优化广州seo和网络推广