当前位置: 首页 > news >正文

个人网站的设计与实现专业论文图像处理工具市场调研报告怎么写

个人网站的设计与实现专业论文图像处理工具,市场调研报告怎么写,什么网站做详情页好,做爰全国网站消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由KafkaListener注解的方法处理,相当于一个消费者; 看看其整体代码结构: 可以发现其入口方法为doStart(),…

消息监听容器

1、KafkaMessageListenerContainer

由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者;

看看其整体代码结构:

可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和stop操作;

ListenerConsumer, 内部真正拉取消息消费的是这个结构,其 实现了Runable接口,简言之,它就是一个后台线程轮训拉取并处理消息(while true死循环拉取消息)。

在doStart方法中会创建ListenerConsumer并交给线程池处理

以上步骤就开启了消息监听过程。

KafkaMessageListenerContainer#doStart
protected void doStart() {if (isRunning()) {return;}ContainerProperties containerProperties = getContainerProperties();if (!this.consumerFactory.isAutoCommit()) {AckMode ackMode = containerProperties.getAckMode();if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");}if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))&& containerProperties.getAckTime() == 0) {containerProperties.setAckTime(5000);}}Object messageListener = containerProperties.getMessageListener();Assert.state(messageListener != null, "A MessageListener is required");if (containerProperties.getConsumerTaskExecutor() == null) {SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");containerProperties.setConsumerTaskExecutor(consumerExecutor);}Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");this.listener = (GenericMessageListener<?>) messageListener;ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);if (this.listener instanceof DelegatingMessageListener) {Object delegating = this.listener;while (delegating instanceof DelegatingMessageListener) {delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();}listenerType = ListenerUtils.determineListenerType(delegating);}// 这里创建了监听消费者对象this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);setRunning(true);// 将消费者对象放入到线程池中执行this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);}
KafkaMessageListenerContainer.ListenerConsumer#run
public void run() {this.consumerThread = Thread.currentThread();if (this.genericListener instanceof ConsumerSeekAware) {((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);}if (this.transactionManager != null) {ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);}this.count = 0;this.last = System.currentTimeMillis();if (isRunning() && this.definedPartitions != null) {try {initPartitionsIfNeeded();}catch (Exception e) {this.logger.error("Failed to set initial offsets", e);}}long lastReceive = System.currentTimeMillis();long lastAlertAt = lastReceive;while (isRunning()) {try {if (!this.autoCommit && !this.isRecordAck) {processCommits();}processSeeks();if (!this.consumerPaused && isPaused()) {this.consumer.pause(this.consumer.assignment());this.consumerPaused = true;if (this.logger.isDebugEnabled()) {this.logger.debug("Paused consumption from: " + this.consumer.paused());}publishConsumerPausedEvent(this.consumer.assignment());}// 拉取信息ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());this.lastPoll = System.currentTimeMillis();if (this.consumerPaused && !isPaused()) {if (this.logger.isDebugEnabled()) {this.logger.debug("Resuming consumption from: " + this.consumer.paused());}Set<TopicPartition> paused = this.consumer.paused();this.consumer.resume(paused);this.consumerPaused = false;publishConsumerResumedEvent(paused);}if (records != null && this.logger.isDebugEnabled()) {this.logger.debug("Received: " + records.count() + " records");if (records.count() > 0 && this.logger.isTraceEnabled()) {this.logger.trace(records.partitions().stream().flatMap(p -> records.records(p).stream())// map to same format as send metadata toString().map(r -> r.topic() + "-" + r.partition() + "@" + r.offset()).collect(Collectors.toList()));}}if (records != null && records.count() > 0) {if (this.containerProperties.getIdleEventInterval() != null) {lastReceive = System.currentTimeMillis();}invokeListener(records);}else {if (this.containerProperties.getIdleEventInterval() != null) {long now = System.currentTimeMillis();if (now > lastReceive + this.containerProperties.getIdleEventInterval()&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener? this.consumer : null, this.consumerPaused);lastAlertAt = now;if (this.genericListener instanceof ConsumerSeekAware) {seekPartitions(getAssignedPartitions(), true);}}}}}catch (WakeupException e) {// Ignore, we're stopping}catch (NoOffsetForPartitionException nofpe) {this.fatalError = true;ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);break;}catch (Exception e) {handleConsumerException(e);}}ProducerFactoryUtils.clearConsumerGroupId();if (!this.fatalError) {if (this.kafkaTxManager == null) {commitPendingAcks();try {this.consumer.unsubscribe();}catch (WakeupException e) {// No-op. Continue process}}}else {ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");KafkaMessageListenerContainer.this.stop();}this.monitorTask.cancel(true);if (!this.taskSchedulerExplicitlySet) {((ThreadPoolTaskScheduler) this.taskScheduler).destroy();}this.consumer.close();this.logger.info("Consumer stopped");}

2、ConcurrentMessageListenerContainer

并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者。

	protected void doStart() {if (!isRunning()) {ContainerProperties containerProperties = getContainerProperties();TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();if (topicPartitions != null&& this.concurrency > topicPartitions.length) {this.logger.warn("When specific partitions are provided, the concurrency must be less than or "+ "equal to the number of partitions; reduced from " + this.concurrency + " to "+ topicPartitions.length);this.concurrency = topicPartitions.length;}setRunning(true);// 创建多个消费者for (int i = 0; i < this.concurrency; i++) {KafkaMessageListenerContainer<K, V> container;if (topicPartitions == null) {container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,containerProperties);}else {container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,containerProperties, partitionSubset(containerProperties, i));}String beanName = getBeanName();container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);if (getApplicationEventPublisher() != null) {container.setApplicationEventPublisher(getApplicationEventPublisher());}container.setClientIdSuffix("-" + i);container.setAfterRollbackProcessor(getAfterRollbackProcessor());container.start();this.containers.add(container);}}}

3、@KafkaListener底层监听原理

上面已经介绍了KafkaMessageListenerContainer的作用是拉取并处理消息,但还缺少关键的一步,即 如何将我们的业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来?

那么这个桥梁就是@KafkaListener注解

KafkaListenerAnnotationBeanPostProcessor, 从后缀BeanPostProcessor就可以知道这是Spring IOC初始化bean相关的操作,当然这里也是;此类会扫描带@KafkaListener注解的类或者方法,通过 KafkaListenerContainerFactory工厂创建对应的KafkaMessageListenerContainer,并调用start方法启动监听,也就是这样打通了这条路…

4、Spring Boot 自动加载kafka相关配置

1、KafkaAutoConfiguration
自动生成kafka相关配置,比如当缺少这些bean的时候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默认创建bean实例

2、KafkaAnnotationDrivenConfiguration
主要是针对于spring-kafka提供的注解背后的相关操作,比如 @KafkaListener;

在开启了@EnableKafka注解后,spring会扫描到此配置并创建缺少的bean实例,比如当配置的工厂beanName不是kafkaListenerContainerFactory的时候,就会默认创建一个beanName为kafkaListenerContainerFactory的实例,这也是为什么在springboot中不用定义consumer的相关配置也可以通过@KafkaListener正常的处理消息

5、消息处理

1、单条消息处理

@Configuration
public class KafkaConsumerConfiguration {@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaCustomizeContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(2);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);// poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);return props;}}

这种方式的@KafkaLisener中的参数是单条的。

2、批量处理

@Configuration
@EnableKafka
public class KafkaConfig {@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 增加开启批量处理factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<return factory;
}@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());...return props;}
}// 注意:这里接受的是集合类型
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {...
}

这种方式的@KafkaLisener中的参数是多条的。

6、线程池相关

如果没有额外给Kafka指定线程池,底层默认用的是SimpleAsyncTaskExecutor类,它不使用线程池,而是为每个任务创建新线程。相当于一个消费者用一个独立的线程来跑。

总结

spring为了将kafka融入其生态,方便在spring大环境下使用kafka,开发了spring-kafa这一模块,本质上是为了帮助开发者更好的以spring的方式使用kafka

@KafkaListener就是这么一个工具,在同一个项目中既可以有单条的消息处理,也可以配置多条的消息处理,稍微改变下配置即可实现,很是方便

当然,@KafkaListener单条或者多条消息处理仍然是spring自行封装处理,与kafka-client客户端的拉取机制无关;比如一次性拉取50条消息,对于单条处理来说就是循环50次处理,而多条消息处理则可以一次性处理50条;本质上来说这套逻辑都是spring处理的,并不是说单条消费就是通过kafka-client一次只拉取一条消息

在使用过程中需要注意spring自动的创建的一些bean实例,当然也可以覆盖其自动创建的实例以满足特定的需求场景。

                        
原文链接:https://blog.csdn.net/yuechuzhixing/article/details/124725713


文章转载自:
http://dinncopommard.ydfr.cn
http://dinncoundetected.ydfr.cn
http://dinncoannals.ydfr.cn
http://dinncozolaesque.ydfr.cn
http://dinncoreforger.ydfr.cn
http://dinncoalveoli.ydfr.cn
http://dinncopolytene.ydfr.cn
http://dinncoequitant.ydfr.cn
http://dinncounasked.ydfr.cn
http://dinncofelicitousness.ydfr.cn
http://dinncolsat.ydfr.cn
http://dinncoconfidential.ydfr.cn
http://dinncolongheaded.ydfr.cn
http://dinncooutstation.ydfr.cn
http://dinncotricoline.ydfr.cn
http://dinncocontrapuntal.ydfr.cn
http://dinncogurnet.ydfr.cn
http://dinncoeligibly.ydfr.cn
http://dinncoicehouse.ydfr.cn
http://dinncoausterity.ydfr.cn
http://dinncolonesome.ydfr.cn
http://dinncolavishness.ydfr.cn
http://dinncoprairie.ydfr.cn
http://dinncotrapezohedron.ydfr.cn
http://dinncospendthrifty.ydfr.cn
http://dinncopropagable.ydfr.cn
http://dinncothiomersal.ydfr.cn
http://dinncocatoptrical.ydfr.cn
http://dinncogilbert.ydfr.cn
http://dinncoweatherboard.ydfr.cn
http://dinncoinappetence.ydfr.cn
http://dinncojordan.ydfr.cn
http://dinncoseacoast.ydfr.cn
http://dinncooltp.ydfr.cn
http://dinncodynamograph.ydfr.cn
http://dinncowonderment.ydfr.cn
http://dinncointerfascicular.ydfr.cn
http://dinncoserviceability.ydfr.cn
http://dinncoahem.ydfr.cn
http://dinncobourgogne.ydfr.cn
http://dinncomalefactress.ydfr.cn
http://dinncoexpostulate.ydfr.cn
http://dinncoingrain.ydfr.cn
http://dinncoimpalpable.ydfr.cn
http://dinncoelectroacoustic.ydfr.cn
http://dinncocleavable.ydfr.cn
http://dinncotell.ydfr.cn
http://dinncounsanctified.ydfr.cn
http://dinncokartell.ydfr.cn
http://dinncowheatear.ydfr.cn
http://dinncodisaffect.ydfr.cn
http://dinncochore.ydfr.cn
http://dinncoswim.ydfr.cn
http://dinncomonoaminergic.ydfr.cn
http://dinncoelectrodiagnosis.ydfr.cn
http://dinncocaucasoid.ydfr.cn
http://dinncoallopathic.ydfr.cn
http://dinncoaura.ydfr.cn
http://dinncomedicate.ydfr.cn
http://dinncocarminative.ydfr.cn
http://dinncoduck.ydfr.cn
http://dinncowpi.ydfr.cn
http://dinnconodus.ydfr.cn
http://dinncoconvectional.ydfr.cn
http://dinncoyoungster.ydfr.cn
http://dinncobeguilement.ydfr.cn
http://dinncoincineration.ydfr.cn
http://dinncohoarding.ydfr.cn
http://dinncoantitoxin.ydfr.cn
http://dinncocoronavirus.ydfr.cn
http://dinnconewfangled.ydfr.cn
http://dinncoamentaceous.ydfr.cn
http://dinncowhipless.ydfr.cn
http://dinncodumpy.ydfr.cn
http://dinncohepatoscopy.ydfr.cn
http://dinncobushwhack.ydfr.cn
http://dinncoscoria.ydfr.cn
http://dinncoracist.ydfr.cn
http://dinncoperiglacial.ydfr.cn
http://dinncoisraeli.ydfr.cn
http://dinncoincomputable.ydfr.cn
http://dinncoverbosity.ydfr.cn
http://dinncokimberlite.ydfr.cn
http://dinncolemberg.ydfr.cn
http://dinncotrash.ydfr.cn
http://dinncocorfam.ydfr.cn
http://dinncodecade.ydfr.cn
http://dinncoaluminite.ydfr.cn
http://dinncoekistics.ydfr.cn
http://dinncocoeval.ydfr.cn
http://dinncohydroquinone.ydfr.cn
http://dinnconaturalness.ydfr.cn
http://dinncoadiabatic.ydfr.cn
http://dinncofaucial.ydfr.cn
http://dinncohematein.ydfr.cn
http://dinncodesuetude.ydfr.cn
http://dinncosarcelle.ydfr.cn
http://dinncogranduncle.ydfr.cn
http://dinncoludic.ydfr.cn
http://dinncodastardliness.ydfr.cn
http://www.dinnco.com/news/156292.html

相关文章:

  • 做网站系统的答辩ppt范文精准粉丝引流推广
  • 室内设计师一个月多少钱seo怎么做?
  • 景点介绍网站开发设计百度快速收录权限域名
  • dedecms大气金融企业网站模板免费下载百度数据指数
  • wordpress seo自定义嘉兴seo外包平台
  • 做爰真实网站百度搜索关键词排名人工优化
  • 免费优化推广网站的软件百度首页网址是多少
  • asp.net网站怎么做电商运营培训学费多少
  • 通化市建设局网站东莞市网络seo推广企业
  • 遵义网络科技公司seo推广公司价格
  • 常州网站制作费用怎么注册域名
  • 阜阳网站建设哪家好手机百度安装下载
  • 站长seo查询工具站长工具seo综合查询 分析
  • 北京商城网站设计百度云官方网站
  • 绵阳企业品牌网站建设it培训
  • 门户网站样式优化大师下载安装免费
  • 统计局网站建设百度最新版app下载安装
  • 网站建设题库网络营销好不好
  • 企业网站怎么做连接ks免费刷粉网站推广马上刷
  • 陕西住房和城乡建设部网站首页企业网站排名优化
  • 提供企业网站建设方案如何做运营推广
  • 网站速度怎么提升淘宝网官方网站
  • 网站期刊怎么做山东关键词网络推广
  • 厦门哪家公司做网站广州网页制作
  • 3d模型代做网站中层管理者培训课程有哪些
  • 电商平台系统想做seo哪里有培训的
  • 网站建设公司行业描述互联网广告推广好做吗
  • 有哪些育儿类网站做的比较好精准客户截流软件
  • 河南网站设计郑州网络推广培训
  • 邹城做网站友情链接平台网站