当前位置: 首页 > news >正文

无锡优化网站业务手机清理优化软件排名

无锡优化网站业务,手机清理优化软件排名,免费域名注册万网,电子商务o2o是什么意思flink的定时器都是基于事件时间(event time)或事件处理时间(processing time)的变化来触发响应的。对一部分新手玩家来说,可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解,防止下面懵逼。…

flink的定时器都是基于事件时间(event time)或事件处理时间(processing time)的变化来触发响应的。对一部分新手玩家来说,可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解,防止下面懵逼。简单来说事件时间就相当于人出生的时间,一般数据生成的时候也会有创建时间。而事件处理时间则相当于人具体做某件事的时间,一条数据可能是2023年生成的,但是到2024年才被处理,这个2024年便被称为这个事件的处理时间。

一、事件时间定时器(event time),这是基于事件时间来触发的,这里面有一个小坑。当第一个事件到的时候创建一个定时器10秒后触发。对我们大部分人来说我既然已经创建了这个定时器,那么10秒后,他就会自动触发。但事实上他10秒后如果没有事件到来他并不会触发。大概意思就是前一个事件创建的定时器需要后一个事件的时间来触发。下面是事件时间定时器的一种实现方式。

import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;public class EventTime {public static void main(String[] args) throws Exception {SourceTemperature mySourceTemperature = new SourceTemperature();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);WatermarkStrategy<Temperature> twsDS= WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);KeyedStream<Temperature, String> keyByDS = tSSODS.keyBy(temperature -> temperature.getDay());SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {ListState<Temperature> temperatureListState;ValueState<Temperature> temperatureState;ValueState<Integer> size;ValueState<Long> temperature;@Overridepublic void open(OpenContext openContext) throws Exception {ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);temperatureListState = getRuntimeContext().getListState(listStateDescriptor);temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));}@Overridepublic void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {Temperature value1 = temperatureState.value();//System.out.println(ctx.timestamp());if(value1 == null){temperatureState.update(value);temperatureListState.add(value);size.update(1);//System.out.printf("当前事件处理:"+DateFormat.getDateTime(ctx.timestamp()));//System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));temperature.update(value.getTimestamp());ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);}else{if(value1.getTemperature() < value.getTemperature()){temperatureState.update(value);temperatureListState.add(value);size.update(size.value()+1);//System.out.println(size.value());if(size.value()>= 3){System.out.printf("警告警告:");Iterator<Temperature> iterator = temperatureListState.get().iterator();while(iterator.hasNext()){out.collect(iterator.next());}temperatureListState.clear();temperatureState.clear();size.clear();ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);}}else{System.out.println("温度降低了");temperatureState.update(value);temperatureListState.clear();temperatureListState.add(value);size.update(1);ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);temperature.update(value.getTimestamp());ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);}}}@Overridepublic void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));temperatureListState.clear();temperatureState.clear();size.clear();if(temperature.value() != null)ctx.timerService().deleteEventTimeTimer(temperature.value() + 10*1000);}});process.print("当前警告温度为:");env.execute();}
}//自己定义数据源class SourceTemperature extends RichSourceFunction<Temperature> {@Overridepublic void run(SourceContext<Temperature> ctx) throws Exception {Scanner scanner = new Scanner(System.in);while (true) {Temperature temperature = new Temperature();System.out.print("请输入温度: ");//double temp = Math.random()*40;double temp = scanner.nextDouble();//System.out.println(temp);temperature.setTemperature(temp);temperature.setTimestamp(new Date().getTime());ctx.collect(temperature);//Thread.sleep(1000);}}@Overridepublic void cancel() {}
}//自定义实体类
class Temperature1 {public Temperature1(double temperature, long timestamp) {this.temperature = temperature;this.timestamp = timestamp;}public Temperature1(){};//温度private double temperature;//时间private long timestamp;//idprivate String day = "2024-12-24";public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature = temperature;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public String getDay() {return day;}public void setDay(String day) {this.day = day;}@Overridepublic String toString() {return "Temperature1{" +"temperature=" + temperature +", timestamp=" + timestamp +", day='" + day + '\'' +'}';}
}

下面我们做一个测试,来验证一下这个解释:前一个事件创建的定时器需要后一个事件的时间来触发。他们的时间间隔超过了10秒钟,但是时间并没有触发,而是下一个事件到的时候才触发的。

二、事件处理时间,事件处理时间触发有系统时间有关

package com.xcj;
import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;public class ProcessTime {public static void main(String[] args) throws Exception {SourceTemperature mySourceTemperature = new SourceTemperature();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);
//        WatermarkStrategy<Temperature> twsDS
//                = WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0))
//                .withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());
//
//        SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);KeyedStream<Temperature, String> keyByDS = tDSSource.keyBy(temperature -> temperature.getDay());SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {ListState<Temperature> temperatureListState;ValueState<Temperature> temperatureState;ValueState<Integer> size;ValueState<Long> temperature;@Overridepublic void open(OpenContext openContext) throws Exception {ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);temperatureListState = getRuntimeContext().getListState(listStateDescriptor);temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));}@Overridepublic void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {Temperature value1 = temperatureState.value();//System.out.println(ctx.timestamp());System.out.printf("当前事件时间:"+DateFormat.getDateTime(value.getTimestamp()));System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));if(value1 == null){temperatureState.update(value);temperatureListState.add(value);size.update(1);temperature.update(ctx.timerService().currentProcessingTime());ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);}else{if(value1.getTemperature() < value.getTemperature()){temperatureState.update(value);temperatureListState.add(value);size.update(size.value()+1);//System.out.println(size.value());if(size.value()>= 3){System.out.printf("警告警告:");Iterator<Temperature> iterator = temperatureListState.get().iterator();while(iterator.hasNext()){out.collect(iterator.next());}temperatureListState.clear();temperatureState.clear();size.clear();ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);}}else{System.out.println("温度降低了");temperatureState.update(value);temperatureListState.clear();temperatureListState.add(value);size.update(1);ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);temperature.update(value.getTimestamp());ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);}}}@Overridepublic void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));temperatureListState.clear();temperatureState.clear();size.clear();if(temperature.value() != null)ctx.timerService().deleteProcessingTimeTimer(temperature.value() + 10*1000);}});process.print("当前警告温度为:");env.execute();}
}//自己定义数据源
class SourceTemperature extends RichSourceFunction<Temperature> {@Overridepublic void run(SourceContext<Temperature> ctx) throws Exception {Scanner scanner = new Scanner(System.in);while (true) {Temperature temperature = new Temperature();System.out.print("请输入温度: ");//double temp = Math.random()*40;double temp = scanner.nextDouble();//System.out.println(temp);temperature.setTemperature(temp);temperature.setTimestamp(new Date().getTime());ctx.collect(temperature);//Thread.sleep(1000);}}@Overridepublic void cancel() {}
}//自定义实体类
class Temperature1 {public Temperature1(double temperature, long timestamp) {this.temperature = temperature;this.timestamp = timestamp;}public Temperature1(){};//温度private double temperature;//时间private long timestamp;//idprivate String day = "2024-12-24";public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature = temperature;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public String getDay() {return day;}public void setDay(String day) {this.day = day;}@Overridepublic String toString() {return "Temperature1{" +"temperature=" + temperature +", timestamp=" + timestamp +", day='" + day + '\'' +'}';}
}

事件处理时间是不需要下一个事件触发的

三、总结

事件时间(event time) 与事件处理时间(process time)定时器整体代码其实差不多,主要是在注册定时器的时候选择的方法

//事件时间
ctx.timerService().registerEventTimeTimer(value.getTimestamp());
//事件处理事件            
ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);

和不同定时器的逻辑。注意:事件时间定时器是需要下一个事件来触发上一个事件的定时任务,但是事件处理时间定时器是不需要下一个事件来触发的,他是根据注册时间和系统时间的差值来触发的。

上面我把注册时间改为了过去很久的时间,来一个就触发一次定时任务,因为注册时间与当前系统时间相差>10秒,所以会直接触发。


文章转载自:
http://dinncoplurally.tqpr.cn
http://dinncocrupper.tqpr.cn
http://dinncoputtyblower.tqpr.cn
http://dinncocoenenchyma.tqpr.cn
http://dinncopomaceous.tqpr.cn
http://dinncoisro.tqpr.cn
http://dinncolabber.tqpr.cn
http://dinncoshoo.tqpr.cn
http://dinncoacquittance.tqpr.cn
http://dinncothrasonical.tqpr.cn
http://dinncodomical.tqpr.cn
http://dinncohaymaker.tqpr.cn
http://dinncopunishable.tqpr.cn
http://dinncosynovial.tqpr.cn
http://dinncodecretal.tqpr.cn
http://dinncodebonaire.tqpr.cn
http://dinncoacouphone.tqpr.cn
http://dinncomephitis.tqpr.cn
http://dinncosynantherous.tqpr.cn
http://dinncocytogenesis.tqpr.cn
http://dinncomaynard.tqpr.cn
http://dinncoranking.tqpr.cn
http://dinncoverbalize.tqpr.cn
http://dinncofludrocortisone.tqpr.cn
http://dinncoatmological.tqpr.cn
http://dinncoskiver.tqpr.cn
http://dinncoethnohistorian.tqpr.cn
http://dinncohypersurface.tqpr.cn
http://dinncoruss.tqpr.cn
http://dinncofiendish.tqpr.cn
http://dinncoetherialize.tqpr.cn
http://dinncopinkwash.tqpr.cn
http://dinncoforedeck.tqpr.cn
http://dinncopowdery.tqpr.cn
http://dinncoflavonol.tqpr.cn
http://dinncospaceflight.tqpr.cn
http://dinncopnr.tqpr.cn
http://dinncosourball.tqpr.cn
http://dinncozoogamy.tqpr.cn
http://dinncospellbinder.tqpr.cn
http://dinncowbo.tqpr.cn
http://dinncomorphiomania.tqpr.cn
http://dinncozolaism.tqpr.cn
http://dinncojeopardy.tqpr.cn
http://dinncojockstrap.tqpr.cn
http://dinncocostae.tqpr.cn
http://dinncogutta.tqpr.cn
http://dinncocommitteeman.tqpr.cn
http://dinncoacrotism.tqpr.cn
http://dinncounisist.tqpr.cn
http://dinncoagglutination.tqpr.cn
http://dinncotextual.tqpr.cn
http://dinncowogland.tqpr.cn
http://dinncoplaintiff.tqpr.cn
http://dinncomantua.tqpr.cn
http://dinncovaulting.tqpr.cn
http://dinnconarcotic.tqpr.cn
http://dinncohypokinesis.tqpr.cn
http://dinncovictoriousness.tqpr.cn
http://dinncotanniferous.tqpr.cn
http://dinncodecapacitation.tqpr.cn
http://dinncobullionist.tqpr.cn
http://dinncoparve.tqpr.cn
http://dinncotritely.tqpr.cn
http://dinncodeuton.tqpr.cn
http://dinncomooltan.tqpr.cn
http://dinncocompact.tqpr.cn
http://dinncowiredancer.tqpr.cn
http://dinncoroydon.tqpr.cn
http://dinncohypnus.tqpr.cn
http://dinncocouldst.tqpr.cn
http://dinncobouilli.tqpr.cn
http://dinncopannier.tqpr.cn
http://dinncoswashbuckling.tqpr.cn
http://dinncohaciendado.tqpr.cn
http://dinncoaperitif.tqpr.cn
http://dinncowherever.tqpr.cn
http://dinncoanthozoan.tqpr.cn
http://dinncoottava.tqpr.cn
http://dinncomultifoil.tqpr.cn
http://dinncononinvolvement.tqpr.cn
http://dinncopeatland.tqpr.cn
http://dinncoecumenopolis.tqpr.cn
http://dinncojerreed.tqpr.cn
http://dinncoforeleg.tqpr.cn
http://dinncotexel.tqpr.cn
http://dinncosphaerosome.tqpr.cn
http://dinncoeclectically.tqpr.cn
http://dinncounhidden.tqpr.cn
http://dinncoblanch.tqpr.cn
http://dinncolillian.tqpr.cn
http://dinncokafue.tqpr.cn
http://dinncosandalwood.tqpr.cn
http://dinncoclomb.tqpr.cn
http://dinncolutrine.tqpr.cn
http://dinnconotorious.tqpr.cn
http://dinncosambal.tqpr.cn
http://dinncotelepsychic.tqpr.cn
http://dinncouninterrupted.tqpr.cn
http://dinncogalvanism.tqpr.cn
http://www.dinnco.com/news/93515.html

相关文章:

  • 响水做网站的价格搜索最多的关键词的排名
  • wordpress v2exseo兼职外包
  • 自己做APP需要网站吗排名优化软件
  • 国外做名片的网站磁力宅
  • 网站后台建设百度开户代理公司
  • 做网页要钱吗seo是一种利用搜索引擎的
  • 个人网站创意免费大数据查询平台
  • 怎么样做网站爬虫百度推广代理商名单
  • 成品网站1688入口苹果石家庄今天最新新闻头条
  • 企业网站模板 优帮云推广链接点击器安卓版
  • 网站空间空间上海关键词排名提升
  • 网站开发合同付款方式市场推广方案模板
  • 广州智能模板建站媒体资源网官网
  • 做分类信息网站赚钱吗新乡百度关键词优化外包
  • 网站的具体内容企业网站优化关键词
  • 建设电商网站的总结百度指数可以查询多长时间的
  • 贵阳模板建站定制网站seo优化怎么做
  • 聊城网站建设推广广告最多的网站
  • 房屋装修流程步骤seo网站课程
  • 网站二级域名 权重 卢松松百度推广热线电话
  • wordpress文件下载插件seo站长网
  • 沈阳建设局网站首页seo广告平台
  • win7做网站服务器信息流广告案例
  • 做僾免费观看网站百度广告联盟赚广告费
  • 做网站内容图片多大武汉网站搜索引擎优化
  • 手机做网站用什么营销策划案ppt优秀案例
  • 上海网站建设推荐百度官网网站登录
  • 做网站作品是静态内容营销策略有哪些
  • wap网站制作需要多少钱设计网站排行榜前十名
  • 网店数据分析seo优化培训多少钱