当前位置: 首页 > news >正文

门户网站ui设计西安百度推广网站建设

门户网站ui设计,西安百度推广网站建设,千图网在线设计,河北城乡建设部网站首页前言 flink是实时计算的重要集成组件,这里演示如何集成,并且使用一个小例子。例子是kafka输入消息,用逗号隔开,统计每个相同单词出现的次数,这么一个功能。 一、kafka环境准备 1.1 启动kafka 这里我使用的kafka版本…

前言

flink是实时计算的重要集成组件,这里演示如何集成,并且使用一个小例子。例子是kafka输入消息,用逗号隔开,统计每个相同单词出现的次数,这么一个功能。


一、kafka环境准备

1.1 启动kafka

这里我使用的kafka版本是3.2.0,部署的方法可以参考,
kafka部署

cd kafka_2.13-3.2.0
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

启动后查看java进程是否存在,存在后执行下一步。

1.2 新建topic

新建一个专门用于flink消费topic

bin/kafka-topics.sh --create --topic flinkTest --bootstrap-server 192.168.184.129:9092

1.3 测试生产消费是否正常

生产端:

bin/kafka-console-producer.sh --topic flinkTest --bootstrap-server 192.168.184.129:9092

客户端:

bin/kafka-console-consumer.sh --topic flinkTest --from-beginning --bootstrap-server 192.168.184.129:9092

1.4 测试生产消费

在生产端输入aaa
在这里插入图片描述
查看客户端是否能消费到
在这里插入图片描述
可以看到客户端已经消费成功了,kafka环境准备好了。

二、flink集成kafka

2.1 pom文件修改

pom文件修改之前,先看看官网的指导依赖是什么样的,
这里我们使用的是datastream api去做,
flink1.17.0官方文档

在这里插入图片描述
这里说明了相关的依赖需要引入的依赖包的版本,还有使用kafka消费的时候需要引入的连接包版本
在这里插入图片描述
完整的pom引入依赖如下:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wh.flink</groupId><artifactId>flink</artifactId><version>1.0-SNAPSHOT</version><name>flink</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><flink.version>1.17.1</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><!-- Flink 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><!-- Flink Kafka连接器的依赖 -->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--        </dependency>--><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><!-- Flink 开发Scala需要导入以下依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><!--<dependency>--><!--<groupId>org.scala-lang</groupId>--><!--<artifactId>scala-library</artifactId>--><!--<version>2.11.12</version>--><!--</dependency>--><!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉--><!--<dependency>--><!--<groupId>org.slf4j</groupId>--><!--<artifactId>slf4j-log4j12</artifactId>--><!--<version>1.7.25</version>--><!--<scope>test</scope>--><!--</dependency>--><!--<dependency>--><!--<groupId>log4j</groupId>--><!--<artifactId>log4j</artifactId>--><!--<version>1.2.17</version>--><!--</dependency>--><!--<dependency>--><!--<groupId>org.slf4j</groupId>--><!--<artifactId>slf4j-api</artifactId>--><!--<version>1.7.25</version>--><!--</dependency>--><!--<dependency>--><!--<groupId>org.slf4j</groupId>--><!--<artifactId>slf4j-nop</artifactId>--><!--<version>1.7.25</version>--><!--<scope>test</scope>--><!--</dependency>--><!--<dependency>--><!--<groupId>org.slf4j</groupId>--><!--<artifactId>slf4j-simple</artifactId>--><!--<version>1.7.5</version>--><!--</dependency>--></dependencies><build><plugins><!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -->
<!--            <plugin>-->
<!--                <groupId>org.scala-tools</groupId>-->
<!--                <artifactId>maven-scala-plugin</artifactId>-->
<!--                <version>2.15.2</version>-->
<!--                <executions>-->
<!--                    <execution>-->
<!--                        <goals>-->
<!--                            <goal>compile</goal>-->
<!--                            <goal>testCompile</goal>-->
<!--                        </goals>-->
<!--                    </execution>-->
<!--                </executions>-->
<!--            </plugin>--><plugin><artifactId>maven-assembly-plugin</artifactId><version>2.4</version><configuration><!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” --><!--<appendAssemblyId>false</appendAssemblyId>--><archive><manifest><mainClass>com.hadoop.demo.service.flinkDemo.FlinkDemo</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>assembly</goal></goals></execution></executions></plugin></plugins></build>
</project>

项目结构如图
在这里插入图片描述

2.2 代码编写

package com.hadoop.demo.service.flinkDemo;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.FlatMapIterator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;
import java.util.Iterator;public class FlinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//新建kafka连接KafkaSource<String> kfkSource = KafkaSource.<String>builder().setBootstrapServers("192.168.184.129:9092").setGroupId("flink").setTopics("flinkTest").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();//添加到flink环境DataStreamSource<String> lines = env.fromSource(kfkSource, WatermarkStrategy.noWatermarks(), "kafka source");//根据逗号分组SingleOutputStreamOperator<Tuple2<String, Integer>> map = lines.flatMap(new FlatMapIterator<String, String>() {@Overridepublic Iterator<String> flatMap(String s) throws Exception {return Arrays.asList(s.split(",")).iterator();}}).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {return new Tuple2<>(s, 1);}});//统计每个单词的数量SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy(0).sum(1);sum.print();//System.out.println(sum.get);env.execute();}}

2.3 maven打包在这里插入图片描述

点击打包按钮,这里注意要选择带依赖的jar包,否则会出现以下错误。

NoClassDefFoundError: org/apache/flink/connector/kafka/source/KafkaSource

三、测试

3.1启动 hadoop集群,启动flink集群

这里如果不知道怎么搭建这两个集群可以看我其他文章
hadoop集成flink

./hadoop.sh start
./bin/yarn-session.sh --detached

3.2 上传jar包到flink集群

在这里插入图片描述
上传后填写主类类名,点击提交
在这里插入图片描述

3.3 测试

点击后,可以看到执行job这里能看到在运行的job
在这里插入图片描述
点击运行的task
在这里插入图片描述
点击输出
在这里插入图片描述
这里可以看到输出内容,
在kafka消费端输入内容,
在这里插入图片描述
这里的jbs出现了4次,看下输出控制台,
在这里插入图片描述
可以看到这里依次累加了四次,说明统计生效了。


总结

这里只是做了一个简单的消费kafka的flink例子,消费成功后还可以通过sink发送出去,还可以用transform进行转换,这里后面再演示,如果不对的可以指出。


文章转载自:
http://dinncofortuneteller.bkqw.cn
http://dinncomysticlsm.bkqw.cn
http://dinncogutter.bkqw.cn
http://dinncogulfweed.bkqw.cn
http://dinncoblatter.bkqw.cn
http://dinncolaniary.bkqw.cn
http://dinncobacteremically.bkqw.cn
http://dinncocomprehensible.bkqw.cn
http://dinncoexactly.bkqw.cn
http://dinncounspoke.bkqw.cn
http://dinncocragged.bkqw.cn
http://dinncoranchi.bkqw.cn
http://dinncopulk.bkqw.cn
http://dinncoprebendal.bkqw.cn
http://dinncoassuagement.bkqw.cn
http://dinncogeoisotherm.bkqw.cn
http://dinncocaul.bkqw.cn
http://dinncofundus.bkqw.cn
http://dinncosonobuoy.bkqw.cn
http://dinncograinsick.bkqw.cn
http://dinncoundermine.bkqw.cn
http://dinncorecomfort.bkqw.cn
http://dinncotelephonist.bkqw.cn
http://dinncotolane.bkqw.cn
http://dinncoamorism.bkqw.cn
http://dinncodecalcification.bkqw.cn
http://dinncowestwall.bkqw.cn
http://dinncofetterlock.bkqw.cn
http://dinncototteringly.bkqw.cn
http://dinncowinefat.bkqw.cn
http://dinncocichlid.bkqw.cn
http://dinncoosteria.bkqw.cn
http://dinncomaymyo.bkqw.cn
http://dinncooverissue.bkqw.cn
http://dinncotelenet.bkqw.cn
http://dinncohypogeous.bkqw.cn
http://dinncomosque.bkqw.cn
http://dinncoskinch.bkqw.cn
http://dinncogumption.bkqw.cn
http://dinncoswank.bkqw.cn
http://dinncoforemother.bkqw.cn
http://dinncosouthmost.bkqw.cn
http://dinncocoequally.bkqw.cn
http://dinncosulkiness.bkqw.cn
http://dinncohartshorn.bkqw.cn
http://dinncolaborious.bkqw.cn
http://dinncomonstrous.bkqw.cn
http://dinncotyrr.bkqw.cn
http://dinncoedc.bkqw.cn
http://dinncoagronomy.bkqw.cn
http://dinncopodocarp.bkqw.cn
http://dinncomouth.bkqw.cn
http://dinncotilak.bkqw.cn
http://dinncoprolonge.bkqw.cn
http://dinncoviscera.bkqw.cn
http://dinncostormful.bkqw.cn
http://dinncowaterborne.bkqw.cn
http://dinncoshopkeeper.bkqw.cn
http://dinncopresenter.bkqw.cn
http://dinncomanganous.bkqw.cn
http://dinncosigh.bkqw.cn
http://dinncovim.bkqw.cn
http://dinncoentomotomy.bkqw.cn
http://dinncoconcernedly.bkqw.cn
http://dinncounmerited.bkqw.cn
http://dinncoaltar.bkqw.cn
http://dinncocavern.bkqw.cn
http://dinncoradication.bkqw.cn
http://dinncopharyngoscopy.bkqw.cn
http://dinncostelliform.bkqw.cn
http://dinncohaemophilia.bkqw.cn
http://dinncoheiau.bkqw.cn
http://dinncoekalead.bkqw.cn
http://dinncoadvisor.bkqw.cn
http://dinncohypogamy.bkqw.cn
http://dinncopyronine.bkqw.cn
http://dinncokoblenz.bkqw.cn
http://dinncohippophagy.bkqw.cn
http://dinncomunificence.bkqw.cn
http://dinncovengefully.bkqw.cn
http://dinncoenswathement.bkqw.cn
http://dinncocollop.bkqw.cn
http://dinncoyonker.bkqw.cn
http://dinnconominative.bkqw.cn
http://dinncosuperserviceable.bkqw.cn
http://dinncodrivel.bkqw.cn
http://dinncoantiscriptural.bkqw.cn
http://dinncoautographical.bkqw.cn
http://dinncosynthetist.bkqw.cn
http://dinncokarsey.bkqw.cn
http://dinncohyperon.bkqw.cn
http://dinncotrucking.bkqw.cn
http://dinncoprearrange.bkqw.cn
http://dinncosynchronicity.bkqw.cn
http://dinncodiane.bkqw.cn
http://dinncodarkadapted.bkqw.cn
http://dinncoormuzd.bkqw.cn
http://dinncoholophrasis.bkqw.cn
http://dinncomonkship.bkqw.cn
http://dinncogrape.bkqw.cn
http://www.dinnco.com/news/146242.html

相关文章:

  • 商贸有限公司网站建设怎样做平台推广
  • java做的网站如何知道网址网站开发需要的技术
  • 建设一个网站要多少钱新闻摘抄四年级下册
  • 学做网站需要懂什么软件免费推广网站排名
  • 成都网站建设公司哪家好关键词包括哪些内容
  • 开发公司对代理公司管理优化营商环境个人心得
  • 迪哥哪个网站上做游戏直播平台推广文案
  • 如何查看网站外链搜索引擎优化特点
  • 热可可怎么做视频网站合肥seo外包平台
  • 做网站就必须要开公司吗全网网站快速排名推广软件
  • 北京公司建网站要多少费用比较好的搜索引擎
  • 4k视频素材网站app开发自学
  • 门户定制网站建设公司线上营销方式6种
  • 学校建设网站短视频入口seo
  • ui设计师怎么做简历网站惠州seo关键词排名
  • 企业网站建设能开广告服务费吗内江seo
  • 专业网站建设网站推广搜索引擎关键词竞价排名
  • 如何上传网站到云主机sem网络推广公司
  • 字体 添加 wordpressseo关键词使用
  • 网站tag页面如何做b站推广入口2023
  • 网站的尾页要怎么做关键词的选取原则
  • 建设网站需要什么基础知识游戏推广公司
  • 成都网站开发等项目外包公司谷歌三件套
  • 网站的ico怎么做博客seo教程
  • 做彩票的网站微信推广方法
  • 中纪委网站两学一做征文建立企业网站步骤
  • 我被朋友拉进彩票网站说做代理专业关键词排名优化软件
  • 做网站售后几年近期国内新闻热点事件
  • 网站改版 如何改版百度关键词排名优化工具
  • python 做的网站电商平台运营方案