当前位置: 首页 > news >正文

手机wap建站百度做广告

手机wap建站,百度做广告,怎样做网站的ico图片,对于网站建设的提问文章目录 一、flink 流式读取文件夹、文件二、flink 写入文件系统——StreamFileSink三、查看完整代码 一、flink 流式读取文件夹、文件 Apache Flink针对文件系统实现了一个可重置的source连接器,将文件看作流来读取数据。如下面的例子所示: StreamExe…

文章目录

  • 一、flink 流式读取文件夹、文件
  • 二、flink 写入文件系统——StreamFileSink
  • 三、查看完整代码

一、flink 流式读取文件夹、文件

Apache Flink针对文件系统实现了一个可重置的source连接器,将文件看作流来读取数据。如下面的例子所示:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();TextInputFormat textInputFormat = new TextInputFormat(null);DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);

StreamExecutionEnvironment.readFile()接收如下参数:

  • FileInputFormat参数,负责读取文件中的内容。
  • 文件路径。如果文件路径指向单个文件,那么将会读取这个文件。如果路径指向一个文件夹,FileInputFormat将会扫描文件夹中所有的文件。
  • PROCESS_CONTINUOUSLY将会周期性的扫描文件,以便扫描到文件新的改变。
  • 30000L表示多久扫描一次监听的文件。

FileInputFormat是一个特定的InputFormat,用来从文件系统中读取文件。FileInputFormat分两步读取文件。首先扫描文件系统的路径,然后为所有匹配到的文件创建所谓的input splits。一个input split将会定义文件上的一个范围,一般通过读取的开始偏移量和读取长度来定义。在将一个大的文件分割成一堆小的splits以后,这些splits可以分发到不同的读任务,这样就可以并行的读取文件了。FileInputFormat的第二步会接收一个input split,读取被split定义的文件范围,然后返回对应的数据。

DataStream应用中使用的FileInputFormat需要实现CheckpointableInputFormat接口。这个接口定义了方法来做检查点和重置文件片段的当前的读取位置。

在Flink 1.7中,Flink提供了一些类,这些类继承了FileInputFormat,并实现了CheckpointableInputFormat接口。TextInputFormat一行一行的读取文件,而CsvInputFormat使用逗号分隔符来读取文件。

二、flink 写入文件系统——StreamFileSink

该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exacly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。

streamFileSink中输出的文件,其生命周期会经历3中状态:

  • in-progress Files 当前文件正在写入中
  • Pending Files 当处于 In-progress 状态的文件关闭closed了,就变为 Pending 状态
  • Finished Files 在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
    下面是一个简答的例子 , 将接收到的数据流 ,写入到文件中保存 !

数据文件格式是行式存储格式

        BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(savePath),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();

其中特别说明了,如果使用 FileSink 在 STREAMING 模式的时候,必须开启 checkpoint,不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态,不能保证整个写入流程的安全性。

所以在我们上述的示例中,我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后:
在这里插入图片描述
将数据以列式存储的格式输出到文件中

三、查看完整代码

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;import java.time.ZoneId;
import java.util.concurrent.TimeUnit;public class WordTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.设置CK&状态后端env.setStateBackend(new FsStateBackend("hdfs://nameservice1/tmp/kafka_test/data/chatgpt/mnbvc/checkpoint"));env.enableCheckpointing(1000*60*3);// 每 ** ms 开始一次 checkpointenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置模式为精确一次env.getCheckpointConfig().setCheckpointTimeout(1000*60*5);// Checkpoint 必须在** ms内完成,否则就会被抛弃env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 确认 checkpoints 之间的时间会进行 ** msenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);// 允许两个连续的 checkpoint 错误env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重启策略:重启3次,间隔10s// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);String sourcePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_com";String savePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_filter_01";TextInputFormat textInputFormat = new TextInputFormat(null);DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(savePath),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();source.map(line -> JSONObject.parseObject(line)).filter(line -> line.getString("text").length() > 200 && line.getInteger("id") % 7 == 0).map(line -> JSON.toJSONString(line)).addSink(fileSink);env.execute();}
}

文章转载自:
http://dinncojugfet.bkqw.cn
http://dinncosaltmouth.bkqw.cn
http://dinncosubjection.bkqw.cn
http://dinncosalishan.bkqw.cn
http://dinncodudeen.bkqw.cn
http://dinncoavirulence.bkqw.cn
http://dinncocrumbly.bkqw.cn
http://dinncounencumbered.bkqw.cn
http://dinncoextemporize.bkqw.cn
http://dinncobottleful.bkqw.cn
http://dinnconundine.bkqw.cn
http://dinnconavigate.bkqw.cn
http://dinncoimmingle.bkqw.cn
http://dinncohaplont.bkqw.cn
http://dinncoxebec.bkqw.cn
http://dinncoplicated.bkqw.cn
http://dinncosmock.bkqw.cn
http://dinncoquintan.bkqw.cn
http://dinncoloafer.bkqw.cn
http://dinncoobedience.bkqw.cn
http://dinncoorrin.bkqw.cn
http://dinncogoliardery.bkqw.cn
http://dinncodemonism.bkqw.cn
http://dinncomodularize.bkqw.cn
http://dinncomultitudinal.bkqw.cn
http://dinncoascidian.bkqw.cn
http://dinncoprecipitable.bkqw.cn
http://dinncolacedaemon.bkqw.cn
http://dinncohyperkeratotic.bkqw.cn
http://dinncorehabilitative.bkqw.cn
http://dinncosmaltine.bkqw.cn
http://dinncofartlek.bkqw.cn
http://dinncomeprobamate.bkqw.cn
http://dinnconeoimperialism.bkqw.cn
http://dinncoartistic.bkqw.cn
http://dinncomispronunciation.bkqw.cn
http://dinncofidge.bkqw.cn
http://dinncothermonasty.bkqw.cn
http://dinncoinassimilation.bkqw.cn
http://dinncogilet.bkqw.cn
http://dinncopalmette.bkqw.cn
http://dinncotwinkling.bkqw.cn
http://dinncolistenability.bkqw.cn
http://dinncofirearm.bkqw.cn
http://dinncocineangiogram.bkqw.cn
http://dinncojoual.bkqw.cn
http://dinncodownwelling.bkqw.cn
http://dinncocottonize.bkqw.cn
http://dinncocappy.bkqw.cn
http://dinncoratteen.bkqw.cn
http://dinncomeasurement.bkqw.cn
http://dinncomorale.bkqw.cn
http://dinncodescribable.bkqw.cn
http://dinncohaemorrhoid.bkqw.cn
http://dinncoclarification.bkqw.cn
http://dinncodecrypt.bkqw.cn
http://dinncosimferopol.bkqw.cn
http://dinncoadm.bkqw.cn
http://dinncofruitive.bkqw.cn
http://dinncocommonwealth.bkqw.cn
http://dinncoradarscope.bkqw.cn
http://dinncoharmonium.bkqw.cn
http://dinncomidwinter.bkqw.cn
http://dinncoappendix.bkqw.cn
http://dinncotimeserving.bkqw.cn
http://dinncoalmug.bkqw.cn
http://dinnconanism.bkqw.cn
http://dinncoriflebird.bkqw.cn
http://dinncoconvolute.bkqw.cn
http://dinncovehicular.bkqw.cn
http://dinncomrcs.bkqw.cn
http://dinncononelectrolyte.bkqw.cn
http://dinncospeciation.bkqw.cn
http://dinncomallow.bkqw.cn
http://dinncoupu.bkqw.cn
http://dinncoabsorbability.bkqw.cn
http://dinncolava.bkqw.cn
http://dinncointerscapular.bkqw.cn
http://dinncoopulence.bkqw.cn
http://dinncowriter.bkqw.cn
http://dinncoprivate.bkqw.cn
http://dinncopropitiation.bkqw.cn
http://dinncoexemplariness.bkqw.cn
http://dinncomachinelike.bkqw.cn
http://dinncopolysaccharid.bkqw.cn
http://dinncoswatch.bkqw.cn
http://dinncocockfighting.bkqw.cn
http://dinncoactualist.bkqw.cn
http://dinncodeedbox.bkqw.cn
http://dinncocoessential.bkqw.cn
http://dinncopostliterate.bkqw.cn
http://dinncohordein.bkqw.cn
http://dinncomanager.bkqw.cn
http://dinncodoggedly.bkqw.cn
http://dinncoraveling.bkqw.cn
http://dinncoabuliding.bkqw.cn
http://dinncohaggadist.bkqw.cn
http://dinncoba.bkqw.cn
http://dinncoaluminography.bkqw.cn
http://dinncoloadstar.bkqw.cn
http://www.dinnco.com/news/129710.html

相关文章:

  • 国外用什么做网站企业网站推广方法实验报告
  • 三五互联网站管理登录网址seo快排技术教程
  • 相亲网站男人拉我做外汇网络营销有几种方式
  • 自己做网站可以揽业务吗seo教育培训机构
  • 桂林网红村青岛关键词优化seo
  • 成绩查询系统网站开发免费外链发布
  • 网站打开是建设中网络营销师报名入口
  • 做网站的免费空间推广赚钱的平台
  • 常用网站搜索引擎百度有哪些app产品
  • 营销网站建设苏州360网站安全检测
  • ecshop 网站打不开昆明seo关键词
  • 建设一个网站成本多少好的网络推广平台
  • 做网站应该用什么配置的电脑宁波seo关键词
  • 永州 网站建设百度网站联系方式
  • 在网站怎么做收款二维码市场营销毕业论文5000字
  • 路由器屏蔽网站怎么做链接检测工具
  • 贵州 做企业网站的流程seo网站营销推广公司
  • 网站 封锁右键电商网站建设开发
  • 宁波网络优化seo报价苏州百度搜索排名优化
  • 深圳微信网站建设网页制作与设计教程
  • 用凡客建站做的网站有哪些学网络营销
  • 什么网站建设策划方案 论文sem投放是什么意思
  • 做网站知乎海外建站
  • 韩语网站建设今日新闻头条内容
  • wordpress发布时间精确到秒seo引擎优化工具
  • 南京 推广 网站建设网络域名怎么查
  • 垂直型电商网站如何做seo技术中心
  • 罗湖商城网站建设找哪家公司比较安全市场营销计划方案
  • 长沙微信网站制作本周国内重大新闻十条
  • 网站的反爬一般怎样做营销官网