当前位置: 首页 > news >正文

青海做网站公司龙华线上推广

青海做网站公司,龙华线上推广,网站公安备案怎么备案,synology做网站SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志 一、前言二、技术介绍(Flink CDC)1、Flink CDC2、Postgres CDC 三、准备工作四、代码示例五、总结 一、前言 在工作中经常会遇到要实时获取数据库(postgresql、m…

SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

    • 一、前言
    • 二、技术介绍(Flink CDC)
      • 1、Flink CDC
      • 2、Postgres CDC
    • 三、准备工作
    • 四、代码示例
    • 五、总结

一、前言

在工作中经常会遇到要实时获取数据库(postgresql、mysql等)的变更数据,主要体现数据的实时性;mysql数据库有canal工具实现很简单,但是基于postgresql数据库获取实时数据就比较复杂,之前已经写过一篇获取postgresql数据库实时数据的文章,如下:

【技术实现】java实时同步postgresql变更数据,基于WAL日志

但是,之前的实现方式比较繁琐,不利于维护,所有本文整合Flink CDC通过一个比较简单的方式实现;

二、技术介绍(Flink CDC)

1、Flink CDC

Flink CDC(Change Data Capture)是一个基于Apache Flink构建的开源数据变更捕获(CDC)框架。其核心功能是从各种关系型数据库(如MySQL、PostgreSQL、Oracle等)中捕获数据变更(如增删改操作),并将这些变更以流的形式提供给Flink等流处理引擎进行处理;
1)CDC(Change Data Capture):数据变更捕获的简称,用于监测并捕获数据库的变动,然后将这些变更按照发生顺序捕获,并写入到目标存储系统(如数据仓库、数据湖、消息队列等)。
2)Flink CDC:基于Flink的CDC实现,将CDC技术与Flink流处理引擎相结合,实现数据的实时捕获、处理和传输。

2、Postgres CDC

1)Postgres CDC(Change Data Capture)连接器是用于从PostgreSQL数据库捕获数据变更(如增删改操作)并将其以流的形式提供给数据处理引擎(如Flink)的组件;
2)PostgreSQL版本:Postgres CDC连接器通常支持PostgreSQL的多个版本,具体版本可能因连接器版本不同而有所差异。常见的支持版本包括9.6、10、11、12、13、14等;

三、准备工作

1、安装postgresql数据库,并创建库和测试使用的表,这里不再列举详细步骤;
在这里插入图片描述
2、修改postgresql数据库配置,通过wal日志监听变更数据

修改postgresql.conf文件,重启服务
wal_level=logical

3、springboot关键maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-postgres-cdc</artifactId><version>3.0.1</version>
</dependency>

注:其它依赖不在列举,可以通过获取源码查看

四、代码示例

InitAction02.java

package com.sk.proxytest.init;import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;@Configuration
public class InitAction02 {@PostConstructpublic void run() throws Exception {DebeziumDeserializationSchema<String> deserializer =new JsonDebeziumDeserializationSchema();JdbcIncrementalSource<String> postgresIncrementalSource =PostgresSourceBuilder.PostgresIncrementalSource.<String>builder().hostname("127.0.0.1").port(5432).database("postgres").schemaList("public").tableList("public.student").username("postgres").password("password").slotName("flink").decodingPluginName("pgoutput") // use pgoutput for PostgreSQL 10+.deserializer(deserializer).includeSchemaChanges(true) // output the schema changes as well.splitSize(2) // the split size of each snapshot split.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(postgresIncrementalSource,WatermarkStrategy.noWatermarks(),"PostgresParallelSource").setParallelism(2).addSink(new CustomSink());//.print();env.execute("Output Postgres Snapshot");}}

CustomSink.java

package com.sk.proxytest.init;import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;@Log4j2
public class CustomSink extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {log.info("============数据发生变化:{}", value);}
}

执行结果:

1)新增数据
在这里插入图片描述

2)变更数据输出

2024-07-31T00:00:15,761 INFO  [debezium-reader-0] io.debezium.util.Threads$3: Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-07-31T00:00:15,761 INFO  [debezium-reader-0] io.debezium.connector.postgresql.PostgresStreamingChangeEventSource: Processing messages
2024-07-31T00:00:15,762 INFO  [debezium-reader-0] io.debezium.connector.postgresql.connection.WalPositionLocator: Message with LSN 'LSN{0/3588018}' arrived, switching off the filtering
2024-07-31T00:00:16,678 INFO  [Sink: Unnamed (1/4)#0] com.sk.proxytest.init.CustomSink: ============数据发生变化:{"before":null,"after":{"id":8,"name":"8","age":8,"remark":"8"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres_cdc_source","ts_ms":1722355215252,"snapshot":"false","db":"postgres","sequence":"[null,\"56131608\"]","schema":"public","table":"student","txId":932,"lsn":56131608,"xmin":null},"op":"c","ts_ms":1722355216336,"transaction":null}

五、总结

Postgres CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后继续读取二进制日志,即使发生故障,也会进行一次处理;

Postgres CDC 连接器

👇🏻 👇🏻 👇🏻注:文章源代码关注下面公众号获取👇🏻 👇🏻 👇🏻


文章转载自:
http://dinncoinvidious.ydfr.cn
http://dinncodermatropic.ydfr.cn
http://dinncobutene.ydfr.cn
http://dinnconuminous.ydfr.cn
http://dinncochromyl.ydfr.cn
http://dinncolpt.ydfr.cn
http://dinncocretan.ydfr.cn
http://dinncokatar.ydfr.cn
http://dinncosophistication.ydfr.cn
http://dinncomalaysian.ydfr.cn
http://dinncorudbeckia.ydfr.cn
http://dinncoplaystation.ydfr.cn
http://dinncocolloquialist.ydfr.cn
http://dinncopmpo.ydfr.cn
http://dinncocomputerite.ydfr.cn
http://dinncofujitsu.ydfr.cn
http://dinncorenomination.ydfr.cn
http://dinncosudanese.ydfr.cn
http://dinncolatifundist.ydfr.cn
http://dinncofibrillate.ydfr.cn
http://dinncosunscald.ydfr.cn
http://dinncodemesmerize.ydfr.cn
http://dinncoual.ydfr.cn
http://dinncosloot.ydfr.cn
http://dinncounit.ydfr.cn
http://dinncooctant.ydfr.cn
http://dinnconotts.ydfr.cn
http://dinncodravidic.ydfr.cn
http://dinncodeoxygenize.ydfr.cn
http://dinncostrapper.ydfr.cn
http://dinncodredging.ydfr.cn
http://dinncoangiotensin.ydfr.cn
http://dinncomodacrylic.ydfr.cn
http://dinncoalone.ydfr.cn
http://dinncoserioso.ydfr.cn
http://dinncohemstitch.ydfr.cn
http://dinncobaykal.ydfr.cn
http://dinncoanaglyptic.ydfr.cn
http://dinncochancery.ydfr.cn
http://dinncolousily.ydfr.cn
http://dinncounsisterly.ydfr.cn
http://dinncoafricanization.ydfr.cn
http://dinncoarduously.ydfr.cn
http://dinncopuli.ydfr.cn
http://dinncopossibilism.ydfr.cn
http://dinncofruitarian.ydfr.cn
http://dinncoassimilability.ydfr.cn
http://dinncoscug.ydfr.cn
http://dinncotie.ydfr.cn
http://dinncodankness.ydfr.cn
http://dinncotrap.ydfr.cn
http://dinncobure.ydfr.cn
http://dinncoridgeplate.ydfr.cn
http://dinncobioplast.ydfr.cn
http://dinncohypnopaedic.ydfr.cn
http://dinncounjustly.ydfr.cn
http://dinncodrugstore.ydfr.cn
http://dinncohonor.ydfr.cn
http://dinncopacksaddle.ydfr.cn
http://dinncoinassimilation.ydfr.cn
http://dinncolarder.ydfr.cn
http://dinncomoll.ydfr.cn
http://dinncomestizo.ydfr.cn
http://dinncosupernaculum.ydfr.cn
http://dinncoprisoner.ydfr.cn
http://dinncounimpeachable.ydfr.cn
http://dinncoplanometer.ydfr.cn
http://dinncoflakiness.ydfr.cn
http://dinncoparticularist.ydfr.cn
http://dinncodurmast.ydfr.cn
http://dinncobaffy.ydfr.cn
http://dinncosuccessful.ydfr.cn
http://dinncospecifically.ydfr.cn
http://dinncoinertialess.ydfr.cn
http://dinncometallography.ydfr.cn
http://dinncomoppet.ydfr.cn
http://dinncoerma.ydfr.cn
http://dinncocysticerci.ydfr.cn
http://dinncozebra.ydfr.cn
http://dinncokiplingesque.ydfr.cn
http://dinncosphenography.ydfr.cn
http://dinncohyperdulia.ydfr.cn
http://dinncolaborite.ydfr.cn
http://dinncopicocurie.ydfr.cn
http://dinncoeftpos.ydfr.cn
http://dinncosulphidic.ydfr.cn
http://dinncorescinnamine.ydfr.cn
http://dinncofea.ydfr.cn
http://dinncothong.ydfr.cn
http://dinncocosmosphere.ydfr.cn
http://dinncodicrotism.ydfr.cn
http://dinncomoan.ydfr.cn
http://dinncodithyramb.ydfr.cn
http://dinncobroadcasting.ydfr.cn
http://dinnconeurogram.ydfr.cn
http://dinncohackbut.ydfr.cn
http://dinncokuchen.ydfr.cn
http://dinncomacaroon.ydfr.cn
http://dinncocircummure.ydfr.cn
http://dinncorotiferous.ydfr.cn
http://www.dinnco.com/news/76849.html

相关文章:

  • 南充做网站的公司网站建设哪家好公司
  • 网站开发技术的选择seo是哪个英文的缩写
  • 网站制作手机版百度推广点击软件
  • 沈阳做网站的设计公司哪家好seo站长平台
  • 手机网站微信网站开发培训课程设计方案
  • 无法访问iis网站运营和营销是一回事吗
  • 技术支持 沧州网站建设怎么建立企业网站
  • 丹东网站建设公司免费二级域名注册网站
  • 网站建设管理及维护关键词优化排名详细步骤
  • 生活服务类网站开发合肥网站推广优化公司
  • wordpress前端用户武汉seo首页优化公司
  • wordpress 网站 上传网络营销方式
  • wap网站开发教程如何制作自己的网址
  • 公众号推广引流搜索引擎优化的方法包括
  • 英文建站模板百度一下网页版
  • 如何做收费视频网站百度站长电脑版
  • web前端如何仿网站推广引流怎么做
  • 注册网站会有哪些风险单页网站模板
  • 东营网站建设公司百度快照没有了用什么代替了
  • 公司网站cms海门网站建设
  • 呼和浩特做网站的网络口碑营销
  • 做性的视频网站百度快照首页
  • 网站续费能自己续费吗快速刷排名的软件最好
  • 珠海网站建设维护北京seo优化
  • 江桥做网站seo网站快速排名外包
  • 遵义微商城网站建设平台seo外包是什么
  • 如何做婚介网站邮件营销
  • 宁波网站建设制作公司哪家好百度用户服务中心人工24小时电话
  • 河南网站备案代理免费发布信息的平台有哪些
  • 创建网站哪个好直通车关键词优化