当前位置: 首页 > news >正文

阿里云官方网站 icp代备案管理系统镇江网站建站

阿里云官方网站 icp代备案管理系统,镇江网站建站,音乐主题资源网站建设,摄影网站介绍目录 一、Flink Data Sources 分类概览 (一)预定义 Source (二)自定义 Source 二、代码实战演示 (一)预定义 Source 示例 基于本地集合 基于本地文件 基于网络套接字(socketTextStream&…

目录

一、Flink Data Sources 分类概览

(一)预定义 Source

(二)自定义 Source

二、代码实战演示

(一)预定义 Source 示例

基于本地集合

基于本地文件

基于网络套接字(socketTextStream)

(二)自定义 Source 示例

三、Kafka Source 应用

四、总结


        在大数据处理领域,Apache Flink 作为一款强大的流式计算框架,既能应对流处理场景,也可处理批处理任务。而数据来源(Data Sources)作为整个计算流程的 “源头活水”,其多样性与合理运用至关重要。本文将深入剖析 Flink 中 Data Sources 的相关知识,并结合丰富代码示例,助力大家透彻理解与灵活运用。

一、Flink Data Sources 分类概览

Flink 在批 / 流处理中常见的 source 主要分为两大类:预定义 Source 和自定义 Source。

(一)预定义 Source

基于本地集合的 source(Collection-based-source)

        通过env.fromElements()可传入可变参数创建 DataStream,支持如 Tuple、自定义对象等复合形式,但要注意类型需一致,不一致时虽可用Object接收但使用易报错,像env.fromElements("haha", 1)这种就会有问题;env.fromCollection()支持多种Collection具体类型(如ListSetQueue)来构建 DataStream;env.fromSequence()可基于开始和结束值创建 DataStream(曾有env.generateSequence()方法创建基于 Sequence 的 DataStream,不过现已废弃),此类方式常应用于学习测试编造数据场景。

基于文件的 source(File-based-source)

        能读取本地文件与 HDFS 路径文件,如env.readTextFile("datas/wc.txt")可读取本地datas目录下wc.txt文件,env.readTextFile("hdfs://bigdata01:9820/home/a.txt")能获取 HDFS 特定路径文件数据。操作时要留意相对路径转绝对路径问题,避免因路径差错引发异常。

基于网络套接字(socketTextStream)

        socketTextStream(String hostname, int port)方法从指定 Socket 读取数据创建 DataStream,其为非并行 Source,有重载方法可指定行分隔符和最大重新连接次数,默认行分隔符是\n,最大重新连接次数为 0。使用前需先启动 Socket 服务(Mac 或 Linux 可在命令行终端输入nc -lk 8888,Windows 需安装netcat命令后操作),且该方式获取的 DataStream 并行度固定为 1。

(二)自定义 Source

SourceFunction

        非并行数据源(并行度只能 = 1),作为接口定义基础数据源规范,实现run方法持续产生数据,cancel方法用于停止数据源。

RichSourceFunction

        多功能非并行数据源(并行度只能 = 1),是类形式,相比SourceFunction,额外功能体现在实例化时有open方法执行一次(多并行度会多次执行,因多实例)、销毁实例时close方法执行一次,且能通过getRuntimeContext获取当前Runtime对象(底层 API)。

ParallelSourceFunction

        并行数据源(并行度能够 >= 1),接口形式,允许创建并行处理的数据源,例如自定义类实现此接口,按设定并行度生成数据。

RichParallelSourceFunction

        多功能并行数据源(并行度能够 >= 1),类形式且功能齐全,建议使用。继承它并重写相关方法,能充分利用并行特性高效产生数据,同时享有Rich类的openclose等方法优势。

二、代码实战演示

(一)预定义 Source 示例

在flink最常见的创建DataStream方式有四种: 

l 使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。
注意:类型要一致,不一致可以用Object接收,但是使用会报错,比如:env.fromElements("haha", 1);

源码注释中有写:


l 使用env.fromCollection(),这种方式支持多种Collection的具体类型,如List,Set,Queue
l 使用env.generateSequence()方法创建基于Sequence的DataStream --已经废弃了
l 使用env.fromSequence()方法创建基于开始和结束的DataStream

一般用于学习测试时编造数据时使用
1.env.fromElements(可变参数);
2.env.fromColletion(各种集合);
3.env.fromSequence(开始,结束);

基于本地集合

package com.bigdata.source;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;public class _01YuDingYiSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 各种获取数据的SourceDataStreamSource<String> dataStreamSource = env.fromElements("hello world txt", "hello nihao kongniqiwa");dataStreamSource.print();// 演示一个错误的//DataStreamSource<Object> dataStreamSource2 = env.fromElements("hello", 1,3.0f);//dataStreamSource2.print();DataStreamSource<Tuple2<String, Integer>> elements = env.fromElements(Tuple2.of("张三", 18),Tuple2.of("lisi", 18),Tuple2.of("wangwu", 18));elements.print();// 有一个方法,可以直接将数组变为集合  复习一下数组和集合以及一些非常常见的APIString[] arr = {"hello","world"};System.out.println(arr.length);System.out.println(Arrays.toString(arr));List<String> list = Arrays.asList(arr);System.out.println(list);env.fromElements(Arrays.asList(arr),Arrays.asList(arr),Arrays.asList(arr)).print();// 第二种加载数据的方式// Collection 的子接口只有 Set 和 ListArrayList<String> list1 = new ArrayList<>();list1.add("python");list1.add("scala");list1.add("java");DataStreamSource<String> ds1 = env.fromCollection(list1);DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList(arr));// 第三种DataStreamSource<Long> ds3 = env.fromSequence(1, 100);ds3.print();// execute 下面的代码不运行,所以,这句话要放在最后。env.execute("获取预定义的Source");}
}

可以在代码中指定并行度

l 指定全局并行度:

env.setParallelism(12);

l 获得全局并行度:

env.getParallelism();

指定算子设置并行度:

获取指定算子并行度:

eventSource.getParallelism();

基于本地文件

package com.bigdata.source;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;public class _02YuDingYiSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取并行度System.out.println(env.getParallelism());// 讲第二种Source File类型的// 给了一个相对路径,说路径不对,老闫非要写,我咋办?// 相对路径,转绝对路径File file = new File("datas/wc.txt");File file2 = new File("./");System.out.println(file.getAbsoluteFile());System.out.println(file2.getAbsoluteFile());DataStreamSource<String> ds1 = env.readTextFile("datas/wc.txt");ds1.print();// 还可以获取hdfs路径上的数据DataStreamSource<String> ds2 = env.readTextFile("hdfs://bigdata01:9820/home/a.txt");ds2.print();// execute 下面的代码不运行,所以,这句话要放在最后。env.execute("获取预定义的Source");}
}

基于网络套接字(socketTextStream)

socketTextStream(String hostname, int port) 方法是一个非并行的Source,该方法需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,其中一个是socketTextStream(String hostname, int port, String delimiter, long maxRetry),这个重载的方法可以指定行分隔符和最大重新连接次数。这两个参数,默认行分隔符是”\n”,最大重新连接次数为0。

提示:

如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。Windows用户可以在百度中搜索windows安装netcat命令。

通过网盘分享的文件:netcat-win32-1.11.zip

如果是windows平台:nc -lp 8888

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SourceDemo02_Socket {public static void main(String[] args) throws Exception {//TODO 1.env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 2.source-加载数据DataStream<String> socketDS = env.socketTextStream("bigdata01", 8889);//TODO 3.transformation-数据转换处理//3.1对每一行数据进行分割并压扁DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(word);}}});//3.2每个单词记为<单词,1>DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});//3.3分组KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});//3.4聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);//TODO 4.sink-数据输出result.print();//TODO 5.execute-执行env.execute();}
}

(二)自定义 Source 示例

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

简单自定义非并行 Source(实现 SourceFunction)

package com.bigdata.day02;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;
import java.util.UUID;/*** 需求: 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)* 要求:* - 随机生成订单ID(UUID)* - 随机生成用户ID(0-2)* - 随机生成订单金额(0-100)* - 时间戳为当前系统时间*/@Data  // set get toString
@AllArgsConstructor
@NoArgsConstructor
class OrderInfo{private String orderId;private int uid;private int money;private long timeStamp;
}
// class MySource extends RichSourceFunction<OrderInfo> {
//class MySource extends RichParallelSourceFunction<OrderInfo> {
class MySource implements SourceFunction<OrderInfo> {boolean flag = true;@Overridepublic void run(SourceContext ctx) throws Exception {// 源源不断的产生数据Random random = new Random();while(flag){OrderInfo orderInfo = new OrderInfo();orderInfo.setOrderId(UUID.randomUUID().toString());orderInfo.setUid(random.nextInt(3));orderInfo.setMoney(random.nextInt(101));orderInfo.setTimeStamp(System.currentTimeMillis());ctx.collect(orderInfo);Thread.sleep(1000);// 间隔1s}}// source 停止之前需要干点啥@Overridepublic void cancel() {flag = false;}
}
public class CustomSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 将自定义的数据源放入到env中DataStreamSource dataStreamSource = env.addSource(new MySource())/*.setParallelism(1)*/;System.out.println(dataStreamSource.getParallelism());dataStreamSource.print();env.execute();}}


 自定义并行 Source(实现 ParallelSourceFunction)

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;import java.util.UUID;/*** 自定义多并行度Source*/
public class CustomerSourceWithParallelDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);mySource.print();env.execute();}public static class MySource implements ParallelSourceFunction<String> {@Overridepublic void run(SourceContext<String> ctx) throws Exception {ctx.collect(UUID.randomUUID().toString());/*如果不设置无限循环可以看出,设置了多少并行度就打印出多少条数据*/}@Overridepublic void cancel() {}}
}


自定义多功能并行 Source(实现 RichParallelSourceFunction)

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import java.util.UUID;/*** 自定义一个RichParallelSourceFunction的实现*/
public class CustomerRichSourceWithParallelDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);mySource.print();env.execute();}/*Rich 类型的Source可以比非Rich的多出有:- open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)- close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦- getRuntime方法可以获得当前的Runtime对象(底层API)*/public static class MySource extends RichParallelSourceFunction<String> {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println("open......");}@Overridepublic void close() throws Exception {super.close();System.out.println("close......");}@Overridepublic void run(SourceContext<String> ctx) throws Exception {ctx.collect(UUID.randomUUID().toString());}@Overridepublic void cancel() {}}
}

三、Kafka Source 应用

Kafka 作为常用消息队列,与 Flink 集成紧密。使用时需添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version>
</dependency>

并配置相关属性,如下示例展示从 Kafka 主题读取数据并筛选含特定字样消息后打印。

创建一个topic1 这个主题:
 

cd /opt/installs/kafka3/bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic1通过控制台向topic1发送消息:
bin/kafka-console-producer.sh  --bootstrap-server bigdata01:9092 --topic topic1

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);dataStreamSource.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String word) throws Exception {return word.contains("success");}}).print();env.execute();}
}

四、总结

        掌握 Flink 普通 API 里 Source 的各类使用方式,无论是预定义 Source 快速搭建测试数据场景、灵活运用并行度设置优化资源,还是对接 Kafka 这类外部数据源,都是构建高效、稳定大数据处理管道的关键基石。后续可深入各部分细节实践,深挖性能调优等进阶玩法,让 Flink 在数据处理之旅中大放异彩。希望这篇文章能助大家在 Flink Source 使用上理清思路、顺利上手,开启大数据流式计算的精彩探索!


文章转载自:
http://dinncoaneurin.ssfq.cn
http://dinncoromans.ssfq.cn
http://dinncobersagliere.ssfq.cn
http://dinncomunt.ssfq.cn
http://dinncodrollery.ssfq.cn
http://dinncoquizzicality.ssfq.cn
http://dinncounequalable.ssfq.cn
http://dinncoforge.ssfq.cn
http://dinncorecusation.ssfq.cn
http://dinnconaseberry.ssfq.cn
http://dinncosabulite.ssfq.cn
http://dinncoamphiphyte.ssfq.cn
http://dinncogeophysics.ssfq.cn
http://dinncoopaline.ssfq.cn
http://dinncolaevulin.ssfq.cn
http://dinncorappahannock.ssfq.cn
http://dinncohomebrewed.ssfq.cn
http://dinncostubbed.ssfq.cn
http://dinncomicroseismology.ssfq.cn
http://dinncokneebend.ssfq.cn
http://dinncoovercame.ssfq.cn
http://dinncovinegarroon.ssfq.cn
http://dinncorile.ssfq.cn
http://dinncoeuphorbia.ssfq.cn
http://dinncominnesotan.ssfq.cn
http://dinncohorsehide.ssfq.cn
http://dinncocleanse.ssfq.cn
http://dinncodizygotic.ssfq.cn
http://dinncovibrational.ssfq.cn
http://dinncomonotonize.ssfq.cn
http://dinncounspilt.ssfq.cn
http://dinncohorseradish.ssfq.cn
http://dinncounseparated.ssfq.cn
http://dinncovariola.ssfq.cn
http://dinncotellurian.ssfq.cn
http://dinncosuperexcellent.ssfq.cn
http://dinncounbridgeable.ssfq.cn
http://dinncolovely.ssfq.cn
http://dinncocostarican.ssfq.cn
http://dinncobrowser.ssfq.cn
http://dinncoakita.ssfq.cn
http://dinncopitpan.ssfq.cn
http://dinncoineluctable.ssfq.cn
http://dinncoprosthesis.ssfq.cn
http://dinncodatable.ssfq.cn
http://dinncorename.ssfq.cn
http://dinncowomanliness.ssfq.cn
http://dinncohydrotropism.ssfq.cn
http://dinncowankel.ssfq.cn
http://dinncoallotropy.ssfq.cn
http://dinncotimeball.ssfq.cn
http://dinncolayfolk.ssfq.cn
http://dinncotwelfthly.ssfq.cn
http://dinncobocce.ssfq.cn
http://dinncovegas.ssfq.cn
http://dinncopoloist.ssfq.cn
http://dinncosangreal.ssfq.cn
http://dinncorepentant.ssfq.cn
http://dinncopleasurable.ssfq.cn
http://dinncovaudevillian.ssfq.cn
http://dinncocharactron.ssfq.cn
http://dinncodiacidic.ssfq.cn
http://dinncotnb.ssfq.cn
http://dinncocor.ssfq.cn
http://dinncosepaloid.ssfq.cn
http://dinncohiron.ssfq.cn
http://dinncoanaesthetize.ssfq.cn
http://dinncoomega.ssfq.cn
http://dinncorecommit.ssfq.cn
http://dinncosensa.ssfq.cn
http://dinnconeostyle.ssfq.cn
http://dinncowelland.ssfq.cn
http://dinncoentophytic.ssfq.cn
http://dinncothunk.ssfq.cn
http://dinncowiddle.ssfq.cn
http://dinnconov.ssfq.cn
http://dinnconorward.ssfq.cn
http://dinncoringdove.ssfq.cn
http://dinnconorseland.ssfq.cn
http://dinncoalcoran.ssfq.cn
http://dinncoaphrodisia.ssfq.cn
http://dinncochilopod.ssfq.cn
http://dinncoisoplastic.ssfq.cn
http://dinncosweeping.ssfq.cn
http://dinncoincommunicability.ssfq.cn
http://dinncowhiskified.ssfq.cn
http://dinncotrawl.ssfq.cn
http://dinncopettifog.ssfq.cn
http://dinncoaomori.ssfq.cn
http://dinncoopera.ssfq.cn
http://dinncoescutcheon.ssfq.cn
http://dinncodeclamatory.ssfq.cn
http://dinncopakchoi.ssfq.cn
http://dinncoamianthus.ssfq.cn
http://dinncoinstinctive.ssfq.cn
http://dinncomotion.ssfq.cn
http://dinncorats.ssfq.cn
http://dinncocircean.ssfq.cn
http://dinncononresistance.ssfq.cn
http://dinncotarantella.ssfq.cn
http://www.dinnco.com/news/154254.html

相关文章:

  • 浦东新区苏州网站建设设计网站排名
  • 专门做网站的软件温州seo团队
  • 科技公司手机网站公众号怎么推广
  • wordpress行业模板seo运营做什么
  • 给教育类做网站广西seo关键词怎么优化
  • app网站区别厦门seo排名优化
  • 内江网站建设新闻郑州搜索引擎优化公司
  • 温岭自适应网站建设百度的网站
  • 微微网站建设网站优化靠谱seo
  • 中国电力建设集团网站群seo包年优化
  • python 做网站 用哪个框架好龙华百度快速排名
  • 西安网站开发有哪些公司b2b平台都有哪些网站
  • 网站建设制作优化免费学生网页制作成品代码
  • 池州哪里有做网站整合营销
  • 专门做汽车配件保养的网站百度指数人群画像
  • 国外 创意 网站合川网站建设
  • 装修设计软件哪个好用福州seo推广外包
  • 阿里云可以做哪些网站吗友情链接网站免费
  • 做商业网站seo优化软件大全
  • 网站设计外包百度营销推广登录
  • 大型网页设计公司关键词长尾词优化
  • 上网建站长春百度快速优化
  • 做网站的叫什么思耐免费seo技术教程
  • 个体户备案网站可以做企业站吗专业营销团队外包公司
  • 郑州网站建站网站app拉新任务平台
  • 网站备案流程公安手机优化大师哪个好
  • 维护网站成本源码网
  • idea怎么做网页seo课程总结怎么写
  • 使用oss做静态网站怎么在平台上做推广
  • 成都彩蝶花卉网站建设案例站长推广工具