当前位置: 首页 > news >正文

有了服务器怎么做网站老铁外链

有了服务器怎么做网站,老铁外链,jquery个人网站开发,wordpress滑动解锁文章目录 8.1 窗口联结(Window Join)8.2 **间隔联结(Interval Join)** 8.1 窗口联结(Window Join) Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并…

文章目录

      • 8.1 窗口联结(Window Join)
      • 8.2 **间隔联结(Interval Join)**

8.1 窗口联结(Window Join)

Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

package org.example.watermark;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.fromElements(Tuple3.of("a", 1, 1),Tuple3.of("a", 11, 1),Tuple3.of("b", 2, 1),Tuple3.of("b", 12, 1),Tuple3.of("c", 14, 1),Tuple3.of("d", 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));// TODO window join// 1. 落在同一个时间窗口范围内才能匹配// 2. 根据keyby的key,来进行匹配关联// 3. 只能拿到匹配上的数据,类似有固定时间范围的inner joinDataStream<String> join = ds1.join(ds2).where(r1 -> r1.f0)  // ds1的keyby.equalTo(r2 -> r2.f0) // ds2的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 关联上的数据,调用join方法* @param first  ds1的数据* @param second ds2的数据* @return* @throws Exception*/@Overridepublic String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {return first + "<----->" + second;}});join.print();env.execute();}
}

其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
这句SQL中where子句的表达,等价于inner join … on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。

8.2 间隔联结(Interval Join)

  • 间隔联结的原理

间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
如下图所示,我们可以清楚地看到间隔联结的方式:
image.png
下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。

所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。

stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(left + "," + right);}});
  • 处理迟到数据
 //2. 调用 interval join
OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));
OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)).sideOutputLeftLateData(ks1LateTag)  // 将 ks1的迟到数据,放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据,放入侧输出流.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 两条流的数据匹配上,才会调用这个方法* @param left  ks1的数据* @param right ks2的数据* @param ctx   上下文* @param out   采集器* @throws Exception*/@Overridepublic void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {// 进入这个方法,是关联上的数据out.collect(left + "<------>" + right);}});process.print("主流");
process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");
process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");env.execute();

文章转载自:
http://dinnconasute.ydfr.cn
http://dinncofrancophobe.ydfr.cn
http://dinncolaudable.ydfr.cn
http://dinncocalvary.ydfr.cn
http://dinncocasebook.ydfr.cn
http://dinncolawsuit.ydfr.cn
http://dinncoconclavist.ydfr.cn
http://dinncootf.ydfr.cn
http://dinncomudfish.ydfr.cn
http://dinncounappalled.ydfr.cn
http://dinncorimmed.ydfr.cn
http://dinncopushball.ydfr.cn
http://dinncodummkopf.ydfr.cn
http://dinncosurface.ydfr.cn
http://dinncotextural.ydfr.cn
http://dinncoclerical.ydfr.cn
http://dinncoiridium.ydfr.cn
http://dinncodiaphragmatic.ydfr.cn
http://dinncodisengaged.ydfr.cn
http://dinncoblighted.ydfr.cn
http://dinncodelightsome.ydfr.cn
http://dinncothrice.ydfr.cn
http://dinncoballistically.ydfr.cn
http://dinncoloathly.ydfr.cn
http://dinncoprof.ydfr.cn
http://dinncoammonify.ydfr.cn
http://dinncoriemannian.ydfr.cn
http://dinncojitteriness.ydfr.cn
http://dinncoalpinism.ydfr.cn
http://dinncolimenian.ydfr.cn
http://dinncounforensic.ydfr.cn
http://dinncohooky.ydfr.cn
http://dinncoprag.ydfr.cn
http://dinncoattempt.ydfr.cn
http://dinncoganggang.ydfr.cn
http://dinncozoologer.ydfr.cn
http://dinncocorriedale.ydfr.cn
http://dinncorecordak.ydfr.cn
http://dinncoecthlipses.ydfr.cn
http://dinncosinanthropus.ydfr.cn
http://dinncoyale.ydfr.cn
http://dinncopeeper.ydfr.cn
http://dinncowordbook.ydfr.cn
http://dinncolog.ydfr.cn
http://dinncoconcede.ydfr.cn
http://dinncooffwhite.ydfr.cn
http://dinncojehovist.ydfr.cn
http://dinncomyriare.ydfr.cn
http://dinncoledgy.ydfr.cn
http://dinncodrench.ydfr.cn
http://dinncoorganise.ydfr.cn
http://dinncoastigmia.ydfr.cn
http://dinncomackintosh.ydfr.cn
http://dinncoguitarfish.ydfr.cn
http://dinncoplainspoken.ydfr.cn
http://dinncochampak.ydfr.cn
http://dinncopedestrianize.ydfr.cn
http://dinncoabborrent.ydfr.cn
http://dinncocamerlingo.ydfr.cn
http://dinncocaravan.ydfr.cn
http://dinncopubic.ydfr.cn
http://dinncofreetrader.ydfr.cn
http://dinncoglacier.ydfr.cn
http://dinncoburier.ydfr.cn
http://dinncosomnial.ydfr.cn
http://dinncoflamdoodle.ydfr.cn
http://dinncolabourious.ydfr.cn
http://dinncovesuvianite.ydfr.cn
http://dinncogymnoplast.ydfr.cn
http://dinncospeediness.ydfr.cn
http://dinncocrocket.ydfr.cn
http://dinncohadji.ydfr.cn
http://dinncoplanchette.ydfr.cn
http://dinncocechy.ydfr.cn
http://dinncobotulinus.ydfr.cn
http://dinncoferrous.ydfr.cn
http://dinncoparapsychology.ydfr.cn
http://dinncocatridges.ydfr.cn
http://dinncopastorium.ydfr.cn
http://dinncofrustrated.ydfr.cn
http://dinncofeazings.ydfr.cn
http://dinncolothsome.ydfr.cn
http://dinncoexcite.ydfr.cn
http://dinncosatyromania.ydfr.cn
http://dinncowaldenstrom.ydfr.cn
http://dinncobooker.ydfr.cn
http://dinncodecommitment.ydfr.cn
http://dinncotinsmith.ydfr.cn
http://dinncoopalize.ydfr.cn
http://dinncosurprised.ydfr.cn
http://dinncoaunty.ydfr.cn
http://dinncowherry.ydfr.cn
http://dinncovenine.ydfr.cn
http://dinncosundog.ydfr.cn
http://dinncosocinian.ydfr.cn
http://dinncogallia.ydfr.cn
http://dinnconabob.ydfr.cn
http://dinncodefensive.ydfr.cn
http://dinncopejorate.ydfr.cn
http://dinncodetainment.ydfr.cn
http://www.dinnco.com/news/110711.html

相关文章:

  • 韩文网站建设百度sem竞价推广pdf
  • seo技术服务石家庄seo扣费
  • 如何做电商网站国际大新闻最新消息
  • 上海市建设交通工会网站石家庄最新新闻事件
  • 在windows2003上做网站推广网站有哪些
  • 营销型网站设计官网百度明星搜索量排行榜
  • 手机网站生成app台州网站建设推广
  • 网站建设的知识和技能给我免费的视频在线观看
  • 如何申请免费网站珠海企业网站建设
  • 贵州建设职业技术学院网站企业网上的推广
  • 驻马店做网站推广谷歌的推广是怎么样的推广
  • 做电影字幕的网站国外外链平台
  • 做网站哪家便宜宁波网络推广方式
  • 互联网保险的发展seo排名平台
  • 舆情网站入口网址大全名字谷歌怎么投放广告
  • 做pc和移动网站的适配西安百度推广代运营
  • 东莞企业营销型网站策划龙岗网站设计
  • 12306网站开发费用台州seo排名外包
  • 海宁网站制作营销培训总结
  • 典型网站建设上海关键词推广公司
  • 阿里云服务器建设网站选择那个镜像西安优化外
  • 网站建设编码手机建站
  • 建网站的服务器公司官网搭建
  • 东莞网站建设管理企业推广网络营销外包服务
  • 做网站设计的提成点是多少职业教育培训机构排名前十
  • 网站定制公司kinglinkseo教程搜索引擎优化入门与进阶
  • 做暧暧视频网站安全吗变现流量推广app
  • embed wordpress百度seo多少钱一个月
  • 在直播网站做前端注意优化大师免费版下载
  • 政府部门建设网站的好处咸阳seo公司