Flink Interval Join: A Deep Dive into the Flink 1.14 Source Code

Background

In a previous post we gave a brief introduction to Flink's multi-stream joining operators:

Java Flink (36): Flink multi-stream operators UNION, CONNECT, CoGroup and Join

In this post we dig into Flink's Interval Join through the Flink 1.14 source code.

An Interval Join is not a join between two windows, and it is well suited to joining out-of-order streams. Conceptually it starts from each element a of the left stream and joins it with the right-stream elements that fall into a time range relative to a (it is an inner join: only elements with the same key are matched).

As illustrated in the figure (omitted here): element 2 in the lower stream is joined with elements 0 and 1, which fall inside the configured range of the upper stream.
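In other words, for a left element and a right element with the same key, the pair is emitted when the right timestamp falls into [left timestamp + lower bound, left timestamp + upper bound]. A minimal plain-Java sketch of that condition (illustration only, not Flink code; the method and variable names are made up):

// Illustration of the interval-join match condition (not Flink source code).
// leftTs / rightTs are event timestamps in milliseconds; lowerBound / upperBound are the
// values handed to between(), e.g. -5000 and 0 in the example at the end of this post.
static boolean withinInterval(long leftTs, long rightTs, long lowerBound, long upperBound) {
    return rightTs >= leftTs + lowerBound && rightTs <= leftTs + upperBound;
}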

Source Code Walkthrough

Flink version: 1.14.4

Hold Ctrl and left-click process to jump into the source.

The process method here is defined in the IntervalJoined class inside KeyedStream.java.

It wraps the TypeInformation of the return type (see the previous post for an introduction to TypeInformation).

The returned outputType:

SingleOutputStreamOperator completes the join using the given user function, which is invoked for every joined pair of elements. This overload also allows explicit type information for the output type to be passed in.
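The body of this overload is roughly the following (paraphrased from KeyedStream.IntervalJoined in Flink 1.14; abbreviated, so the exact argument list may differ slightly):

// Paraphrased from KeyedStream.IntervalJoined#process (Flink 1.14); abbreviated, not verbatim.
public <OUT> SingleOutputStreamOperator<OUT> process(
        ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
        TypeInformation<OUT> outputType) {

    // clean the user function so it can be serialized and shipped to the cluster
    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUserFunction =
            left.getExecutionEnvironment().clean(processJoinFunction);

    // build the operator from the bounds and the serializers of both inputs
    // (the serializers are created from each side's TypeInformation)
    final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
            new IntervalJoinOperator<>(
                    lowerBound,
                    upperBound,
                    lowerBoundInclusive,
                    upperBoundInclusive,
                    left.getType().createSerializer(left.getExecutionConfig()),
                    right.getType().createSerializer(right.getExecutionConfig()),
                    cleanedUserFunction);

    // wire the two keyed streams together and apply the operator as a two-input transform
    return left.connect(right)
            .keyBy(keySelector1, keySelector2)
            .transform("Interval Join", outputType, operator);
}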

IntervalJoinOperator initialization:

The constructor checks that the left bound is not larger than the right bound (lowerBound <= upperBound).

It also receives the serializers for the left-stream and right-stream records (created from their TypeInformation).
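A simplified view of the constructor (paraphrased, not verbatim; the handling of exclusive bounds is reproduced from memory and may differ in detail):

// Paraphrased from the IntervalJoinOperator constructor (Flink 1.14); simplified.
public IntervalJoinOperator(
        long lowerBound,
        long upperBound,
        boolean lowerBoundInclusive,
        boolean upperBoundInclusive,
        TypeSerializer<T1> leftTypeSerializer,
        TypeSerializer<T2> rightTypeSerializer,
        ProcessJoinFunction<T1, T2, OUT> udf) {

    super(Preconditions.checkNotNull(udf));

    // the left (lower) bound must not be larger than the right (upper) bound
    Preconditions.checkArgument(lowerBound <= upperBound,
            "lowerBound <= upperBound must be fulfilled");

    // exclusive bounds are converted into inclusive ones by shifting them one millisecond inwards
    this.lowerBound = lowerBoundInclusive ? lowerBound : lowerBound + 1L;
    this.upperBound = upperBoundInclusive ? upperBound : upperBound - 1L;

    // serializers for the buffered left/right elements
    this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
    this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
}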

Now let's look at the other key parts of the IntervalJoinOperator implementation.

The open method registers the internal timer service that drives the cleanup timers.

The MapState buffers for the two streams are then initialized (in initializeState).
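Roughly, the two methods look like this (paraphrased from Flink 1.14; the constants and helper types are Flink's own, details trimmed):

// Paraphrased from IntervalJoinOperator#open / #initializeState (Flink 1.14); simplified.
@Override
public void open() throws Exception {
    super.open();
    collector = new TimestampedCollector<>(output);
    context = new ContextImpl(userFunction);
    // timer service used to register the cleanup timers for both buffers
    internalTimerService =
            getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    // one MapState per side: event timestamp -> list of buffered elements
    this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
            LEFT_BUFFER,
            LongSerializer.INSTANCE,
            new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))));
    this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
            RIGHT_BUFFER,
            LongSerializer.INSTANCE,
            new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))));
}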

processElement1 handles records from the left stream. Whenever a record arrives on the left stream it is added to the left buffer; the right buffer is then searched for possible join candidates, and every pair that falls within the user-defined bounds is passed to the ProcessJoinFunction.

The right stream is handled symmetrically by processElement2 (both delegate to a shared helper, shown in the condensed sketch after the step-by-step walkthrough below).

Let's step into the shared data processing function.

It first takes the record's value and extracts its event timestamp.

Records that are late with respect to the current watermark (timestamp < current watermark) are filtered out.

If the record passes that check, it is added to this side's map state.

The function then iterates over the other stream's state and collects every buffered element that satisfies the time bounds.

It also registers a cleanup timer at the current event timestamp plus the upper bound.

When that timer fires, the corresponding expired record (the timer timestamp minus the bound that was added when the timer was registered) is removed from the map state.
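Putting these steps together, the core of processElement1/processElement2, the shared helper and the timer callback look roughly like this (condensed from the Flink 1.14 IntervalJoinOperator; simplified and not verbatim, with generics, casts and error handling trimmed):

// Condensed from IntervalJoinOperator (Flink 1.14); simplified, not verbatim.
@Override
public void processElement1(StreamRecord<T1> record) throws Exception {
    // left element: buffer on the left, probe the right buffer with the configured bounds
    processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}

@Override
public void processElement2(StreamRecord<T2> record) throws Exception {
    // right element: buffer on the right, probe the left buffer with the bounds mirrored
    processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}

private <THIS, OTHER> void processElement(
        StreamRecord<THIS> record,
        MapState<Long, List<BufferEntry<THIS>>> ourBuffer,
        MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
        long relativeLowerBound,
        long relativeUpperBound,
        boolean isLeft) throws Exception {

    final THIS ourValue = record.getValue();
    final long ourTimestamp = record.getTimestamp();       // 1. value + event timestamp

    if (isLate(ourTimestamp)) {                             // 2. drop records behind the watermark
        return;                                             //    (isLate: timestamp < current watermark)
    }

    addToBuffer(ourBuffer, ourValue, ourTimestamp);         // 3. add to this side's MapState

    // 4. probe the other side's buffer and emit every pair within the bounds;
    //    collect(...) ultimately invokes the user's ProcessJoinFunction
    for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
        final long timestamp = bucket.getKey();
        if (timestamp < ourTimestamp + relativeLowerBound
                || timestamp > ourTimestamp + relativeUpperBound) {
            continue;
        }
        for (BufferEntry<OTHER> entry : bucket.getValue()) {
            if (isLeft) {
                collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
            } else {
                collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
            }
        }
    }

    // 5. register a cleanup timer at (event timestamp + this side's upper bound)
    long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
    if (isLeft) {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
    } else {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
    }
}

@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {
    long timerTimestamp = timer.getTimestamp();
    switch (timer.getNamespace()) {
        case CLEANUP_NAMESPACE_LEFT: {
            // the expired left record sits at (timer timestamp - upper bound)
            long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
            leftBuffer.remove(timestamp);
            break;
        }
        case CLEANUP_NAMESPACE_RIGHT: {
            // the expired right record sits at (timer timestamp + lower bound)
            long timestamp = (lowerBound >= 0L) ? timerTimestamp : timerTimestamp + lowerBound;
            rightBuffer.remove(timestamp);
            break;
        }
    }
}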

A Simple Example

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>FlinkCode</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <jdk.version>1.8</jdk.version>
        <jar.name>ubs-data-converter</jar.name>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Flink version -->
        <flink.version>1.14.4</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.10</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.9.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>${jar.name}</finalName>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.glassfish.jersey.core:jersey-common</exclude>
                                </excludes>
                            </artifactSet>
                            <relocations>
                                <relocation>
                                    <pattern>com.google.common</pattern>
                                    <shadedPattern>com.shade.google.common</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.kafka</pattern>
                                    <shadedPattern>org.shade.apache.kafka</shadedPattern>
                                </relocation>
                            </relocations>
                            <filters>
                                <filter>
                                    <artifact>*</artifact>
                                    <includes>
                                        <include>org/apache/htrace/**</include>
                                        <include>org/apache/avro/**</include>
                                        <include>org/apache/flink/streaming/**</include>
                                        <include>org/apache/flink/connector/**</include>
                                        <include>org/apache/kafka/**</include>
                                        <include>org/apache/hive/**</include>
                                        <include>org/apache/hadoop/hive/**</include>
                                        <include>org/apache/curator/**</include>
                                        <include>org/apache/zookeeper/**</include>
                                        <include>org/apache/jute/**</include>
                                        <include>org/apache/thrift/**</include>
                                        <include>org/apache/http/**</include>
                                        <include>org/I0Itec/**</include>
                                        <include>jline/**</include>
                                        <include>com/yammer/**</include>
                                        <include>kafka/**</include>
                                        <include>org/apache/hadoop/hbase/**</include>
                                        <include>com/alibaba/fastjson/**</include>
                                        <include>org/elasticsearch/action/**</include>
                                        <include>io/confluent/**</include>
                                        <include>com/fasterxml/**</include>
                                        <include>org/elasticsearch/**</include>
                                        <include>hbase-default.xml</include>
                                        <include>hbase-site.xml</include>
                                    </includes>
                                </filter>
                                <filter>
                                    <artifact>org.apache.hadoop.hive.*:*</artifact>
                                    <excludes>
                                        <exclude></exclude>
                                        <exclude></exclude>
                                        <exclude></exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

user bean

package ubs.app.intervaljoin.bean;

import lombok.*;

@Data
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class User {
    Integer id;
    Long t;
}

 order bean 

package ubs.app.intervaljoin.bean;

import lombok.*;

@Data
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class Order {
    Integer id;
    Long price;
    Long time;
}

main

package ubs.app.intervaljoin;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import ubs.app.intervaljoin.bean.Order;
import ubs.app.intervaljoin.bean.User;
// OrderSource / UserSource are custom test SourceFunctions (not shown in this post)
import ubs.app.intervaljoin.source.OrderSource;
import ubs.app.intervaljoin.source.UserSource;

import java.time.Duration;

public class IntervalJoinApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // watermark strategy for the user stream
        WatermarkStrategy<User> userWatermarkStrategy = WatermarkStrategy
                .<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner(new SerializableTimestampAssigner<User>() {
                    @Override
                    public long extractTimestamp(User element, long recordTimestamp) {
                        return element.getT();
                    }
                });
        DataStream<User> userDataStreamSource = env
                .addSource(new UserSource())
                .assignTimestampsAndWatermarks(userWatermarkStrategy);

        // watermark strategy for the order stream
        WatermarkStrategy<Order> orderWatermarkStrategy = WatermarkStrategy
                .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner(new SerializableTimestampAssigner<Order>() {
                    @Override
                    public long extractTimestamp(Order element, long recordTimestamp) {
                        return element.getTime();
                    }
                });
        DataStream<Order> orderDataStreamSource = env
                .addSource(new OrderSource())
                .assignTimestampsAndWatermarks(orderWatermarkStrategy);

        env.setParallelism(1);

        SingleOutputStreamOperator<String> process = userDataStreamSource
                .keyBy(o -> o.getId())
                .intervalJoin(orderDataStreamSource.keyBy(o -> o.getId()))
                .between(Time.seconds(-5), Time.seconds(0))
                .process(new ProcessJoinFunction<User, Order, String>() {
                    @Override
                    public void processElement(User left, Order right,
                                               ProcessJoinFunction<User, Order, String>.Context ctx,
                                               Collector<String> out) throws Exception {
                        Integer lid = left.getId();
                        Long lt = left.getT();
                        Integer rid = right.getId();
                        long rt = right.getTime();
                        out.collect(String.format(
                                "left=%s leftTime=%s right=%s rightTime=%s joined, gap=%s",
                                lid, lt / 1000, rid, rt / 1000, rt / 1000 - lt / 1000));
                    }
                });

        process.print();
        env.execute();
    }
}
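Note the bounds in between(Time.seconds(-5), Time.seconds(0)): for every User element, the join considers the Order elements with the same id whose event time lies within the five seconds up to (and including) the user's event time, which is exactly the lower/upper bound condition sketched at the beginning of this post.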

