当前位置: 首页 > news >正文

查询类网站用什么做今天的热点新闻

查询类网站用什么做,今天的热点新闻,做网站里面内容编写,做营销网站seo这是仿真过程某图: 仿真实战kafka kafka消费sink端和StructuredStreaming集成通信成功 , 数据接收全部接收 数据落地情况: 全部接收到并all存入mysql 下面就简单分享一下StructuredStreaming代码吧 import org.apache.spark.sql.function…

          这是仿真过程某图:

仿真场景kafkaStream
仿真实战kafka
 

 kafka消费sink端和StructuredStreaming集成通信成功 , 数据接收全部接收

数据落地情况: 

全部接收到并all存入mysql

下面就简单分享一下StructuredStreaming代码吧

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.{ OutputMode, Trigger}
import org.apache.spark.sql.types.{IntegerType, StringType,  StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}val spark: SparkSession = SparkSession.builder().appName("kafkaConsumer").master("local[3]").getOrCreate()import spark.implicits._// 定义json字段类型格式val Jsonschmea: StructType = new StructType().add("id", dataType = IntegerType).add("name", dataType = StringType).add("sorce", dataType = IntegerType)val message: DataFrame = spark.readStream // message为从kafka读到的原数据.format("kafka").option("kafka.bootstrap.servers", "xxxxx:9092,xxxx:9092,xxxx:9092").option("subscribe", "xxxx").option("startingOffsets", "latest").load()// 将json字符串转化为结构化数据val streamData: DataFrame = message.selectExpr("cast(value as String) as message") .select(from_json($"message", Jsonschmea).alias("data"))// 将json结构化为新的df// 预加载mysql驱动// 实时写入 第二个参数预占位,want给每一批次加入唯一表示, but本次仅占位没有传参数def writeToMysql(batchDF: DataFrame, epochId: Long): Unit = {val sqlurl = "jdbc:mysql://localhost:xxxx/xxxx"val sqluser = "xxxx"val sqlpass = "xxxxx"Class.forName("com.mysql.cj.jdbc.Driver")  // mysql 8.0后得驱动,旧版本去掉cjbatchDF.foreachPartition {partitionOfRecords =>val connection = DriverManager.getConnection(sqlurl, sqluser, sqlpass)// 关闭自动提交以支持增量写入connection.setAutoCommit(false)// 创建预编译的插入语句val insertsql = "insert into jsonstream(id,name,sorce) values(?,?,?)"val preparedStatement = connection.prepareStatement(insertsql)partitionOfRecords.foreach {row =>
//              val id = row.getAs[Int]("data.id")
//              val name = row.getAs[String]("data.name")
//              val score = row.getAs[Int]("data.sorce")val id = row.getAs[Row]("data").getAs[Int]("id")val name = row.getAs[Row]("data").getAs[String]("name")val sorce = row.getAs[Row]("data").getAs[Int]("sorce")// 设置参数到预处理sql函数中preparedStatement.setInt(1, id)preparedStatement.setString(2, name)preparedStatement.setInt(3, sorce)// 执行添加到批次操作preparedStatement.addBatch()}preparedStatement.executeBatch()connection.commit() // 执行批处理后手动提交事务preparedStatement.close()  // 手动GCconnection.close()}}// 数据落地到数据库streamData.writeStream.outputMode(OutputMode.Append()).foreachBatch(writeToMysql _).trigger(Trigger.ProcessingTime("1 millisecond")) // 1 毫秒每个batch.start().awaitTermination()

存储按照一定批次量做存储   

友情提示 : 上述程序是经过脱敏处理的哦

----彩蛋----

如果你看到者你会知道scala在11更新之后也就是12版本如下:

batchDF.foreachPartition {partitionOfRecords => ... 这个位置

 Dataset的foreachPartition 里面不能处理 Row的Iterator, 所以需要转为rdd在做处理

所以更改后为

batchDF.rdd.foreachPartition { partitionOfRecords => ...

而且这里不能用foreach , 否则无法序列化就能存储到mysql, 不能被序列化的数据是不能在网络中进行传输的,通过二进制流的形式传出,在被反序列化回来转化为对象的形式存储

ok -----

http://www.dinnco.com/news/70119.html

相关文章:

  • 网站用哪些系统做的好百度客服24小时电话人工服务
  • 杭州的网站建设公司有哪些荥阳seo
  • 烟台论坛建站模板快速排名方案
  • 成都艾邦视觉专业网站建设公司百度指数查询入口
  • 做垂直行业网站利润分析5g网络优化工程师
  • 个人建设网站论文app下载推广
  • 国外网站推广服务哈尔滨最新消息
  • b2b的典型网站学企业管理培训班
  • 网站首页倒计时功能怎么做b站视频推广网站2023年
  • 一键 wordpress谷歌seo需要做什么的
  • 佛山网站建设哪家专业百度云超级会员试用1天
  • 在线做ppt的网站有哪些如何找外包的销售团队
  • 儿童做的小游戏下载网站百度网站排名优化价格
  • 景洪网站建设网站营销策略有哪些
  • 怎样把自己做的网页放在网站里排名网
  • 最便宜的网站叫什么名字交换免费连接
  • 网站怎么做搜素引擎互联网销售是做什么的
  • 网站投票系统怎么做抖音关键词排名
  • 网站建设seo 视频教程地推的60种方法
  • 物流网站建设与管理规划书seo诊断书
  • 我想卖自己做的鞋子 上哪个网站好网站域名综合查询
  • 个人网站主页模板足球比赛直播
  • 淘宝客网站做app排名前十的小说
  • 济南网站制作公司排名搜索引擎优化的内部优化
  • 阿里云oss可以做网站线上推广的公司
  • 陕西网站开发价格赛雷猴是什么意思
  • 个人网站可以做百度推广吗怎么做公司网页
  • 大良营销网站建设平台优化人员配置
  • 做网站有哪些类型的seo优化自动点击软件
  • 低价格的网站建设公司邯郸网站优化公司