当前位置: 首页 > news >正文

湖南省住建厅官方网站建设干校seo站外推广

湖南省住建厅官方网站建设干校,seo站外推广,唯品会网站建设 分析报告,天津招标信息网文章目录 生产者发送思路自定义序列化类配置生产者参数提升吞吐量 发送消息关闭生产者结语示例源码仓库 生产者发送思路 如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是 ack使用默认的all开启重试在一定时间内重试不成功,则入库&#xff…

文章目录

  • 生产者发送思路
  • 自定义序列化类
  • 配置生产者参数
    • 提升吞吐量
  • 发送消息
  • 关闭生产者
  • 结语
  • 示例源码仓库

生产者发送思路

如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是

  1. ack使用默认的all
  2. 开启重试
  3. 在一定时间内重试不成功,则入库,后续由定时任务继续发送
  4. 这里在某些异常情况下一定会生产重复消息,如何确保消息只消费一次,后续在Consumer实现中详细展开
  5. 这里我们只要确保生产的消息,不论重试多少次,最终都只会被发送到同一分区。Kafka的确定消息的分区策略是: 如果提供了key,则根据hash(key)计算分区。由于我们每个消息都有一个消息ID,不管是重试多少次,ID是不会变的,同时我们不会在消息高峰阶段调整分区数量。所以基于这些,我们保证一个消息无论多少次,都会发送到同一分区。

自定义序列化类

消息格式为JSON, 使用Jackson将类序列化为JSON字符串

public class UserDTOSerializer implements Serializer<UserDTO> {@Override@SneakyThrowspublic byte[] serialize(final String s, final UserDTO userDTO) {ObjectMapper objectMapper = new ObjectMapper();return objectMapper.writeValueAsBytes(userDTO);}
}

配置生产者参数

有几点需要注意

  1. 开启压缩
  2. retries 官方建议不配置, 官方建议使用delivery.timeout.ms 参数来控制重试时间, 默认2分钟
  3. buffer.memory 如果没有什么特别情况,使用默认的即可, 32MB
  4. ack使用默认的all
    /*** 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景吞吐量需求 自己调整* 如果是本地, bootstrap.server的IP地址和docker-compose.yml中的EXTERNAL保持一致* @return*/public static Properties loadProducerConfig(String valueSerializer) {Properties result = new Properties();result.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "l192.168.0.102:9093");result.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);result.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");// 每封邮件消息大小大约20KB, 使用默认配置吞吐量不高,下列配置增加kafka的吞吐量// 默认16384 bytes,太小了,这会导致邮件消息一个一个发送到kafka,达不到批量发送的目的,不符合发送邮件的场景result.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576 * 10);// 默认1048576 bytes,限制的是一个batch的大小,对于20KB的消息来说,消息太小result.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576 * 10);// 等10ms, 为了让更多的消息聚合到一个batch中,提高吞吐量result.put(ProducerConfig.LINGER_MS_CONFIG, 10);return result;}

提升吞吐量

  • 在实际场景中,我们的邮件消息一个大概20KB,而batch.size默认是16KB,也就是说,在不修改该参数的情况下,生产者只能一个一个的发消息,这会导致我们的吞吐量上不去, 所以修改batch.size为10MB
  • 只修改这个参数还不行, max.request.size 限制了单次请求的大小,默认为1MB,也就是说即使batch.size为10MB,但是由于一次只能最多发1MB,吞吐量也上不去,所以这里将max.request.size也改为10MB
  • 由于我们将一个批次可发送的数量大大提高,所以可以让生产者等一会再发,等更多的数据到达。linger.ms默认是为0,也就是立刻发送,根据实际情况适当增加等待时间

发送消息

@Log
public class MessageProducer {public static final KafkaProducer<String, UserDTO> PRODUCER = new KafkaProducer<>(KafkaConfiguration.loadProducerConfig(UserDTOSerializer.class.getName()));private MessageFailedService messageFailedService = new MessageFailedService();/*** kafka producer 发送失败时会进行重试,相关参数 retries 和 delivery.timeout.ms, 官方建议使用delivery.timeout.ms,默认2分钟* callback函数只有在最后一次重试之后才会调用, 如果你想在本地测试Kafka生产者的重试,详情可以看https://lists.apache.org/thread/nwg326bxpo7ry116nqhxmsmc3bokc6hm* @param userDTO*/public void sendMessage(final UserDTO userDTO) {ProducerRecord<String, UserDTO> user = new ProducerRecord<>("email", userDTO.getMessageId(),  userDTO);try {PRODUCER.send( user, (recordMetadata, e) -> {if (Objects.nonNull(e)) {log.severe("message has sent failed");MessageFailedEntity messageFailedEntity = new MessageFailedEntity();messageFailedEntity.setMessageId(userDTO.getMessageId());ObjectMapper mapper = new ObjectMapper();try {messageFailedEntity.setMessageContentJsonFormat(mapper.writeValueAsString(userDTO));} catch (JsonProcessingException jsonProcessingException) {log.severe("message content json format failed");}messageFailedEntity.setMessageType(MessageType.EMAIL);messageFailedEntity.setMessageFailedPhrase(MessageFailedPhrase.PRODUCER);messageFailedEntity.setFailedReason(e.getMessage());// 如果sendMessage传进来的是个list,也同理,不能放到list.foreach外面// 如果放在主线程里,由于kafka producer是异步的,// kafka producer的执行速度可能慢于主线程,可能拿到的值是空的是有问题的,例如拿到的failedReason是空的messageFailedService.saveOrUpdateMessageFailed(messageFailedEntity);} else {log.info("message has sent to topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition() );}});} catch (TimeoutException e) {log.info("send message to kafka timeout, message: ");// TODO: 自定义逻辑,比如发邮件通知kafka管理员}}
}

对上述代码做几点解释

  1. 我们使用异步的方式发送,如果发送成功,打印一条消息
  2. 关键在于重试,callback函数只有在最后一次重试之后才会调用。不会重试多少次就调用多少次callback, 这个问题我发邮件问过社区, 详情见这里的 邮件
public class ProducerMessageIdCache {private static final Map<String, Integer> MESSAGE_IDS = new ConcurrentHashMap<>();public static void add(String messageId) {MESSAGE_IDS.put(messageId, 0);}public static void remove(String messageId) {MESSAGE_IDS.remove(messageId);}public static boolean contains(String messageId) {return MESSAGE_IDS.containsKey(messageId);}// TODO 定时清理过期的messageId}

关闭生产者

实现ServletContextListener接口, 然后在web.xml的listener元素中配置

public class KafkaListener implements ServletContextListener {private static final List<KafkaProducer> KAFKA_PRODUCERS = new LinkedList<>();@Overridepublic void contextInitialized(ServletContextEvent sce) {KAFKA_PRODUCERS.add(MessageProducer.PRODUCER);}@Overridepublic void contextDestroyed(ServletContextEvent sce) {KAFKA_PRODUCERS.forEach(KafkaProducer::close);}
}
<?xml version="1.0" encoding="UTF-8" ?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaeehttps://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"version="6.0"><listener><listener-class>com.business.server.listener.KafkaListener</listener-class></listener>
</web-app>

结语

  1. 在实际编码过程中,可以参考官方写的Kafka权威指南对应章节书写,或者参考各大云服务厂商的Kafak的开发者文档。不过我建议还是看Kafka权威指南, 我看了阿里云和华为云的,虽然都号称兼容开源Kafka,但是发现其版本和开源版本之间存在一定的滞后性,许多最佳实践已经过时
  2. Kafka生产者端没什么特别的,主要是根据业务场景设计消息格式,以及如何尽可能的减小消息体积
  3. 如果你的消息很大,比我的场景还大,达到了1M以上,生产者的吞吐量是个问题,消费者的消费速度也是个问题。你要是问我有什么好的想法,没有具体场景,我确实想不出什么好的方式

示例源码仓库

  1. Github地址
  2. 项目下business-server module代表生产者
  3. 运行时IDEA配置如下在这里插入图片描述
    注意Application context的路径, 启动之后访问端口+Application context, 例如
http://localhost:8999/business-server

文章转载自:
http://dinncoimbue.wbqt.cn
http://dinncolandseer.wbqt.cn
http://dinncononconfidence.wbqt.cn
http://dinncosilbador.wbqt.cn
http://dinncoprecondemn.wbqt.cn
http://dinncopatchouly.wbqt.cn
http://dinncoflagellation.wbqt.cn
http://dinncoannulus.wbqt.cn
http://dinncoeupatorium.wbqt.cn
http://dinncotemblor.wbqt.cn
http://dinncopridian.wbqt.cn
http://dinncoatomistics.wbqt.cn
http://dinncostrangulation.wbqt.cn
http://dinncocombatant.wbqt.cn
http://dinncordb.wbqt.cn
http://dinncowatershoot.wbqt.cn
http://dinncoaeromodelling.wbqt.cn
http://dinncopuss.wbqt.cn
http://dinncosmokestack.wbqt.cn
http://dinncoendocrinotherapy.wbqt.cn
http://dinncohela.wbqt.cn
http://dinncofussily.wbqt.cn
http://dinncotrirectangular.wbqt.cn
http://dinncoestrus.wbqt.cn
http://dinncomilimeter.wbqt.cn
http://dinncocoati.wbqt.cn
http://dinncoguipure.wbqt.cn
http://dinncopreprocess.wbqt.cn
http://dinncoclaque.wbqt.cn
http://dinncobillionth.wbqt.cn
http://dinncosatellite.wbqt.cn
http://dinncomulatta.wbqt.cn
http://dinncowaxiness.wbqt.cn
http://dinncoquahog.wbqt.cn
http://dinncorustic.wbqt.cn
http://dinncodaedalian.wbqt.cn
http://dinncophilanthropy.wbqt.cn
http://dinncosacculated.wbqt.cn
http://dinncohastate.wbqt.cn
http://dinncorisc.wbqt.cn
http://dinncoopposability.wbqt.cn
http://dinncodictatorially.wbqt.cn
http://dinncogannetry.wbqt.cn
http://dinncoabnegate.wbqt.cn
http://dinnconoho.wbqt.cn
http://dinncopoliticker.wbqt.cn
http://dinncostrow.wbqt.cn
http://dinncodonnie.wbqt.cn
http://dinncovaporizer.wbqt.cn
http://dinncogaingiving.wbqt.cn
http://dinncoinarguable.wbqt.cn
http://dinncoverglas.wbqt.cn
http://dinncogranuliform.wbqt.cn
http://dinncorhapsodic.wbqt.cn
http://dinncoflowerpot.wbqt.cn
http://dinncoepizoology.wbqt.cn
http://dinncoawfully.wbqt.cn
http://dinncoastrometry.wbqt.cn
http://dinncoruncinate.wbqt.cn
http://dinncosilty.wbqt.cn
http://dinncodolour.wbqt.cn
http://dinncoimpregnability.wbqt.cn
http://dinncolionesque.wbqt.cn
http://dinncolich.wbqt.cn
http://dinnconaraka.wbqt.cn
http://dinncocum.wbqt.cn
http://dinncogui.wbqt.cn
http://dinncodishabilitate.wbqt.cn
http://dinncokourbash.wbqt.cn
http://dinncormc.wbqt.cn
http://dinncosimmer.wbqt.cn
http://dinncomowing.wbqt.cn
http://dinncoreassert.wbqt.cn
http://dinncotutty.wbqt.cn
http://dinncombabane.wbqt.cn
http://dinncoosmometer.wbqt.cn
http://dinncogratuity.wbqt.cn
http://dinncoharvey.wbqt.cn
http://dinncosoluble.wbqt.cn
http://dinncostrident.wbqt.cn
http://dinncoestate.wbqt.cn
http://dinncousurp.wbqt.cn
http://dinncofavourite.wbqt.cn
http://dinncoanuclear.wbqt.cn
http://dinncochimaeric.wbqt.cn
http://dinncospoilsman.wbqt.cn
http://dinncopoppyhead.wbqt.cn
http://dinnconopal.wbqt.cn
http://dinncodeltawinged.wbqt.cn
http://dinncoadvocaat.wbqt.cn
http://dinncoeverest.wbqt.cn
http://dinncolikable.wbqt.cn
http://dinncoribaldry.wbqt.cn
http://dinncosparkling.wbqt.cn
http://dinncopurl.wbqt.cn
http://dinncomalihini.wbqt.cn
http://dinncoammonolysis.wbqt.cn
http://dinncodisaffected.wbqt.cn
http://dinncoliman.wbqt.cn
http://dinncoretrospection.wbqt.cn
http://www.dinnco.com/news/147366.html

相关文章:

  • 服务性网站建设的原则生哥seo博客
  • 投资 公司 网站模板游戏代理平台
  • 服装网站建设策划书竞价销售是什么意思
  • 什么网站做网页好18款禁用软件黄app免费
  • .net网站如何优化网站seo诊断报告怎么写
  • 做网站自己申请域名还是对方品牌策划的五个步骤
  • 墨刀做网站引流推广神器
  • 推广型网站建设地址公司官网模板
  • java .net做网站seo软件哪个好
  • phpcms手机网站什么是网络营销战略
  • 创立制作网站公司太原seo招聘
  • 网站建设功能描述安卓优化大师旧版本下载
  • 佛山专业网站建设价格电脑学校培训
  • 成都响应式网站开发网络推广公司
  • 手机网站开发公司哪家最专业百度小说风云排行榜
  • 深圳做网站比较好的公司有哪些百度快照是啥
  • 横岗做网站公司网络营销推广策略
  • 网站开发时的闭包写法怀化网络推广
  • 手机怎么做淘客网站网页代码
  • 祁县网站建设社群营销的十大案例
  • 网站设置二级域名好吗百度指数什么意思
  • 360网站卖东西怎么做的网页优化公司
  • 英文网站制作 官网东莞网站建设推广平台
  • 扶贫办门户网站建设管理办法怎么做推广和宣传平台
  • 公司可以做多个网站吗百度云搜索引擎
  • 内容营销的步骤seo软件服务
  • 建立网站数据库企业官网定制设计
  • 国外免费建站网站不用下载网站优化公司
  • node mysql做动态网站近期时政热点新闻20条
  • 免费建网站平台教宁波网站推广优化哪家正规