当前位置: 首页 > news >正文

银川网站建设有哪些长春seo顾问

银川网站建设有哪些,长春seo顾问,这么自己做网站,公司网站建设中心目录 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 自定义序列化器 4、分区器 默认分区规则 自定义分区器 5、生产者拦截器 作用 自定义拦截器 6、生产者原理解析 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 在Kafka中保存的数…

目录

1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

自定义序列化器

4、分区器

默认分区规则

自定义分区器

5、生产者拦截器

作用

自定义拦截器

6、生产者原理解析


1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

  • 在Kafka中保存的数据都是字节数组。
  • 消息发送前,需要将消息序列化为字节数组进行发送。
  • 生产者通过key.serializer和value.serializer指定key和value的序列化器。
  • Kafka使用org.apache.kafka.common.serialization.Serializer接口定义序列化器。
  • Kafka已实现的序列化器有:ByteArraySerializer、ByteBufferSerializer、BytesSerializer、DoubleSerializer、FloatSerializer、IntegerSerializer、StringSerializer、LongSerializer、ShortSerializer。

自定义序列化器

实现org.apache.kafka.common.serialization.Serializer<T>接口,并实现其中的serializer方法。

@Data
public class User {private Integer userId;private String username;
}public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing}@Overridepublic byte[] serialize(String topic, User data) {try {// 如果数据是null,则返回nullif (data == null) return null;Integer userId = data.getUserId();String username = data.getUsername();int length = 0;byte[] bytes = null;if (null != username) {bytes = username.getBytes("utf-8");length = bytes.length;}// 第一个4字节存储userId的值// 第二个4字节存储username字节数组的长度int值// 第三个length长度,存储username序列化之后的字节数组ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);buffer.putInt(userId);buffer.putInt(length);buffer.put(bytes);return buffer.array();} catch (UnsupportedEncodingException e) {throw new SerializationException("序列化数据异常");}}@Overridepublic void close() {// do nothing}
}

4、分区器

默认分区规则

KafkaProducer.partition();DefaultPartitioner.partition();

  1. 如果record提供了分区号,则使⽤record提供的分区号
  2. 如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
  3. 如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。

自定义分区器

实现org.apache.kafka.clients.producer.Partitioner接口,并实现其中的partition方法。

在生产者参数中通过配置partitioner.class指定自定义分区器。

/*** 自定义分区器*/
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 此处可以计算分区的数字。// 我们直接指定为2return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

5、生产者拦截器

作用

        在发送消息前,或者在执行回调逻辑前,对消息做一些定制化的处理,比如修改消息,打印消息日志等。此外,Producer允许设置多个拦截器从而形成一条拦截器链,Producer将按照指定顺序调用它们。

自定义拦截器

        自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口,并实现其中的onSend()、onAcknowledgement()、close()接口。其中:

  • onSend(ProducerRecord):Producer 确保在消息被序列化前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修 改消息所属的topic和分区,否则会影响⽬标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤, 并且通常都是在Producer回调逻辑触发之前。
  • close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。

        在生产者参数中通过配置ProducerConfig.INTERCEPTOR_CLASSES_CONFIG指定自定义拦截器。

public class Interceptor<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("拦截器---go");// 此处根据业务需要对相关的数据作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();// 添加消息头headers.add("interceptor", "interceptor".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY, VALUE>(topic, partition, timestamp, key, value, headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器---back");if (exception != null) {// 如果发生异常,记录在日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

6、生产者原理解析

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

http://www.dinnco.com/news/71419.html

相关文章:

  • 做网站公司郑州郑州的网站建设公司sem和seo
  • 外贸企业网站策划郑州官网网络营销外包
  • 云南做网站的公司山东免费网络推广工具
  • 求跳转代码来自百度等搜索引擎访问跳转到另一个网站直接输入域名windows优化大师是什么
  • 网站开发需要哪些语言网络营销课程实训报告
  • 阿里巴巴网站开发工具百度搜索引擎入口官网
  • 网站开发合同需要交印花税吗百度在线客服问答
  • 政府网站建设长沙免费网站收录网站推广
  • 如何制作企业的网站郑州seo询搜点网络效果佳
  • 洛阳公司注册宁波seo排名外包
  • 公司做网站的费用记什么科目重庆森林壁纸
  • 苏州做公司网站百度扫一扫
  • 网站友情链接如何做百度推广怎么收费标准
  • 静态网站规范关键词搜索排名工具
  • 西安seo外包行者seo精准网站seo诊断报告
  • 做视频开头动画网站网页设计个人主页
  • 建e网ai渲图插件谷歌优化培训
  • 郑州网站建设网络公司开发网站用什么软件
  • 网站的ftp地址是什么sem竞价培训
  • 深圳网站建设开发公司哪家好企业排名优化公司
  • ebay网站做外贸优缺点推广链接让别人点击
  • 做淘宝要用的网站吗怎么制作网页
  • 阎良网站建设适合女生去的培训机构
  • 佛山网站建设网站建设收费百度官网平台
  • 网站建设公司招聘深圳百度seo优化
  • 做网站刷点击免费b站软件推广网站
  • 做网站包括服务器么宁波seo外包服务
  • 东莞网站建设管理三叶草gw9356
  • 怎样与其它网站做友情链接制作网页的流程
  • 专业做房地产网站建设站长工具中文