当前位置: 首页 > news >正文

网站建设营销的技巧360站长

网站建设营销的技巧,360站长,做赌钱网站,网站排名seo通过DemoApp学习一下&#xff0c;CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。 1. Pattern的定义 Pattern<Tuple3<String, Long, String>,?> pattern Pattern.<Tuple3<String, Long, String>>begin("begin").where(new…

通过DemoApp学习一下,CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。

1. Pattern的定义

Pattern<Tuple3<String, Long, String>,?> pattern = Pattern.<Tuple3<String, Long, String>>begin("begin").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("success");}}).followedByAny("middle").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("fail");}}).followedBy("end").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("end");}});

 在执行中,我们可以看到pattern的几个属性,进入Pattern类中查看。

public class Pattern<T, F extends T> {/** Name of the pattern. */private final String name;/** Previous pattern. */private final Pattern<T, ? extends T> previous;/** The condition an event has to satisfy to be considered a matched. */private IterativeCondition<F> condition;/** Window length in which the pattern match has to occur. */private final Map<WithinType, Time> windowTimes = new HashMap<>();/*** A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}.*/private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);/** The condition an event has to satisfy to stop collecting events into looping state. */private IterativeCondition<F> untilCondition;/** Applicable to a {@code times} pattern, and holds the number of times it has to appear. */private Times times;private final AfterMatchSkipStrategy afterMatchSkipStrategy;
}

可以看到每一个Pattern都会存在以下属性:

  • Name:Pattern的Name
  • previous:之前的Pattern
  • condition:Pattern的匹配逻辑
  • windowTimes:限制窗口的时长
  • Quantifier:Pattern的属性,包括配置Pattern的模式可以发生的循环次数,或者这个模式是贪婪的还是可选的。
    • /*** A quantifier describing the Pattern. There are three main groups of {@link Quantifier}.** <ol>*   <li>Single*   <li>Looping*   <li>Times* </ol>** <p>Each {@link Pattern} can be optional and have a {@link ConsumingStrategy}. Looping and Times* also hava an additional inner consuming strategy that is applied between accepted events in the* pattern.*/
      public class Quantifier {private final EnumSet<QuantifierProperty> properties;private final ConsumingStrategy consumingStrategy;private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT;
      }
  • untilCondition:Pattern的循环匹配的结束条件
  • times:连续匹配次数
  • afterMatchSkipStrategy:匹配后的跳过策略

2.PatternStream的构建

        对Pattern定义完成,会通过PatternStreamBuilder,将1中定义好的Pattern应用到输入流中,返回对应的PatternStream。

    static <IN> PatternStreamBuilder<IN> forStreamAndPattern(final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {return new PatternStreamBuilder<>(inputStream, pattern, TimeBehaviour.EventTime, null, null);}PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));}

继续执行代码,进入Select()。

    public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction,final TypeInformation<R> outTypeInfo) {final PatternProcessFunction<T, R> processFunction =fromSelect(builder.clean(patternSelectFunction)).build();return process(processFunction, outTypeInfo);}

进入process可以看到PatternStream.select会调用builder.build函数。

    public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction,final TypeInformation<R> outTypeInfo) {return builder.build(outTypeInfo, builder.clean(patternProcessFunction));}

在build函数中会完成NFAFactory的定义,随后构建CepOperator。inputstream随之运行CepOperator即pattern定义的处理逻辑,并返回结果流PatternStream。

    <OUT, K> SingleOutputStreamOperator<OUT> build(final TypeInformation<OUT> outTypeInfo,final PatternProcessFunction<IN, OUT> processFunction) {checkNotNull(outTypeInfo);checkNotNull(processFunction);final TypeSerializer<IN> inputSerializer =inputStream.getType().createSerializer(inputStream.getExecutionConfig());final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;final NFACompiler.NFAFactory<IN> nfaFactory =NFACompiler.compileFactory(pattern, timeoutHandling);CepOperator<IN, K, OUT> operator = new CepOperator<>(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);final SingleOutputStreamOperator<OUT> patternStream;if (inputStream instanceof KeyedStream) {KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);} else {KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();patternStream =inputStream.keyBy(keySelector).transform("GlobalCepOperator", outTypeInfo, operator).forceNonParallel();}return patternStream;}

3.CepOperator的执行

        初始化。

    @Overridepublic void open() throws Exception {super.open();timerService =getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);nfa = nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context = new ContextFunctionImpl();collector = new TimestampedCollector<>(output);cepTimerService = new TimerServiceImpl();// metricsthis.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);}

 可以看到,nfaFactory.createNFA();会解析pattern组合,并为每一个pattern创建一个state。

CepOperator会在processElement中处理流中的每条数据。

    @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {if (isProcessingTime) {if (comparator == null) {// there can be no out of order elements in processing timeNFAState nfaState = getNFAState();long timestamp = getProcessingTimeService().getCurrentProcessingTime();advanceTime(nfaState, timestamp);processEvent(nfaState, element.getValue(), timestamp);updateNFA(nfaState);} else {long currentTime = timerService.currentProcessingTime();bufferEvent(element.getValue(), currentTime);}} else {long timestamp = element.getTimestamp();IN value = element.getValue();// In event-time processing we assume correctness of the watermark.// Events with timestamp smaller than or equal with the last seen watermark are// considered late.// Late events are put in a dedicated side output, if the user has specified one.if (timestamp > timerService.currentWatermark()) {// we have an event with a valid timestamp, so// we buffer it until we receive the proper watermark.bufferEvent(value, timestamp);} else if (lateDataOutputTag != null) {output.collect(lateDataOutputTag, element);} else {numLateRecordsDropped.inc();}}}

        可以看到,如果使用的是处理时间,需要先对数据根据当前处理时间将乱序的数据做一次处理,保证数据的有序。

        如果使用的事件时间,如果事件时间戳小于等于watermark会被认为是迟到数据。

        正常数据会先被缓存起来,等待处理。

    private void bufferEvent(IN event, long currentTime) throws Exception {List<IN> elementsForTimestamp = elementQueueState.get(currentTime);if (elementsForTimestamp == null) {elementsForTimestamp = new ArrayList<>();registerTimer(currentTime);}elementsForTimestamp.add(event);elementQueueState.put(currentTime, elementsForTimestamp);}

       elementQueueState 会以时间戳为key保存对应的数据。在onEventTime()函数中通过processEvent中处理缓存的匹配数据。

    @Overridepublic void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {// 1) get the queue of pending elements for the key and the corresponding NFA,// 2) process the pending elements in event time order and custom comparator if exists//		by feeding them in the NFA// 3) advance the time to the current watermark, so that expired patterns are discarded.// 4) update the stored state for the key, by only storing the new NFA and MapState iff they//		have state to be used later.// 5) update the last seen watermark.// STEP 1PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();NFAState nfaState = getNFAState();// STEP 2while (!sortedTimestamps.isEmpty()&& sortedTimestamps.peek() <= timerService.currentWatermark()) {long timestamp = sortedTimestamps.poll();advanceTime(nfaState, timestamp);// 对事件按时间进行排序try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {elements.forEachOrdered(event -> {try {processEvent(nfaState, event, timestamp);} catch (Exception e) {throw new RuntimeException(e);}});}elementQueueState.remove(timestamp);}// STEP 3advanceTime(nfaState, timerService.currentWatermark());// STEP 4updateNFA(nfaState);}
   private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {Collection<Map<String, List<IN>>> patterns =nfa.process(sharedBufferAccessor,nfaState,event,timestamp,afterMatchSkipStrategy,cepTimerService);if (nfa.getWindowTime() > 0 && nfaState.isNewStartPartialMatch()) {registerTimer(timestamp + nfa.getWindowTime());}processMatchedSequences(patterns, timestamp);}}private void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {PatternProcessFunction<IN, OUT> function = getUserFunction();setTimestamp(timestamp);for (Map<String, List<IN>> matchingSequence : matchingSequences) {function.processMatch(matchingSequence, context, collector);}}

        nfa.process()最后会调用doProcess进行处理。

        computer

         可以看到每来一个新的Event,就会从上一个数据停留的状态开始遍历。判断新事件Event匹配之前已经匹配过的哪个状态,并为其版本号+1

前5条数据是success->fail->fail->success->fail,我们可以观察到partialMatches的变化如下:

success事件到达,因为之前没有事件,所以当前停留的状态是 begin。success匹配,预期会停留在middle状态

 fail事件到达,可以看到上面的success事件停留在了middle状态,并且begin的版本+1.

判断这个fail事件可以匹配后续的patern,状态从middle转移到end。存在newComputationStates中。最终更新到partialMatch中。

 第二个fail事件到达,只能匹配之前的middle状态,所以partialMatch中会新增一个end状态,并且middle的版本+1;

 

 最后如果状态到达终态,输出到potentialMatches中存储。

打印结果,可以看到每个事件都会试图去匹配所有的历史状态,nfa会存储所有匹配上的历史状态,直到到达终态。 


文章转载自:
http://dinncovariational.tpps.cn
http://dinncophantasy.tpps.cn
http://dinncometonymy.tpps.cn
http://dinncobombinate.tpps.cn
http://dinncoyarovize.tpps.cn
http://dinncoadamic.tpps.cn
http://dinncopeptize.tpps.cn
http://dinncoariba.tpps.cn
http://dinncoembryotrophy.tpps.cn
http://dinncovertebrae.tpps.cn
http://dinncoauriculoventricular.tpps.cn
http://dinncoathenaeum.tpps.cn
http://dinncomeursault.tpps.cn
http://dinncoallemande.tpps.cn
http://dinncoglassine.tpps.cn
http://dinncomyotomy.tpps.cn
http://dinncodefenseless.tpps.cn
http://dinncojackaroo.tpps.cn
http://dinncounhang.tpps.cn
http://dinncoces.tpps.cn
http://dinncotrowelman.tpps.cn
http://dinncopav.tpps.cn
http://dinncofungi.tpps.cn
http://dinncoreformate.tpps.cn
http://dinncoretort.tpps.cn
http://dinncodinoflagellate.tpps.cn
http://dinncounreformed.tpps.cn
http://dinncoinelastic.tpps.cn
http://dinnconeurofibrilar.tpps.cn
http://dinncohemicyclium.tpps.cn
http://dinncoleucocytosis.tpps.cn
http://dinncocacciatora.tpps.cn
http://dinncoimpotent.tpps.cn
http://dinncojaunce.tpps.cn
http://dinncooriana.tpps.cn
http://dinncosaxboard.tpps.cn
http://dinncohealthfully.tpps.cn
http://dinncoorientalise.tpps.cn
http://dinncodefensible.tpps.cn
http://dinncoalphonse.tpps.cn
http://dinncofermion.tpps.cn
http://dinncounplug.tpps.cn
http://dinncosomnambulism.tpps.cn
http://dinncosweetsop.tpps.cn
http://dinncobelt.tpps.cn
http://dinncoley.tpps.cn
http://dinncoreforestation.tpps.cn
http://dinncohandguard.tpps.cn
http://dinncopharmacolite.tpps.cn
http://dinncosintra.tpps.cn
http://dinncounhidden.tpps.cn
http://dinncocosta.tpps.cn
http://dinncoagnatha.tpps.cn
http://dinncoperipatus.tpps.cn
http://dinncoverticillate.tpps.cn
http://dinncosulfatize.tpps.cn
http://dinncodispirit.tpps.cn
http://dinncocoeliac.tpps.cn
http://dinncojekyll.tpps.cn
http://dinncobiparasitic.tpps.cn
http://dinncofluency.tpps.cn
http://dinncoamericanism.tpps.cn
http://dinncoincise.tpps.cn
http://dinncohoary.tpps.cn
http://dinncopiperine.tpps.cn
http://dinncoosteocranium.tpps.cn
http://dinncoabsorbable.tpps.cn
http://dinncocss.tpps.cn
http://dinncoadjudge.tpps.cn
http://dinncometalinguistics.tpps.cn
http://dinncotauranga.tpps.cn
http://dinncopomorze.tpps.cn
http://dinncomagnetooptics.tpps.cn
http://dinncohermitian.tpps.cn
http://dinncokame.tpps.cn
http://dinncoamaranthine.tpps.cn
http://dinncooverburden.tpps.cn
http://dinncoskiplane.tpps.cn
http://dinnconatheless.tpps.cn
http://dinncogarboil.tpps.cn
http://dinncopowerman.tpps.cn
http://dinncocursive.tpps.cn
http://dinncomarianne.tpps.cn
http://dinncosatelloid.tpps.cn
http://dinncoaperiodicity.tpps.cn
http://dinncoenrico.tpps.cn
http://dinncozymoscope.tpps.cn
http://dinncoamble.tpps.cn
http://dinncocentripetalism.tpps.cn
http://dinncoprotochordate.tpps.cn
http://dinncoquickness.tpps.cn
http://dinncodrosophila.tpps.cn
http://dinncobeton.tpps.cn
http://dinncotaffeta.tpps.cn
http://dinncoaudiogram.tpps.cn
http://dinncobuprestid.tpps.cn
http://dinncoseance.tpps.cn
http://dinncoanencephalic.tpps.cn
http://dinncosalutary.tpps.cn
http://dinncohagseed.tpps.cn
http://www.dinnco.com/news/117754.html

相关文章:

  • 科技公司网站建设天津百度推广网络科技公司
  • 做360手机网站快速排如何创建自己的网址
  • 网站开发团队需要哪些人百度推广怎么提高关键词排名
  • 网站经常被攻击正规专业短期培训学校
  • 电子产品网站开发背景seo外包是什么
  • 网站开发者工具post广东深圳疫情最新消息今天
  • php mysql dreamweaver网站建设微信指数查询入口
  • 网站建设哪家技术好临沂百度seo
  • 郑州百度建网站搜索引擎seo优化怎么做
  • 网站建设 拖欠尾款如何自己制作网站
  • 企业网站怎么做的高大上小游戏推广接单平台
  • b2c电子商务网站的特点电商网站如何避免客户信息泄露
  • 欢迎访问中国建设银行网站独立站seo外链平台
  • 大型门户网站建设效果怎么样网站建设及网站推广
  • 企业邮箱查询网站关键词app下载
  • 8小8x人成免费观看网页高中同步测控优化设计答案
  • 男人互做网站怎么seo快速排名
  • 中医网站源码如何创建自己的网站平台
  • 用html5做的商务网站兰州seo新站优化招商
  • 网站开发佛山武汉seo工作室
  • 想买个服务器做网站凡科网小程序
  • 做网站的样版网站快速排名优化报价
  • 企业站模板明细站长工具视频
  • 如何做网站用户活跃度推广资源网
  • 做网站项目需求分析是什么企业培训课程设置
  • 内部链接网站大全怎么做seo信息优化
  • h5在线网站建设app推广80元一单
  • 中关村在线对比长春seo网站排名
  • 张店网站建设定制线上营销推广方式有哪些
  • 赵增敏. JSP网站开发详解成都百度seo公司