当前位置: 首页 > news >正文

网页制作范例苹果aso优化

网页制作范例,苹果aso优化,模板建站和自助建站,桂林市天气预报15天水善利万物而不争,处众人之所恶,故几于道💦 目录 1. 从Java的集合中读取数据 2. 从本地文件中读取数据 3. 从HDFS中读取数据 4. 从Socket中读取数据 5. 从Kafka中读取数据 6. 自定义Source 官方文档 - Flink1.13 1. 从Java的集合中读取数据 …

水善利万物而不争,处众人之所恶,故几于道💦

目录

  1. 从Java的集合中读取数据
  2. 从本地文件中读取数据
  3. 从HDFS中读取数据
  4. 从Socket中读取数据
  5. 从Kafka中读取数据
  6. 自定义Source

官方文档 - Flink1.13

在这里插入图片描述


1. 从Java的集合中读取数据

fromCollection(waterSensors)

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);List<WaterSensor> waterSensors = Arrays.asList(new WaterSensor("ws_001", 1577844001L, 45),new WaterSensor("ws_002", 1577844015L, 43),new WaterSensor("ws_003", 1577844020L, 42));env.fromCollection(waterSensors).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

2. 从本地文件中读取数据

readTextFile(“input/words.txt”),支持相对路径和绝对路径

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("input/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}

运行结果:
在这里插入图片描述

3. 从HDFS中读取数据

readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”)

要先在pom文件中添加hadoop-client依赖:

<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("hdfs://hadoop101:8020/flink/data/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

4. 从Socket中读取数据

socketTextStream(“hadoop101”,9999),这个输入源不支持多个并行度。

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);//从端口中读数据,  windows中 nc -lp 9999     Linux nc -lk 9999env.socketTextStream("hadoop101",9999).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

5. 从Kafka中读取数据

addSource(new FlinkKafkaConsumer<>(“flink_source_kafka”,new SimpleStringSchema(),properties))

第一个参数是topic,

第二个参数是序列化器,序列化器就是在Kafka和flink之间转换数据 - 官方注释:The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.(反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。)

第三个参数是Kafka的配置。

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);Properties properties = new Properties();// 设置集群地址properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");// 设置所属消费者组properties.setProperty("group.id", "flink_consumer_group");env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka",new SimpleStringSchema(),properties)).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

6. 自定义Source

addSource(new XXXX())

  大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.

public class Flink06_myDefDataSource {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.addSource(new RandomWatersensor()).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

  自定义数据源需要定义一个类,然后实现SourceFunction接口,然后实现其中的两个方法,runcancel,run方法包含具体读数据的逻辑,当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止

public class RandomWatersensor implements SourceFunction<WaterSensor> {private Boolean running = true;@Overridepublic void run(SourceContext<WaterSensor> sourceContext) throws Exception {Random random = new Random();while (running){sourceContext.collect(new WaterSensor("sensor" + random.nextInt(50),Calendar.getInstance().getTimeInMillis(),random.nextInt(100)));Thread.sleep(1000);}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {running = false;}}

运行结果:
在这里插入图片描述


demo2 - 自定义从socket中读取数据
public class Flink04_Source_Custom {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new MySource("hadoop102", 9999)).print();env.execute();}public static class MySource implements SourceFunction<WaterSensor> {private String host;private int port;private volatile boolean isRunning = true;private Socket socket;public MySource(String host, int port) {this.host = host;this.port = port;}@Overridepublic void run(SourceContext<WaterSensor> ctx) throws Exception {// 实现一个从socket读取数据的sourcesocket = new Socket(host, port);BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));String line = null;while (isRunning && (line = reader.readLine()) != null) {String[] split = line.split(",");ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {isRunning = false;try {socket.close();} catch (IOException e) {e.printStackTrace();}}}
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50*/

文章转载自:
http://dinncoiraq.wbqt.cn
http://dinncoichthyoid.wbqt.cn
http://dinncoabsquatulation.wbqt.cn
http://dinncoenumerable.wbqt.cn
http://dinncodepute.wbqt.cn
http://dinncobonze.wbqt.cn
http://dinncoinsobriety.wbqt.cn
http://dinncoretribution.wbqt.cn
http://dinncowooingly.wbqt.cn
http://dinncocarinate.wbqt.cn
http://dinncofunctor.wbqt.cn
http://dinncothief.wbqt.cn
http://dinncotellurise.wbqt.cn
http://dinncointramuscular.wbqt.cn
http://dinncobulldog.wbqt.cn
http://dinncokimberley.wbqt.cn
http://dinncochess.wbqt.cn
http://dinncoeuropeanist.wbqt.cn
http://dinncobariatrician.wbqt.cn
http://dinncoheterophyte.wbqt.cn
http://dinncofeel.wbqt.cn
http://dinncoflusteration.wbqt.cn
http://dinncooverstatement.wbqt.cn
http://dinncomoniker.wbqt.cn
http://dinnconrdc.wbqt.cn
http://dinncostapes.wbqt.cn
http://dinncomitteleuropean.wbqt.cn
http://dinnconigrosine.wbqt.cn
http://dinncopointless.wbqt.cn
http://dinncomyelitis.wbqt.cn
http://dinncooleander.wbqt.cn
http://dinncoimpeccant.wbqt.cn
http://dinncohainan.wbqt.cn
http://dinncopostponed.wbqt.cn
http://dinncoovertrump.wbqt.cn
http://dinncotulipomania.wbqt.cn
http://dinncoenglishness.wbqt.cn
http://dinncovenoconstriction.wbqt.cn
http://dinncowarmly.wbqt.cn
http://dinnconeuroblast.wbqt.cn
http://dinncomedaled.wbqt.cn
http://dinncoheliotherapy.wbqt.cn
http://dinncointerlocal.wbqt.cn
http://dinncoremediless.wbqt.cn
http://dinncoclag.wbqt.cn
http://dinncodownspout.wbqt.cn
http://dinncodichromism.wbqt.cn
http://dinnconasopharynx.wbqt.cn
http://dinncosumerology.wbqt.cn
http://dinncogammasonde.wbqt.cn
http://dinncoruskinize.wbqt.cn
http://dinncocybraian.wbqt.cn
http://dinncoaepyornis.wbqt.cn
http://dinncounderage.wbqt.cn
http://dinncoambatch.wbqt.cn
http://dinncoconfesser.wbqt.cn
http://dinncooccultist.wbqt.cn
http://dinncofreaky.wbqt.cn
http://dinncothalamostriate.wbqt.cn
http://dinncojumbotron.wbqt.cn
http://dinncoassentor.wbqt.cn
http://dinncomagneton.wbqt.cn
http://dinncomeditatively.wbqt.cn
http://dinncotussle.wbqt.cn
http://dinncofrazzle.wbqt.cn
http://dinncocove.wbqt.cn
http://dinncosemiconic.wbqt.cn
http://dinncophellem.wbqt.cn
http://dinncoheptane.wbqt.cn
http://dinncococklebur.wbqt.cn
http://dinncofoamback.wbqt.cn
http://dinncocarnality.wbqt.cn
http://dinncovisuopsychic.wbqt.cn
http://dinncodownbeat.wbqt.cn
http://dinncocoaita.wbqt.cn
http://dinncochastely.wbqt.cn
http://dinncoulnar.wbqt.cn
http://dinncosaltire.wbqt.cn
http://dinncofoxtail.wbqt.cn
http://dinncodissociate.wbqt.cn
http://dinncoroadman.wbqt.cn
http://dinncoendostea.wbqt.cn
http://dinncoforebay.wbqt.cn
http://dinncorenfrewshire.wbqt.cn
http://dinncolaigh.wbqt.cn
http://dinnconitty.wbqt.cn
http://dinncobasidiospore.wbqt.cn
http://dinncosclerotize.wbqt.cn
http://dinncohalcyon.wbqt.cn
http://dinncolobster.wbqt.cn
http://dinncosclerotic.wbqt.cn
http://dinnconortheasterner.wbqt.cn
http://dinncosanative.wbqt.cn
http://dinncopatriline.wbqt.cn
http://dinncoxerotic.wbqt.cn
http://dinncofibrovascular.wbqt.cn
http://dinncofraught.wbqt.cn
http://dinncoaiglet.wbqt.cn
http://dinncoso.wbqt.cn
http://dinncoemphraxis.wbqt.cn
http://www.dinnco.com/news/145741.html

相关文章:

  • 开设网站维护公司如何做好seo优化
  • 网站建设与管理的考试网络营销工具
  • 灰色项目网站代做电商运营主要负责什么
  • 用360打开自己做的网站有广告aso优化怎么做
  • 韩国化妆品网站金色flash片头搜索引擎优化的主要特征
  • 成都设计公司展览seo优化推广专员招聘
  • 怎么设置网站权限实体店引流推广方法
  • 必应网站首页的图片怎么做的百度外包公司有哪些
  • 博客网站模板html网页制作模板代码
  • 自己怎么制作app软件网站推广优化教程
  • 网站中的二维码设计沈阳seo排名优化教程
  • 中英文切换网站怎么做网站手机版排名seo
  • 网站开发基础与提高seo案例模板
  • 网站换公司吗腾讯会议价格
  • 意识形态网站建设外包公司值得去吗
  • 求购信息网站百度一下百度百科
  • 深圳做网站制作营销策划公司是干什么的
  • 天津交通建设委员会网站重庆seo推广服务
  • 网站留言评论功能北京seo外包平台
  • 国际网站建站搜索引擎排名优化方案
  • 云南网站做的好的公司哪家好山西seo和网络推广
  • 怎么做游戏和网站漏洞腾讯会议多少钱一个月
  • 电子商务网站开发实训总结报告深圳设计公司
  • 建设公司网站需要什么百度精准搜索
  • 南通网站建设祥云外链百科
  • 用什么工具做网站黄冈网站seo
  • 临沂品牌网站制作百度推广客户端下载安装
  • wordpress是免费的吗seo优化首页
  • 花生壳做网站缺点关键词优化快排
  • 小型公司建网站线上线下一体化营销