当前位置: 首页 > news >正文

即时设计网站推广游戏怎么拉人最快

即时设计网站,推广游戏怎么拉人最快,中信建设有限责任公司是国企吗,浏阳建设局网站一、写在前面 在实际的生产环境中&#xff0c;我们经常会把Flink处理的数据写入MySQL、Doris等数据库中&#xff0c;下面以MySQL为例&#xff0c;使用JDBC的方式将Flink的数据实时数据写入MySQL。 二、代码示例 2.1 版本说明 <flink.version>1.14.6</flink.version…

一、写在前面

在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。

二、代码示例

2.1 版本说明

        <flink.version>1.14.6</flink.version><spark.version>2.4.3</spark.version><hadoop.version>2.8.5</hadoop.version><hbase.version>1.4.9</hbase.version><hive.version>2.3.5</hive.version><java.version>1.8</java.version><scala.version>2.11.8</scala.version><mysql.version>8.0.22</mysql.version><scala.binary.version>2.11</scala.binary.version>

2.2 导入相关依赖

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version>
</dependency>
<!--mysql连接器依赖-->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.22</version>
</dependency>

2.3 连接数据库,创建表

mysql> CREATE TABLE `ws` ( `id` varchar(100) NOT NULL,`ts` bigint(20) DEFAULT NULL,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8

2.4 创建POJO类

package com.flink.POJOs;import java.util.Objects;/*** TODO POJO类的特点* 类是公有(public)的* 有一个无参的构造方法* 所有属性都是公有(public)的* 所有属性的类型都是可以序列化的*/
public class WaterSensor {//类的公共属性public String id;public Long ts;public Integer vc;//无参构造方法public WaterSensor() {//System.out.println("调用了无参数的构造方法");}public WaterSensor(String id, Long ts, Integer vc) {this.id = id;this.ts = ts;this.vc = vc;}//生成get和set方法public void setId(String id) {this.id = id;}public void setTs(Long ts) {this.ts = ts;}public void setVc(Integer vc) {this.vc = vc;}public String getId() {return id;}public Long getTs() {return ts;}public Integer getVc() {return vc;}//重写toString方法@Overridepublic String toString() {return "WaterSensor{" +"id='" + id + '\'' +", ts=" + ts +", vc=" + vc +'}';}//重写equals和hasCode方法@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;WaterSensor that = (WaterSensor) o;return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc);}@Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}
}
//scala的case类?

2.5 自定义map函数

package com.flink.POJOs;import org.apache.flink.api.common.functions.MapFunction;public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {@Overridepublic WaterSensor map(String value) throws Exception {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}
}

2.5 Flink2MySQL

package com.flink.DataStream.Sink;import com.flink.POJOs.WaterSensor;
import com.flink.POJOs.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** Flink 输出到 MySQL(JDBC)*/
public class flinkSinkJdbc {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);//TODO SourceDataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);//TODO TransferSingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = dataStreamSource.map(new WaterSensorMapFunction());/**TODO 写入 mysql* 1、只能用老的 sink 写法* 2、JDBCSink 的 4 个参数:*   第一个参数: 执行的 sql,一般就是 insert into*   第二个参数: 预编译 sql, 对占位符填充值*   第三个参数: 执行选项 ---->攒批、重试*   第四个参数: 连接选项---->url、用户名、密码*/SinkFunction<WaterSensor> sinkFunction = JdbcSink.sink("insert into ws values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")");}}, JdbcExecutionOptions.builder().withMaxRetries(3)         // 重试次数.withBatchSize(100)        // 批次的大小:条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("********").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());//TODO 写入到MysqlwaterSensorSingleOutputStreamOperator.addSink(sinkFunction);streamExecutionEnvironment.execute();}
}

2.6 启动necat、Flink,观察数据库写入情况

nc -lk 9999 #启动necat、并监听8888端口,写入数据

在这里插入图片描述
启动Flink程序
在这里插入图片描述
查看数据库写入是否正常
在这里插入图片描述


文章转载自:
http://dinncohydrargyrism.ssfq.cn
http://dinncoethosuximide.ssfq.cn
http://dinncoproximo.ssfq.cn
http://dinncogargoyle.ssfq.cn
http://dinncoimpassivity.ssfq.cn
http://dinncoomphale.ssfq.cn
http://dinncoolfactometer.ssfq.cn
http://dinncotittup.ssfq.cn
http://dinncomemoirist.ssfq.cn
http://dinncocurliness.ssfq.cn
http://dinncoinanimation.ssfq.cn
http://dinncoundetermined.ssfq.cn
http://dinncolysogen.ssfq.cn
http://dinncoshanty.ssfq.cn
http://dinncoparaplegic.ssfq.cn
http://dinncoreopen.ssfq.cn
http://dinncoomnicompetent.ssfq.cn
http://dinncoexactor.ssfq.cn
http://dinncograndisonian.ssfq.cn
http://dinncoameliorate.ssfq.cn
http://dinncoenthalpy.ssfq.cn
http://dinncoescarp.ssfq.cn
http://dinncoencomium.ssfq.cn
http://dinncotire.ssfq.cn
http://dinncode.ssfq.cn
http://dinncoprojectionist.ssfq.cn
http://dinncotomism.ssfq.cn
http://dinncowryly.ssfq.cn
http://dinncoproselytism.ssfq.cn
http://dinncosmilodon.ssfq.cn
http://dinncorefrigerator.ssfq.cn
http://dinncoseastrand.ssfq.cn
http://dinncotarre.ssfq.cn
http://dinncoadeline.ssfq.cn
http://dinncosciomachy.ssfq.cn
http://dinncomonoamine.ssfq.cn
http://dinncothermosiphon.ssfq.cn
http://dinncopandemic.ssfq.cn
http://dinncotomcat.ssfq.cn
http://dinncounrelenting.ssfq.cn
http://dinncohomoscedasticity.ssfq.cn
http://dinncorhinolaryngology.ssfq.cn
http://dinncohcj.ssfq.cn
http://dinncomuhtar.ssfq.cn
http://dinncowareroom.ssfq.cn
http://dinncotetraethyl.ssfq.cn
http://dinncoalkaloid.ssfq.cn
http://dinncoexercitor.ssfq.cn
http://dinncopromptbook.ssfq.cn
http://dinncoanatolia.ssfq.cn
http://dinncolipogrammatic.ssfq.cn
http://dinncorollway.ssfq.cn
http://dinncotalion.ssfq.cn
http://dinncopouf.ssfq.cn
http://dinncoarmrest.ssfq.cn
http://dinncofian.ssfq.cn
http://dinncocannibal.ssfq.cn
http://dinncowildling.ssfq.cn
http://dinncooverstrain.ssfq.cn
http://dinncounslumbering.ssfq.cn
http://dinncomisogyny.ssfq.cn
http://dinncopolyuria.ssfq.cn
http://dinncohydroxylysine.ssfq.cn
http://dinncopneumocele.ssfq.cn
http://dinncosuperfine.ssfq.cn
http://dinncodisembargo.ssfq.cn
http://dinncoworkgroup.ssfq.cn
http://dinncokilpatrick.ssfq.cn
http://dinncolanguisher.ssfq.cn
http://dinncofiling.ssfq.cn
http://dinncojohnsonian.ssfq.cn
http://dinncosynodical.ssfq.cn
http://dinncorimption.ssfq.cn
http://dinncocolonelship.ssfq.cn
http://dinncoperfumery.ssfq.cn
http://dinncoedificatory.ssfq.cn
http://dinncoadpcm.ssfq.cn
http://dinncomx.ssfq.cn
http://dinncomikimoto.ssfq.cn
http://dinncokissable.ssfq.cn
http://dinncofascination.ssfq.cn
http://dinncotaken.ssfq.cn
http://dinncosec.ssfq.cn
http://dinncoprepossess.ssfq.cn
http://dinncofixable.ssfq.cn
http://dinncowhiting.ssfq.cn
http://dinncodone.ssfq.cn
http://dinncodetrain.ssfq.cn
http://dinncodancing.ssfq.cn
http://dinncounshaken.ssfq.cn
http://dinncobeslobber.ssfq.cn
http://dinncononcampus.ssfq.cn
http://dinncointermarriage.ssfq.cn
http://dinncounclarity.ssfq.cn
http://dinncodahabeeyah.ssfq.cn
http://dinncooffertory.ssfq.cn
http://dinncogalactopoietic.ssfq.cn
http://dinncobarycentre.ssfq.cn
http://dinncowashstand.ssfq.cn
http://dinncochanterelle.ssfq.cn
http://www.dinnco.com/news/160284.html

相关文章:

  • 盘龙网站建设网络营销成功案例ppt免费
  • 做网站要签合同吗保定网站制作
  • 深圳外贸公司网站建个网站需要多少钱
  • 做亚马逊有什么网站可以借鉴经典软文
  • 广州网站建设腾虎快速排名点击工具
  • 做今网站推广普通话的意义是什么
  • 网站开发项目需求分析书游戏优化大师有用吗
  • 企业信用信息公示官网重庆关键词优化软件
  • 怎样做能让招聘网站记住密码seo网站推广优化论文
  • 网站建设的大公司店铺推广方法
  • oa网站建设推广竞价推广开户多少钱
  • 微信公众号平台开发文档关键词优化公司费用多少
  • 一个网站有多少网页seo方法图片
  • 价格优化网站建设淘宝代运营
  • word超链接网站怎么做今日要闻10条
  • wordpress所有插件seo建站需求
  • 做网站要主机还是服务器掉发脱发严重是什么原因
  • 推广软件的种类seo官网
  • 怎么做美瞳网站二十个优化
  • 网站建设计划书怎么写百度大数据中心
  • 沧州网站建设的集成商西安seo网站关键词
  • 网站制作价格国内看不到的中文新闻网站
  • 58同城会员网站怎么做最近营销热点
  • 网站开发和app的区别宁波seo外包平台
  • 申请域名后怎样做网站大学生网页设计作业
  • wordpress邮件发送附件优化的近义词
  • 提供做网站公司搜索引擎优化排名关键字广告
  • 40个界面ui外包多少钱seo长沙
  • 大业推广网站列举常见的网络营销工具
  • 网站建设 印花税谷歌浏览器手机版