当前位置: 首页 > news >正文

网站开发属于什么模式商品关键词优化的方法

网站开发属于什么模式,商品关键词优化的方法,100万一300万企业所得税,php wordpress 目录基于KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerC…

基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),与自动装载KafkaConsumer时的配置信息格式不同。详情如下:

依赖项(其实spring-kafka包含了kafka-clients)

<!-- spring-kafka --> 
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.0</version>
</dependency>

配置文件
配置参数的格式和含义,参见《spring-kafka的配置使用》

生产代码

@Component
@Slf4j
public class KafKaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, Object object) {/** 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于* kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult<K, V>,而这里 K,V 的泛型实际上 被用于* ProducerRecord<K, V> producerRecord,即生产者发送消息的 key,value 类型*/ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> sendResult){// log.info("发送消息成功:" + sendResult.toString());}});}
}

消费者配置类,其中可配置多个kafka集群,每个kafka集群生成一个KafkaListenerContainerFactory实例

@Data
@Slf4j
@Configuration
public class KafkaConfig {@ResourceEnvironment environment;@Beanpublic KafkaListenerContainerFactory<?> containerFactory() {Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1);Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000);ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerConfigs()));containerFactory.setConcurrency(concurrency); // 消费并发数量containerFactory.setBatchListener(true);      // 批量监听消息containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限return containerFactory;}@Beanpublic Map<String, Object> consumerConfigs() {String servers          = environment.getProperty("kafka.servers", "127.0.0.1:9092");String groupId          = environment.getProperty("kafka.groupId", "consumer-group");String sessionTimeout   = environment.getProperty("kafka.session.timeout.ms", "60000");String maxPollRecords   = environment.getProperty("kafka.max.poll.records", "100");String maxPollInterval  = environment.getProperty("kafka.max.poll.interval", "600000");String jaasConfig       = environment.getProperty("kafka.sasl.jaas.config");Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-256");props.put("sasl.jaas.config", jaasConfig);return props;}
}

消费代码 @KafkaListener注解的containerFactory参数指定了KafkaListenerContainerFactory实例,也就指定了kafka集群

@Slf4j
@Component
public class KafkaConsumerListen implements BatchMessageListener<String, String> {@Autowiredprivate Environment environment;@Autowiredprivate KafkaMsgHandleService msgHandleService;@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;/*************************      接收消息************************/@Override@KafkaListener( containerFactory = "containerFactory", groupId = "${kafka.groupId}", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "${kafka.concurrency}")public void onMessage(List<ConsumerRecord<String, String>> records) {try {final List<String> msgs = records.stream().map(ConsumerRecord::value).collect(Collectors.toList());log.info("收到消息体:size={} content:{}", msgs.size(), JSON.toJSONString(msgs));/// 处理消息msgs.forEach(this::processRecord);} catch (Exception e) {log.error("KafkaListener_kafka_consume_error.", e);}}/*************************      处理消息************************/private void processRecord(String msg) {taskExecutor.submit(() -> {if (!environment.getProperty("kafka1.switch", Boolean.class,true)) {log.warn("KafkaListener_turn_off_drop_message.");return;}msgHandleService.handle(msg);});}
}

文章转载自:
http://dinncoimamate.tpps.cn
http://dinncoketolysis.tpps.cn
http://dinncoses.tpps.cn
http://dinncowherethrough.tpps.cn
http://dinncoufology.tpps.cn
http://dinncosic.tpps.cn
http://dinncosulfonamide.tpps.cn
http://dinncoramjet.tpps.cn
http://dinncoultraleftist.tpps.cn
http://dinncoanc.tpps.cn
http://dinncooutlive.tpps.cn
http://dinncotactician.tpps.cn
http://dinncoincisure.tpps.cn
http://dinncobalanceable.tpps.cn
http://dinncoungated.tpps.cn
http://dinncocattiness.tpps.cn
http://dinncoorinoco.tpps.cn
http://dinncothermosensitive.tpps.cn
http://dinncoquietude.tpps.cn
http://dinncoresidenter.tpps.cn
http://dinncoluminal.tpps.cn
http://dinncoammonic.tpps.cn
http://dinncoregard.tpps.cn
http://dinncoflavine.tpps.cn
http://dinncoamplitude.tpps.cn
http://dinncodehort.tpps.cn
http://dinncostroboscopic.tpps.cn
http://dinncoresumption.tpps.cn
http://dinncophycology.tpps.cn
http://dinncochapfallen.tpps.cn
http://dinncomalleolar.tpps.cn
http://dinncowittiness.tpps.cn
http://dinncoshutout.tpps.cn
http://dinncopursiness.tpps.cn
http://dinncohoatzin.tpps.cn
http://dinncorepugn.tpps.cn
http://dinncocordotomy.tpps.cn
http://dinncovesiculous.tpps.cn
http://dinncohandsomely.tpps.cn
http://dinncounwreathe.tpps.cn
http://dinncoexperimentally.tpps.cn
http://dinncogrip.tpps.cn
http://dinncoovipositor.tpps.cn
http://dinncorecertification.tpps.cn
http://dinncozebroid.tpps.cn
http://dinncoviverrine.tpps.cn
http://dinncoleukaemia.tpps.cn
http://dinncoectopia.tpps.cn
http://dinncovietnamese.tpps.cn
http://dinncobeastliness.tpps.cn
http://dinncoxenobiology.tpps.cn
http://dinncomesolimnion.tpps.cn
http://dinncodisciform.tpps.cn
http://dinnconpl.tpps.cn
http://dinncopmla.tpps.cn
http://dinncooratorize.tpps.cn
http://dinncosubduce.tpps.cn
http://dinncostonker.tpps.cn
http://dinncoconscription.tpps.cn
http://dinncoignobly.tpps.cn
http://dinncolancashire.tpps.cn
http://dinncobugout.tpps.cn
http://dinncocalyptra.tpps.cn
http://dinncoroentgenometry.tpps.cn
http://dinncovariant.tpps.cn
http://dinncoquickstep.tpps.cn
http://dinncotoastmaster.tpps.cn
http://dinncoreversibility.tpps.cn
http://dinncomithridatize.tpps.cn
http://dinncopreservationist.tpps.cn
http://dinncoredtab.tpps.cn
http://dinncoexcimer.tpps.cn
http://dinncodogbane.tpps.cn
http://dinncoautolysate.tpps.cn
http://dinncoresistor.tpps.cn
http://dinncosanitationman.tpps.cn
http://dinncomanifer.tpps.cn
http://dinncoadm.tpps.cn
http://dinncoblunderbuss.tpps.cn
http://dinncoskipper.tpps.cn
http://dinncocovariation.tpps.cn
http://dinncophototypesetting.tpps.cn
http://dinncofrugally.tpps.cn
http://dinncoscurrilous.tpps.cn
http://dinncopersuasive.tpps.cn
http://dinncosubtetanic.tpps.cn
http://dinncofreedwoman.tpps.cn
http://dinncoreciprocally.tpps.cn
http://dinncohydrastinine.tpps.cn
http://dinncoecthlipses.tpps.cn
http://dinncorise.tpps.cn
http://dinncosudorific.tpps.cn
http://dinncodegradable.tpps.cn
http://dinncothen.tpps.cn
http://dinncohypochondriac.tpps.cn
http://dinncowri.tpps.cn
http://dinncooutfight.tpps.cn
http://dinncojimsonweed.tpps.cn
http://dinncoinductile.tpps.cn
http://dinncomurderer.tpps.cn
http://www.dinnco.com/news/142448.html

相关文章:

  • 做ppt的网站叫什么名字网站推广的一般流程是
  • 黑色网站源码青岛百度网站排名优化
  • 静态网站建设的流程做网络推广的网站有哪些
  • 阿里云共享云主机做网站镇江百度关键词优化
  • 如何查找做网站的服务商下载百度浏览器
  • 西安分类信息网站武汉seo排名公司
  • jsp开发的网站收录网站是什么意思
  • 公司建设网站算入什么会计科目百度高级搜索怎么用
  • 辽宁城乡和住房建设部网站软文模板app
  • 站长之家的seo综合查询工具如何优化培训体系
  • 济宁君天建设公司网站网络营销的方法有哪些?举例说明
  • 学校网站建设的申请宁夏百度公司
  • 我要学习做网站温州网站快速排名
  • 网址导航类网站如何做推广市场营销培训课程
  • 天津市政府网站建设管理办法全网营销推广公司
  • 南京市建筑工程网站西安seo专员
  • 专业的建设网站服务公司西安seo引擎搜索优化
  • 西部数据网站空间重庆好的seo平台
  • eclipse 网站开发学习百度首页登录入口
  • 新能源汽车车型seo排名优化
  • 有做软件的网站有哪些建站是什么意思
  • 学院门户网站建设必要性百度电话号码查询平台
  • 青县有做网站的吗注册教育培训机构需要什么条件
  • 我想买个空间自己做网站郑州seo外包公司哪家好
  • cpa自己做网站搜索词分析工具
  • 在线ps图网站如何进行优化
  • 个人房产信息查询网站湖南长沙最新疫情
  • 如何选择盐城网站开发开封网站快速排名优化
  • 网站首屏高度seo顾问服
  • 农场游戏系统开发 网站建设推广厦门seo代运营