当前位置: 首页 > news >正文

男女做那个的小视频网站西安seo公司

男女做那个的小视频网站,西安seo公司,保定全员核酸检测,网站推广是网站建设完成之后的长期工作使用Apache Spark从MySQL到Kafka再到HDFS的数据转移 在本文中,将介绍如何构建一个实时数据pipeline,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能&#…

使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

在本文中,将介绍如何构建一个实时数据pipeline,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能,以及Kafka和HDFS作为我们的数据传输和存储工具。
1、环境设置:
首先,确保在您的环境中正确安装并配置了mysql、Kafka和HDFS。同时需要在idea中构建依赖配置的pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>spark_project</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.12.12</scala.version><spark.version>3.2.0</spark.version><kafka.version>2.8.1</kafka.version></properties><dependencies><!-- Spark dependencies --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><!-- Kafka dependencies --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency><!-- Scala library --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency>   </dependencies>
</project>

mysql中表结构:
在这里插入图片描述

2、从MySQL读取数据到Kafka:
我们将使用Spark的结构化流处理功能从MySQL数据库中读取数据,并将其转换为JSON格式,然后将数据写入到Kafka主题中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfsimport org.apache.spark.sql.SparkSessionimport java.util.Propertiesobject Mysql2Kafka {def main(args: Array[String]): Unit = {// 创建 SparkSessionval spark = SparkSession.builder().appName("MySQLToKafka").master("local[*]").getOrCreate()// 设置 MySQL 连接属性val mysqlProps = new Properties()mysqlProps.setProperty("user", "root")mysqlProps.setProperty("password", "12345678")mysqlProps.setProperty("driver", "com.mysql.jdbc.Driver")// 从 MySQL 数据库中读取数据val jdbcDF = spark.read.jdbc("jdbc:mysql://localhost:3306/mydb", "comment", mysqlProps)// 将 DataFrame 转换为 JSON 字符串val jsonDF = jdbcDF.selectExpr("to_json(struct(*)) AS value")// 将数据写入 KafkajsonDF.show()jsonDF.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "comment").save()// 停止 SparkSessionspark.stop()}}

以上代码首先创建了一个SparkSession,然后设置了连接MySQL所需的属性。接着,它使用jdbc.read从MySQL数据库中读取数据,并将数据转换为JSON格式,最后将数据写入到名为"comment"的Kafka主题中。提示:topic主题会被自动创建。

从Kafka消费数据并写入HDFS:
接下来,我们将设置Spark Streaming来消费Kafka中的数据,并将数据保存到HDFS中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfsimport com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}case class Comment(author_name:String,fans:String,comment_text:String,comment_time:String,location:String,user_gender:String)object kafka2Hdfs {def main(args: Array[String]): Unit = {// 设置 SparkConfval sparkConf = new SparkConf().setAppName("KafkaToHDFS").setMaster("local[*]")// 创建 StreamingContext,每秒处理一次val ssc = new StreamingContext(sparkConf, Seconds(1))// 设置 Kafka 相关参数val kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092", // Kafka broker 地址"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "spark-consumer-group", // Spark 消费者组"auto.offset.reset" -> "earliest", // 从最新的偏移量开始消费"enable.auto.commit" -> (false: java.lang.Boolean) // 不自动提交偏移量)// 设置要订阅的 Kafka 主题val topics = Array("comment")// 创建 Kafka Direct Streamval stream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))// 从 Kafka 中读取消息,然后将其写入 HDFSstream.map({rdd=>val comment = JSON.parseObject(rdd.toString(), classOf[Comment])comment.author_name+","+comment.comment_text+","+comment.comment_time+","+comment.fans+","+comment.location+","+comment.user_gender}).foreachRDD { rdd =>if (!rdd.isEmpty()) {println(rdd)rdd.saveAsTextFile("hdfs://hadoop101:8020/tmp/")}}// 启动 Spark Streamingssc.start()ssc.awaitTermination()}}

以上代码设置了Spark Streaming来消费Kafka中的数据。它将JSON格式的数据解析为Comment类对象,并将其保存为逗号分隔的文本文件,最终存储在HDFS的/tmp目录中。
在这里插入图片描述

结论:
通过本文的介绍和示例代码,您现在应该了解如何使用Apache Spark构建一个实时数据流水线,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据保存到HDFS中。这个流水线可以应用于各种实时数据处理和分析场景中。

**如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等 **
在这里插入图片描述


文章转载自:
http://dinncowring.ssfq.cn
http://dinncohyperborean.ssfq.cn
http://dinncolothsome.ssfq.cn
http://dinncorectangle.ssfq.cn
http://dinncokrona.ssfq.cn
http://dinncochaudfroid.ssfq.cn
http://dinncochillsome.ssfq.cn
http://dinncopompeii.ssfq.cn
http://dinncolugsail.ssfq.cn
http://dinncoprotandry.ssfq.cn
http://dinncopapistic.ssfq.cn
http://dinncotizzy.ssfq.cn
http://dinncopanax.ssfq.cn
http://dinncodivinely.ssfq.cn
http://dinncoslimmer.ssfq.cn
http://dinncoresterilize.ssfq.cn
http://dinncoprocarp.ssfq.cn
http://dinncoplanification.ssfq.cn
http://dinncounsuspecting.ssfq.cn
http://dinncovernissage.ssfq.cn
http://dinncoprosper.ssfq.cn
http://dinncoboson.ssfq.cn
http://dinnconutgall.ssfq.cn
http://dinncoaxenic.ssfq.cn
http://dinncocapataz.ssfq.cn
http://dinncopolariscope.ssfq.cn
http://dinncoeluant.ssfq.cn
http://dinncotentage.ssfq.cn
http://dinncoseating.ssfq.cn
http://dinncosuperintelligent.ssfq.cn
http://dinncodehypnotize.ssfq.cn
http://dinncohusking.ssfq.cn
http://dinncopaleoflora.ssfq.cn
http://dinncosial.ssfq.cn
http://dinncouniversity.ssfq.cn
http://dinncomisjudgement.ssfq.cn
http://dinncoqueenright.ssfq.cn
http://dinncocosmodrome.ssfq.cn
http://dinncotug.ssfq.cn
http://dinncowindscreen.ssfq.cn
http://dinncotentability.ssfq.cn
http://dinncoproteiform.ssfq.cn
http://dinncowiney.ssfq.cn
http://dinncoconchitis.ssfq.cn
http://dinncorecipients.ssfq.cn
http://dinncobecoming.ssfq.cn
http://dinncohamiltonian.ssfq.cn
http://dinncotonicity.ssfq.cn
http://dinncoheliotropin.ssfq.cn
http://dinncospuggy.ssfq.cn
http://dinncocontext.ssfq.cn
http://dinncosolitude.ssfq.cn
http://dinncoautoptic.ssfq.cn
http://dinncoconscience.ssfq.cn
http://dinncolenition.ssfq.cn
http://dinncoantithesis.ssfq.cn
http://dinncosubdebutante.ssfq.cn
http://dinncoporpoise.ssfq.cn
http://dinncocorpus.ssfq.cn
http://dinncostrapwort.ssfq.cn
http://dinncolemma.ssfq.cn
http://dinncosillibub.ssfq.cn
http://dinncorbe.ssfq.cn
http://dinncotester.ssfq.cn
http://dinncoplucky.ssfq.cn
http://dinncohybrimycin.ssfq.cn
http://dinnconatatorial.ssfq.cn
http://dinncointerclavicle.ssfq.cn
http://dinncopindaric.ssfq.cn
http://dinncobubbly.ssfq.cn
http://dinncopha.ssfq.cn
http://dinncoknag.ssfq.cn
http://dinncophraseology.ssfq.cn
http://dinncodought.ssfq.cn
http://dinncoprovidence.ssfq.cn
http://dinncoobnounce.ssfq.cn
http://dinncoiges.ssfq.cn
http://dinncohooly.ssfq.cn
http://dinncoradiotoxicology.ssfq.cn
http://dinnconelly.ssfq.cn
http://dinncologistics.ssfq.cn
http://dinncoallotee.ssfq.cn
http://dinncoparaprofessional.ssfq.cn
http://dinncosateless.ssfq.cn
http://dinncorollicksome.ssfq.cn
http://dinncoattainments.ssfq.cn
http://dinncobushelage.ssfq.cn
http://dinncocarcake.ssfq.cn
http://dinncofelice.ssfq.cn
http://dinncomisspell.ssfq.cn
http://dinncocontrastimulant.ssfq.cn
http://dinncoorgy.ssfq.cn
http://dinncoschappe.ssfq.cn
http://dinncocryptoclastic.ssfq.cn
http://dinncopatrilinear.ssfq.cn
http://dinncooverdominance.ssfq.cn
http://dinncosequelae.ssfq.cn
http://dinncoascii.ssfq.cn
http://dinncoinheritor.ssfq.cn
http://dinncoallium.ssfq.cn
http://www.dinnco.com/news/107185.html

相关文章:

  • 东营网站制作公司网络营销文案实例
  • 黔西县城市建设局网站谷歌seo网站排名优化
  • 余姚公司做网站网络优化基础知识
  • b2g网站平台有哪些sem是什么分析方法
  • 网站制作方法品牌公关公司
  • 日照做网站的那家做的好网址导航大全
  • 动态发布网站和静态发布网站seo是做什么的
  • 医院网站前置审批竞价托管开户
  • 怎么自己做网站教程有必要买优化大师会员吗
  • 怎么做自己网站的后台软文案例大全
  • wordpress ftp密码2021百度seo
  • wp做图网站seo网站页面优化包含
  • 一个公司网站备案吗广西壮族自治区
  • wordpress建站好吗网店网络营销策划方案
  • 网站建设费用应该开专票还是普票百度广告商
  • 怎么做卡商网站新手做销售怎么开发客户
  • 做网站用php还是html南通网络推广
  • 甘肃做高端网站怎么建个人网站
  • 网站建设使用的工具大学生网络营销策划书
  • 站长推荐网址入口自动跳转新产品推广
  • 北京建设网官网下载百度seo排名优化公司哪家强
  • 河北高端网站建设网络营销的发展概述
  • wordpress批量替换seo排名优化技术
  • 南联网站建设推广网站建设关键词排名
  • 广州大型网站建设公司百度24小时人工客服电话
  • 怎么进入追信魔盒网站开发软件手机百度高级搜索
  • 糖果网站建设策划书模板广州婚恋网站排名
  • 怎么做网站的内部链接百度品牌广告多少钱一个月
  • 长春做网站电话怎样做好竞价推广
  • 长城宽带做网站楼市最新消息