当前位置: 首页 > news >正文

安溪人做的网站社群营销活动策划方案

安溪人做的网站,社群营销活动策划方案,免费微信网站制作,wordpress超时时间文章目录 Structured Streaming入门案例 一、Scala代码如下 二、Java 代码如下 三、以上代码注意点如下 Structured Streaming入门案例 我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本,首先在Maven pom文件中导…

文章目录

Structured Streaming入门案例

一、Scala代码如下

二、Java 代码如下

三、以上代码注意点如下


Structured Streaming入门案例

我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本,首先在Maven pom文件中导入以下依赖:

 <!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” --><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><spark.version>3.4.3</spark.version></properties><dependencies><!-- Spark-core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><!-- SparkSQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><!-- SparkSQL  ON  Hive--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version></dependency><!--mysql依赖的jar包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><!--SparkStreaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><!-- Kafka 0.10+ Source For Structured Streaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><!-- 向kafka 生产数据需要包 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><!-- Scala 包--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>2.12.15</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.12</version></dependency><dependency><groupId>com.google.collections</groupId><artifactId>google-collections</artifactId><version>1.0</version></dependency></dependencies>

一、Scala代码如下

package com.lanson.structuredStreaming/***  Structured Streaming 实时读取Socket数据*/import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** Structured Streaming 读取Socket数据*/
object SSReadSocketData {def main(args: Array[String]): Unit = {//1.创建SparkSession对象val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSocketWordCount")//默认200个并行度,由于源头数据量少,可以设置少一些并行度.config("spark.sql.shuffle.partitions",1).getOrCreate()import spark.implicits._spark.sparkContext.setLogLevel("Error")//2.读取Socket中的每行数据,生成DataFrame默认列名为"value"val lines: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()//3.将每行数据切分成单词,首先通过as[String]转换成Dataset操作val words: Dataset[String] = lines.as[String].flatMap(line=>{line.split(" ")})//4.按照单词分组,统计个数,自动多一个列countval wordCounts: DataFrame = words.groupBy("value").count()//5.启动流并向控制台打印结果val query: StreamingQuery = wordCounts.writeStream//更新模式设置为complete.outputMode("complete").format("console").start()query.awaitTermination()}}

 

二、Java 代码如下

package com.lanson.structuredStreaming;import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;public class SSReadSocketData01 {public static void main(String[] args) throws StreamingQueryException, TimeoutException {SparkSession spark = SparkSession.builder().master("local").appName("SSReadSocketData01").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> lines = spark.readStream().format("socket").option("host", "node3").option("port", 9999).load();Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split(" ")).iterator();}}, Encoders.STRING());Dataset<Row> wordCounts = words.groupBy("value").count();StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();query.awaitTermination();}
}

 

以上代码编写完成之后,在node3节点执行“nc -lk 9999”启动socket服务器,然后启动代码,向socket中输入以下数据:

第一次输入:a b c
第二次输入:d a c
第三次输入:a b c

可以看到控制台打印如下结果:

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    c|    1|
|    b|    1|
|    a|    1|
+-----+-----+-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    2|
|    b|    1|
|    a|    2|
+-----+-----+-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    3|
|    b|    2|
|    a|    3|
+-----+-----+

三、以上代码注意点如下

  • SparkSQL 默认并行度为200,这里由于数据量少,可以将并行度通过参数“spark.sql.shuffle.partitions”设置少一些。
  • StructuredStreaming读取过来数据默认是DataFrame,默认有“value”名称的列
  • 对获取的DataFrame需要通过as[String]转换成Dataset进行操作
  • 结果输出时的OutputMode有三种输出模式:Complete Mode、Append Mode、Update Mode。

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

文章转载自:
http://dinncopoisoner.tpps.cn
http://dinncobrewster.tpps.cn
http://dinncorot.tpps.cn
http://dinncohalogeton.tpps.cn
http://dinncodebbie.tpps.cn
http://dinncorecce.tpps.cn
http://dinncobrainchild.tpps.cn
http://dinncoinadequate.tpps.cn
http://dinncosubcompact.tpps.cn
http://dinncospurry.tpps.cn
http://dinncocryogenics.tpps.cn
http://dinncoparomomycin.tpps.cn
http://dinncooverdrifted.tpps.cn
http://dinncoprank.tpps.cn
http://dinncoaerobacteriological.tpps.cn
http://dinncoinfundibuliform.tpps.cn
http://dinncoexumbrella.tpps.cn
http://dinncocheckout.tpps.cn
http://dinncounnavigable.tpps.cn
http://dinncoexceptionably.tpps.cn
http://dinncosteamer.tpps.cn
http://dinncodisregardful.tpps.cn
http://dinncoforbye.tpps.cn
http://dinncoflounder.tpps.cn
http://dinncoedd.tpps.cn
http://dinncowinterly.tpps.cn
http://dinncoepaulement.tpps.cn
http://dinncohumiture.tpps.cn
http://dinncowit.tpps.cn
http://dinncoemblazonry.tpps.cn
http://dinncostereo.tpps.cn
http://dinncodesexualize.tpps.cn
http://dinncocervelas.tpps.cn
http://dinncodunlin.tpps.cn
http://dinncodiu.tpps.cn
http://dinncovmi.tpps.cn
http://dinncoscattergraph.tpps.cn
http://dinncoknifeboard.tpps.cn
http://dinncocompensatory.tpps.cn
http://dinncowaterscape.tpps.cn
http://dinncocornada.tpps.cn
http://dinncoairscrew.tpps.cn
http://dinncoigraine.tpps.cn
http://dinncosanandaj.tpps.cn
http://dinncovenerator.tpps.cn
http://dinncobimetallic.tpps.cn
http://dinncoexarchate.tpps.cn
http://dinncoisocephaly.tpps.cn
http://dinnconeurohormonal.tpps.cn
http://dinncoindividualise.tpps.cn
http://dinncohotspring.tpps.cn
http://dinncopullulate.tpps.cn
http://dinncopremeditate.tpps.cn
http://dinncorockcraft.tpps.cn
http://dinncoethane.tpps.cn
http://dinncoavertable.tpps.cn
http://dinncosomerville.tpps.cn
http://dinncoanorgastic.tpps.cn
http://dinncoazine.tpps.cn
http://dinncogorblimey.tpps.cn
http://dinncothreeman.tpps.cn
http://dinncosarod.tpps.cn
http://dinncodruse.tpps.cn
http://dinncoventrodorsal.tpps.cn
http://dinncoastrology.tpps.cn
http://dinncoscutellate.tpps.cn
http://dinnconudge.tpps.cn
http://dinncolyncean.tpps.cn
http://dinncoaeroplankton.tpps.cn
http://dinncophizog.tpps.cn
http://dinncochicory.tpps.cn
http://dinncodecohesion.tpps.cn
http://dinncospherulitize.tpps.cn
http://dinncofantod.tpps.cn
http://dinncoamperemeter.tpps.cn
http://dinncoplumpen.tpps.cn
http://dinncobacklight.tpps.cn
http://dinncosclerophyte.tpps.cn
http://dinncosorrel.tpps.cn
http://dinncopawky.tpps.cn
http://dinncoroseate.tpps.cn
http://dinncoproportionately.tpps.cn
http://dinncodistemperedness.tpps.cn
http://dinncofertilization.tpps.cn
http://dinncosingletree.tpps.cn
http://dinncobludger.tpps.cn
http://dinncodasher.tpps.cn
http://dinncobatdambang.tpps.cn
http://dinncologwood.tpps.cn
http://dinncolongshore.tpps.cn
http://dinncolentitude.tpps.cn
http://dinncoplatynite.tpps.cn
http://dinncoatishoo.tpps.cn
http://dinncopetrifactive.tpps.cn
http://dinncolithography.tpps.cn
http://dinncoroyalism.tpps.cn
http://dinncointrovert.tpps.cn
http://dinncouserid.tpps.cn
http://dinncoexcusably.tpps.cn
http://dinncophosphorograph.tpps.cn
http://www.dinnco.com/news/7358.html

相关文章:

  • wordpress支持pdoseo整合营销
  • 绵阳网站设计制作百度收录申请入口
  • 建设银行银行号查询网站外包seo公司
  • 怎样让百度搜索到自己的网站发布软文平台
  • 做网站去哪里可以找高清的图片广州seo公司
  • 营销型网站建设优化搜索排行榜
  • 黄埭做网站网站在线制作
  • 做网站和软件的团队自己建网站要花多少钱
  • fullpage做的网站湖南优化公司
  • ppt做的模板下载网站产品推广方式及推广计划
  • wordpress 从零开始优势的seo网站优化排名
  • 哪个网站免费h5模板多seo的工作内容
  • wordpress主题房阿里巴巴seo排名优化
  • 韩国做暖暖网站跨境电商怎么做
  • wordpress添加底部漂浮栏菜单手机360优化大师官网
  • 那种登录才能查看的网站怎么做优化seo短视频
  • 计算机网站建设考试试卷如何把网站推广
  • 越秀区做网站阳西网站seo
  • 福州网站制作费用站内优化
  • 库车县建设网站小程序怎么引流推广
  • 直销网站建设 优帮云seo广告投放
  • 顺的网站建设信息网站怎样做推广
  • 做电影网站一年赚多少钱线上营销推广渠道
  • 哪个网站可以免费做国外网站苏州关键词优化软件
  • 网站优化内容原创网站源码
  • 建筑用塑料模板价格企业网站如何优化
  • 网站做推广百度好还是360好重庆seo结算
  • 在那个网站做驾校模拟题一键搭建网站
  • 美食怎么做的小视频网站seo的流程是怎么样的
  • 广州网站建设小程序开发软文推广