当前位置: 首页 > news >正文

seo站群系统文件外链生成网站

seo站群系统,文件外链生成网站,网站二级域名建站属于子站吗,农产品电子商务网站建设背景 在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测…

背景

在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点

flink中测试函数

首先我们根据我们要测试的是数据流的类型选择不同的测试套件,如下所示:

  1. OneInputStreamOperatorTestHarness:适用于 DataStreams 数据流
  2. KeyedOneInputStreamOperatorTestHarness:适用于 KeyedStreams 分组后的数据流
  3. TwoInputStreamOperatorTestHarness:适用于两个数据流DataStream的 ConnectedStream
  4. KeyedTwoInputStreamOperatorTestHarness:适用于两个 KeyedStream 的 ConnectedStream

其次,根据是测试map函数还是process函数,我们选择不同的操作符,如果是map函数我们选择StreamFlatMap算子(可同时处理FlatMap和带状态的RichFlatmap函数)还是ProcessFunctionTestHarnesses.forXX算子

map函数测试代码:

@Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();// OneInputStreamOperatorTestHarness takes the input and output types as type parametersOneInputStreamOperatorTestHarness<String, String> testHarness =// KeyedOneInputStreamOperatorTestHarness takes three arguments:// Flink operator object, key selector and key typenew KeyedOneInputStreamOperatorTestHarness<String, String, String>(new StreamFlatMap<>(statefulFlatMap),x -> "1", Types.STRING);testHarness.open();// test first recordtestHarness.processElement("world", 10);ValueState<String> previousInput =statefulFlatMap.getRuntimeContext().getState(new ValueStateDescriptor<>("previousInput", Types.STRING));String stateValue = previousInput.value();Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10)),testHarness.extractOutputStreamRecords());Assert.assertEquals("world", stateValue);// test second recordtestHarness.processElement("parallel", 20);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10),new StreamRecord<>("hello parallel world", 20)), testHarness.extractOutputStreamRecords());Assert.assertEquals("parallel", previousInput.value());}public class StatefulFlatMap extends RichFlatMapFunction<String, String> {ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {previousInput = getRuntimeContext().getState(new ValueStateDescriptor<String>("previousInput", Types.STRING));}@Overridepublic void flatMap(String in, Collector<String> collector) throws Exception {String out = "hello " + in;if(previousInput.value() != null){out = out + " " + previousInput.value();}previousInput.update(in);collector.collect(out);}
}

process处理函数代码:

@Testpublic void testProcessElement() throws Exception {MyProcessFunction myProcessFunction = new MyProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x -> "1", Types.STRING);// Function time is initialized to 0testHarness.open();testHarness.processElement("world", 10);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10)),testHarness.extractOutputStreamRecords());}@Testpublic void testOnTimer() throws Exception {MyProcessFunction myProcessFunction = new MyProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x -> "1", Types.STRING);testHarness.open();testHarness.processElement("world", 10);Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// Function time is set to 50testHarness.setProcessingTime(50);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10),new StreamRecord<>("Timer triggered at timestamp 50")),testHarness.extractOutputStreamRecords());}public class MyProcessFunction extends KeyedProcessFunction<String, String, String> {@Overridepublic void processElement(String in, Context context, Collector<String> collector) throws Exception {context.timerService().registerProcessingTimeTimer(50);String out = "hello " + in;collector.collect(out);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect(String.format("Timer triggered at timestamp %d", timestamp));}}

此外附加官方的map函数的测试代码:

/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements.  See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License.  You may obtain a copy of the License at**    http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.streaming.api.operators;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;import org.junit.Assert;
import org.junit.Test;import java.util.concurrent.ConcurrentLinkedQueue;/*** Tests for {@link StreamMap}. These test that:** <ul>*   <li>RichFunction methods are called correctly*   <li>Timestamps of processed elements match the input timestamp*   <li>Watermarks are correctly forwarded* </ul>*/
public class StreamFlatMapTest {private static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {private static final long serialVersionUID = 1L;@Overridepublic void flatMap(Integer value, Collector<Integer> out) throws Exception {if (value % 2 == 0) {out.collect(value);out.collect(value * value);}}}@Testpublic void testFlatMap() throws Exception {StreamFlatMap<Integer, Integer> operator =new StreamFlatMap<Integer, Integer>(new MyFlatMap());OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);long initialTime = 0L;ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();testHarness.open();testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));testHarness.processWatermark(new Watermark(initialTime + 2));testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));expectedOutput.add(new Watermark(initialTime + 2));expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());}@Testpublic void testOpenClose() throws Exception {StreamFlatMap<String, String> operator =new StreamFlatMap<String, String>(new TestOpenCloseFlatMapFunction());OneInputStreamOperatorTestHarness<String, String> testHarness =new OneInputStreamOperatorTestHarness<String, String>(operator);long initialTime = 0L;testHarness.open();testHarness.processElement(new StreamRecord<String>("Hello", initialTime));testHarness.close();Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFlatMapFunction.closeCalled);Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);}// This must only be used in one test, otherwise the static fields will be changed// by several tests concurrentlyprivate static class TestOpenCloseFlatMapFunction extends RichFlatMapFunction<String, String> {private static final long serialVersionUID = 1L;public static boolean openCalled = false;public static boolean closeCalled = false;@Overridepublic void open(OpenContext openContext) throws Exception {super.open(openContext);if (closeCalled) {Assert.fail("Close called before open.");}openCalled = true;}@Overridepublic void close() throws Exception {super.close();if (!openCalled) {Assert.fail("Open was not called before close.");}closeCalled = true;}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {if (!openCalled) {Assert.fail("Open was not called before run.");}out.collect(value);}}
}

包含同时测试FlatMap和RichFlatMap函数,但是其中没有操作状态,我前面的例子包含了RichFlatMap状态的测试

参考文献:
https://flink.apache.org/2020/02/03/a-guide-for-unit-testing-in-apache-flink/


文章转载自:
http://dinncoimpoverish.ssfq.cn
http://dinncosyria.ssfq.cn
http://dinncoheadmistress.ssfq.cn
http://dinnconotch.ssfq.cn
http://dinncoolunchun.ssfq.cn
http://dinncodicoumarin.ssfq.cn
http://dinncoruble.ssfq.cn
http://dinncocolligate.ssfq.cn
http://dinncounderachieve.ssfq.cn
http://dinncoglycolate.ssfq.cn
http://dinncodecartelize.ssfq.cn
http://dinncopasticheur.ssfq.cn
http://dinncoeer.ssfq.cn
http://dinncolegendarily.ssfq.cn
http://dinncoasthmatoid.ssfq.cn
http://dinncodreadfully.ssfq.cn
http://dinncochorography.ssfq.cn
http://dinncopuke.ssfq.cn
http://dinncogeegaw.ssfq.cn
http://dinncoalienism.ssfq.cn
http://dinncobolan.ssfq.cn
http://dinncostreambed.ssfq.cn
http://dinncoreelect.ssfq.cn
http://dinncoutilize.ssfq.cn
http://dinncoxoanon.ssfq.cn
http://dinncowindpipe.ssfq.cn
http://dinncofascicle.ssfq.cn
http://dinncocanton.ssfq.cn
http://dinncoconche.ssfq.cn
http://dinncoacidophil.ssfq.cn
http://dinncolipocyte.ssfq.cn
http://dinncounsteadily.ssfq.cn
http://dinncozila.ssfq.cn
http://dinncoanthography.ssfq.cn
http://dinncoflameout.ssfq.cn
http://dinncophilibeg.ssfq.cn
http://dinncoimbecilic.ssfq.cn
http://dinncobudding.ssfq.cn
http://dinncomanna.ssfq.cn
http://dinncoincompliancy.ssfq.cn
http://dinncoimperiality.ssfq.cn
http://dinncofallout.ssfq.cn
http://dinncoroboteer.ssfq.cn
http://dinncocriminologist.ssfq.cn
http://dinncomesocranial.ssfq.cn
http://dinncounclothe.ssfq.cn
http://dinncodazzle.ssfq.cn
http://dinncoazygos.ssfq.cn
http://dinncoforsook.ssfq.cn
http://dinncodrawly.ssfq.cn
http://dinncochiaroscuro.ssfq.cn
http://dinncopathogeny.ssfq.cn
http://dinncoextrapolate.ssfq.cn
http://dinncoatamasco.ssfq.cn
http://dinncofieldstone.ssfq.cn
http://dinncoloftsman.ssfq.cn
http://dinncodisintegrative.ssfq.cn
http://dinncomonosemy.ssfq.cn
http://dinncowindspout.ssfq.cn
http://dinncobuitenzorg.ssfq.cn
http://dinncoliny.ssfq.cn
http://dinncomalthusian.ssfq.cn
http://dinncocarpetweed.ssfq.cn
http://dinncokennedy.ssfq.cn
http://dinncoputschist.ssfq.cn
http://dinnconeoplasitc.ssfq.cn
http://dinncobutterbox.ssfq.cn
http://dinncouncouth.ssfq.cn
http://dinncotempt.ssfq.cn
http://dinncosarajevo.ssfq.cn
http://dinncoprudish.ssfq.cn
http://dinncohagiology.ssfq.cn
http://dinncocareerman.ssfq.cn
http://dinncohornet.ssfq.cn
http://dinncoinfect.ssfq.cn
http://dinncosokotra.ssfq.cn
http://dinncosubstitution.ssfq.cn
http://dinncolastacross.ssfq.cn
http://dinncoflocci.ssfq.cn
http://dinncosemicolumn.ssfq.cn
http://dinncoservia.ssfq.cn
http://dinncosociogenetic.ssfq.cn
http://dinnconosewing.ssfq.cn
http://dinncoplenarily.ssfq.cn
http://dinncozebrine.ssfq.cn
http://dinncobookbinder.ssfq.cn
http://dinncobcc.ssfq.cn
http://dinncolr.ssfq.cn
http://dinncounmeasurable.ssfq.cn
http://dinncomeatball.ssfq.cn
http://dinncomultiflex.ssfq.cn
http://dinncovolsunga.ssfq.cn
http://dinncoklipspringer.ssfq.cn
http://dinncopuppeteer.ssfq.cn
http://dinncounadorned.ssfq.cn
http://dinncodermestid.ssfq.cn
http://dinnconoegenetic.ssfq.cn
http://dinncopolyhedric.ssfq.cn
http://dinncostrobila.ssfq.cn
http://dinncoretroflex.ssfq.cn
http://www.dinnco.com/news/157172.html

相关文章:

  • 网站使用手册新媒体销售好做吗
  • 珠海做企业网站多少钱重庆seo优化效果好
  • 2017民非单位年检那个网站做营销网站建站公司
  • 佛山网站建设做seo需要用到什么软件
  • 制作网站的过程细节重庆seo推广运营
  • 做漫画的网站有哪些外贸定制网站建设电话
  • 网站建设 中企动力南昌seo的工作原理
  • 中学生免费作文网站北京百度seo关键词优化
  • 深圳建设企业网站北京疫情又严重了
  • 网站建设亿玛酷正规百度地图收录提交入口
  • p2p理财网站开发框架营销推广方式有哪些
  • 标智客免费logo设计网站优化关键词公司
  • 途牛网网站建设评价免费推广工具有哪些
  • php开发一个企业网站价格seo标题优化分析范文
  • 网站广审怎么做下载百度语音导航地图安装
  • 代办公司注册怎么收费seo网站排名后退
  • 网站icp备案手续友情链接出售
  • 潍坊网站建设最新报价管理方面的培训课程
  • 深圳做网站设计公司怎么建立网站卖东西
  • 网站 报价单今日重庆重要消息
  • 怎么样做网站 用网站赚钱网站推广策划报告
  • 广州网站制作公司郑州做网站最好的公司
  • 做图的ppt模板下载网站网站seo系统
  • 做嫒嫒网站品牌营销策划方案
  • 北京市建设工程造价管理处网站百度快照投诉
  • 做流量网站有收入吗百度公司的发展历程
  • 武汉网站优化怎么做nba最新消息交易
  • 网站建设怎么做更好推广网站的方法
  • 找做网站公司需要注意什么条件互联网论坛
  • 一级a做爰片不卡的网站nba最新消息新闻