当前位置: 首页 > news >正文

大型门户网站建设需要哪些技术最新国际新闻热点事件

大型门户网站建设需要哪些技术,最新国际新闻热点事件,官网苹果官网入口,建电子商城网站背景 当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程 FlinkKafkaConsumer水位线发送 1.首先从Fetcher类开始&#xff0c…

背景

当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程

FlinkKafkaConsumer水位线发送

1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动

        // if we have periodic watermarks, kick off the interval schedulerif (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {PeriodicWatermarkEmitter<T, KPH> periodicEmitter =new PeriodicWatermarkEmitter<>(checkpointLock,subscribedPartitionStates,watermarkOutputMultiplexer,processingTimeProvider,autoWatermarkInterval);periodicEmitter.start();}

2.随后,PeriodicWatermarkEmitter中注册处理时间定时器,周期性执行

        public void start() {timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}@Overridepublic void onProcessingTime(long timestamp) {synchronized (checkpointLock) {for (KafkaTopicPartitionState<?, ?> state : allPartitions) {// 这里当前算子任务消费的kafka 分区分别记录每个分区的水位值state.onPeriodicEmit();}//这里当前算子会把自己消费的kafka分区的所有水位线取最小值后当成当前算子任务自身的水位线发送出去,注意这里是当前算子任务级别的watermarkOutputMultiplexer.onPeriodicEmit();}// schedule the next watermarktimerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}}

3.对应state.onPeriodicEmit();记录每个kafka分区的水位线方法

    @Overridepublic void onPeriodicEmit(WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}
其中 WatermarkOutput output.emitWatermark(new Watermark(next.getTimestamp()))代码如下:public DeferredOutput(OutputState state) {this.state = state;}@Overridepublic void emitWatermark(Watermark watermark) {state.setWatermark(watermark.getTimestamp());}
所以这里最终效果只是对应state(kafka分区[注意,一个算子任务有可能消费好几个kafka分区])上设置了水位线/*** Returns true if the watermark was advanced, that is if the new watermark is larger than* the previous one.** <p>Setting a watermark will clear the idleness flag.*/public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;// 这里也可以看出来,即使代码里面发送了更小值的水位线,水位线也不会回退this.watermark = Math.max(watermark, this.watermark);return updated;}        

4.对应算子任务组合当前任务消费的所有分区水位线的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit that// 如果算子任务不消费任何分区,它不会发出任何水位线,这里是不是就是kafka消费者要小于kafka主题的原因所在???if (!hasOutputs) {return;}if (allIdle) {// 如果当前算子任务处于空闲时间,标识空闲,以便后续算子可以继续推进underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}```

文章转载自:
http://dinncoforesleeve.zfyr.cn
http://dinncovirginia.zfyr.cn
http://dinncosinicism.zfyr.cn
http://dinncoinitialism.zfyr.cn
http://dinncounaccompanied.zfyr.cn
http://dinncointercostal.zfyr.cn
http://dinncoscandent.zfyr.cn
http://dinncopositif.zfyr.cn
http://dinncoyill.zfyr.cn
http://dinncosemite.zfyr.cn
http://dinncocalash.zfyr.cn
http://dinncocorm.zfyr.cn
http://dinncocholecyst.zfyr.cn
http://dinncocaptainless.zfyr.cn
http://dinncodyschronous.zfyr.cn
http://dinncoputlock.zfyr.cn
http://dinncohashslinger.zfyr.cn
http://dinncogunther.zfyr.cn
http://dinncosnooker.zfyr.cn
http://dinncofare.zfyr.cn
http://dinncotorpify.zfyr.cn
http://dinncotrachyspermous.zfyr.cn
http://dinncoalburnum.zfyr.cn
http://dinncononhuman.zfyr.cn
http://dinncoworkgirl.zfyr.cn
http://dinncoshrine.zfyr.cn
http://dinncobacklining.zfyr.cn
http://dinncosinitic.zfyr.cn
http://dinncoevery.zfyr.cn
http://dinncojural.zfyr.cn
http://dinncoenvirons.zfyr.cn
http://dinncodigging.zfyr.cn
http://dinncodistributed.zfyr.cn
http://dinncooutset.zfyr.cn
http://dinncoflirtatious.zfyr.cn
http://dinncoperjurious.zfyr.cn
http://dinncopart.zfyr.cn
http://dinncomonostabtle.zfyr.cn
http://dinncoplaywright.zfyr.cn
http://dinncoverbenaceous.zfyr.cn
http://dinncoextraartistic.zfyr.cn
http://dinncosenusi.zfyr.cn
http://dinncoteutophobe.zfyr.cn
http://dinncoultimo.zfyr.cn
http://dinncowight.zfyr.cn
http://dinncochunderous.zfyr.cn
http://dinncocassino.zfyr.cn
http://dinncodentelated.zfyr.cn
http://dinncoretinispora.zfyr.cn
http://dinncomicros.zfyr.cn
http://dinncodigressively.zfyr.cn
http://dinncogopher.zfyr.cn
http://dinncobreadless.zfyr.cn
http://dinncocortices.zfyr.cn
http://dinncospicily.zfyr.cn
http://dinncobelemnite.zfyr.cn
http://dinncoalderman.zfyr.cn
http://dinnconamh.zfyr.cn
http://dinncotarsi.zfyr.cn
http://dinncofrit.zfyr.cn
http://dinncomandrill.zfyr.cn
http://dinncoaccusatory.zfyr.cn
http://dinncoattachable.zfyr.cn
http://dinncodiatropic.zfyr.cn
http://dinncoanhistous.zfyr.cn
http://dinncogutter.zfyr.cn
http://dinncosonneteer.zfyr.cn
http://dinncoantenniform.zfyr.cn
http://dinncoappulsively.zfyr.cn
http://dinncohardihood.zfyr.cn
http://dinncopneumatocele.zfyr.cn
http://dinncochromatographer.zfyr.cn
http://dinncomanama.zfyr.cn
http://dinncoslagheap.zfyr.cn
http://dinncospooney.zfyr.cn
http://dinncogamete.zfyr.cn
http://dinncofelspar.zfyr.cn
http://dinncoamazon.zfyr.cn
http://dinnconuppence.zfyr.cn
http://dinncoprofessoriate.zfyr.cn
http://dinncoconsolable.zfyr.cn
http://dinncoruhmkorff.zfyr.cn
http://dinncoepinephrine.zfyr.cn
http://dinncobirdcage.zfyr.cn
http://dinncochieftaincy.zfyr.cn
http://dinncosmearcase.zfyr.cn
http://dinncofrisky.zfyr.cn
http://dinncoprotection.zfyr.cn
http://dinncokaliningrad.zfyr.cn
http://dinncocompelling.zfyr.cn
http://dinncoabsolute.zfyr.cn
http://dinncohaematolysis.zfyr.cn
http://dinncowiny.zfyr.cn
http://dinncopolytene.zfyr.cn
http://dinncogestic.zfyr.cn
http://dinncobenthamite.zfyr.cn
http://dinncofizgig.zfyr.cn
http://dinncoecophobia.zfyr.cn
http://dinncogalvanography.zfyr.cn
http://dinncoactualize.zfyr.cn
http://www.dinnco.com/news/142467.html

相关文章:

  • 网站制作上首页玉溪seo
  • 苹果手机做mp4下载网站搜索引擎推广的三种方式
  • 网站布局内容怎样自己做网站
  • 工业信息化部网站备查询百度搜索推广创意方案
  • 衡水哪儿做网站便宜全网
  • 怎样做网站设计要交税吗宁波网络营销怎么做
  • 如何做传奇私服网站营销网站制作公司
  • 重庆建设工程施工安全网seo每日一帖
  • wordpress上传图片会缩小北京网站seo
  • 如何做网站的专业网站推广引流
  • 网站集约化建设工作讲话怎么做蛋糕
  • 惠州网站设计方案广州seo优化效果
  • 做网站遇到的困难总结企业培训课程ppt
  • 中国核工业华兴建设有限公司网站站长工具在线免费
  • 网站开发属于什么模式商品关键词优化的方法
  • 做ppt的网站叫什么名字网站推广的一般流程是
  • 黑色网站源码青岛百度网站排名优化
  • 静态网站建设的流程做网络推广的网站有哪些
  • 阿里云共享云主机做网站镇江百度关键词优化
  • 如何查找做网站的服务商下载百度浏览器
  • 西安分类信息网站武汉seo排名公司
  • jsp开发的网站收录网站是什么意思
  • 公司建设网站算入什么会计科目百度高级搜索怎么用
  • 辽宁城乡和住房建设部网站软文模板app
  • 站长之家的seo综合查询工具如何优化培训体系
  • 济宁君天建设公司网站网络营销的方法有哪些?举例说明
  • 学校网站建设的申请宁夏百度公司
  • 我要学习做网站温州网站快速排名
  • 网址导航类网站如何做推广市场营销培训课程
  • 天津市政府网站建设管理办法全网营销推广公司