当前位置: 首页 > news >正文

广西住房与城乡建设部网站模板建站

广西住房与城乡建设部网站,模板建站,怎样在百度做网站打广告,做论文常用网站有哪些文章目录前言一、安装部署Debezium架构部署示意图安装部署二、数据迁移Postgres迁移到PostgresMySQL迁移到PostgresSQL前言 在项目中,我们遇到已有数据库现存有大量数据,但需要将全部现存数据同步迁移到新的数据库中,我们应该如何处理呢&…

文章目录

  • 前言
  • 一、安装部署
    • Debezium架构
    • 部署示意图
    • 安装部署
  • 二、数据迁移
    • Postgres迁移到Postgres
    • MySQL迁移到PostgresSQL


前言

在项目中,我们遇到已有数据库现存有大量数据,但需要将全部现存数据同步迁移到新的数据库中,我们应该如何处理呢?

本期我们就基于Debezium与Kafka构建数据同步。


一、安装部署

Debezium架构

在这里插入图片描述
Debezium 是一个基于不同数据库中提供的变更数据捕获功能(例如,PostgreSQL中的逻辑解码)构建的分布式平台。 Debezium是通过Apache Kafka连接部署的。

Kafka Connect是一个用于实现和操作的框架运行时。

源连接器,如Debezium,它将数据摄取到Kafka中(在我们的接下来实际的例子中,Debezium将Mysql数据摄取到Kafka中);

接收连接器,它将数据从Kafka主题写入到其他到系统,这个系统可以有多种,在我们例子中,会将Kafka主题写入到PostgreSQL数据库中。

部署示意图

在这里插入图片描述

  • Zookeeper:Zookeeper容器,用于构建Kafka环境;
  • Kafka:Kafka容器,数据库的变更信息以topic的形式保存在kafka中;
  • Kafka-ui:kafka的UI页面容器,可以直观的查看kafka中的Brokers,Topics,Consumers等信息;
  • Connect:Debezium的Connect容器,对接Kafka的Connect,通过Source Connector将数据同步到Kafka中,通过Sink Connect消费Kafka的topic消息;
  • Debezium Connector:Source Connector插件,以Jar包的形式部署在Connect中,Debezium自带有MongoDB,MySQL,PostgreSQL,SQL Server,Oracle,Db2连接器;
  • DBC connector:Sink Connector插件,以Jar包的形式部署在Connect中,本次部署安装的是JDBC连接器,将Kafka上的数据同步到数据库中;
  • Debezium-ui:Debezium connect的ui页面容器。用于创建和显示Source Connector
  • Source Database:数据迁移来源方数据库。本次部署中使用的是MySQL和Postgres(10+版本);
  • Target Database:数据库迁移目标数据库。本次部署中使用的是Postgres。

安装部署

本次部署需要先安装Docker。

Debezium使用Docker安装部署,如下⬇

docker-compose.yaml


version: '2'
services:zookeeper:image: quay.io/debezium/zookeeper:2.0ports:- 2181:2181- 2888:2888- 3888:3888kafka:image: quay.io/debezium/kafka:2.0ports:- 9092:9092links:- zookeeperenvironment:- ZOOKEEPER_CONNECT=zookeeper:2181connect:image: quay.io/debezium/connect:2.0ports:- 8083:8083- 5005:5005links:- kafkaenvironment:- BOOTSTRAP_SERVERS=kafka:9092- GROUP_ID=1- CONFIG_STORAGE_TOPIC=my_connect_configs- OFFSET_STORAGE_TOPIC=my_connect_offsets- STATUS_STORAGE_TOPIC=my_source_connect_statuseskafka-ui:image: provectuslabs/kafka-ui:latestports:- "9093:8080"environment:- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092links:- kafkadebezium-ui:image: debezium/debezium-ui:2.0ports:- "8080:8080"environment:- KAFKA_CONNECT_URIS=http://connect:8083links:- connect

部署命令:

docker-compose -f docker-compose.yaml -p debezium up -d

部署完成后,Docker容器列表,如下:

在这里插入图片描述

  • Kafka-ui访问地址:http://localhost:9093

  • Debezium-ui访问地址:http://localhost:8080

Source Connector和Sink Connector都是以JAR包的方式,存在于Connect容器的/kafka/connect目录下。

Connect容器自带有Debezium的官方Source Connector:

  • debezium-connector-db2
  • debezium-connector-mysql
  • debezium-connector-postgres
  • debezium-connector-vitess
  • debezium-connector-mongodb
  • debezium-connector-oracle
  • debezium-connector-sqlserver

需要自行注册Sink Connector:Kafka-Connect-JDBC(新建Kafka-Connect-JDBC目录,下载JAR包放入此目录,重启Conenct)。

注册Sink Connector

# docker容器中新建kafka-connect-jdbc目录
docker exec 容器id mkdir /kafka/connect/kafka-connect-jdbc
# 下载jar包到本地
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/5.3.2/kafka-connect-jdbc-5.3.2.jar
# 拷贝jar包到docker容器
docker cp kafka-connect-jdbc-5.3.2.jar 容器id:/kafka/connect/kafka-connect-jdbc
# 重启connect容器
docker restart 容器id

二、数据迁移

在这里插入图片描述

数据迁移经历以下几个步骤:

1)启动源数据库;

2)注册Source Connector,Source Connector监听Source Database的数据变动,发布数据到Kafka的Topic中,一个表对应一个Topic,Topic中包含对表中某条记录的某个操作(新增,修改,删除等);

3)启动目标数据库;

4)注册Sink Connector,Sink Connector消费Kafka中的Topic,通过JDBC连接到Target Database,根据Topic中的信息,对表记录执行对应操作。

Postgres迁移到Postgres

  • 1.启动源数据库-Postgres

本次部署通过容器的方式启动:

docker run -d --name source-postgres -p 15432:5432 -e POSTGRES_PASSWORD=123456 -e POSTGRES_USER=debe postgres:12.6
  • 2.注册Source Connecto

通过Debezium UI页面进行注册。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
需要注意的有以下几点:

Debezium Postgres类型的Source Connector支持的Postgres需要将wal_level修改为logical;修改Postgres中的Postgresql.conf文件中的配置(wal_level = logical)并重启Postgres;
Postgres需要支持解码插件,Debezium官方一共提供了两个解码插件:

Decoderbufs:Debezium默认配置,由Debezium维护;
Pgoutput:Postgres 10+版本自带;使用此插件时,需要配置plugin.name=pgoutput

  • 3.启动目标数据库-Postgre
docker run -d --name target-postgres -p 25432:5432 -e POSTGRES_PASSWORD=123456 -e POSTGRES_USER=debe postgres:12.6
  • 4.注册Sink Connector

通过Connect提供的API进行注册

新增Connector


curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d \
'{"name": "sink-connector-postgres","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "1","topics": "postgres.public.test_source","connection.url": "jdbc:postgresql://10.3.73.160:25432/postgres?user=debe&password=123456","transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": "false","auto.create": "true","insert.mode": "upsert","delete.enabled": "true","pk.fields": "id","pk.mode": "record_key"}
}'
  • 5.验证数据迁移过程

源数据库中的表数据迁移到Kafka

新建表test_source和test_source1

test_source&test_source1.sql

-- test_source
create table if not exists public.test_source
(id   integer not nullconstraint test_source_pkprimary key,name varchar(64)
);alter table public.test_sourceowner to debe;insert into public.test_source (id, name) values (1, 'a');
-- test_source1
create table if not exists public.test_source1
(id   integer not nullconstraint test_source1_pkprimary key,name varchar(64)
);alter table public.test_source1owner to debe;insert into public.test_source1 (id, name) values (1, 'a1');

Kafka新建数据前 ⬇

在这里插入图片描述
Kafka新建数据后 ⬇

在这里插入图片描述
在这里插入图片描述

源数据库中新建表test_source和表test_source1后,Kafka中出现了两个Topic:

postgres.public.test_source和postgres.public.test_source1,与这两个表一一对应,topic中的message对应着对表中记录的操作(新增1条记录)。

监听的表可通过连接器配置进行过滤,比如配置"table.include.list": “public.test_source”,就只会出现一个Topic:postgres.public.test_source

Kafka中的数据迁移到目标数据库

在这里插入图片描述
在这里插入图片描述

注册Sink Connector后,Kafka中会新增一个Customer,对postgres.public.test_source进行消费(sink connector配置中的"topics": "postgres.public.test_source"指定);

对应的源数据库(sink connector配置中的"connection.url": "jdbc:postgresql://10.3.73.160:25432/postgres?user=debe&password=123456"指定)会新增一个表public.test_source,该表中的数据和源数据库中的public.test_source始终保持同步。

MySQL迁移到PostgresSQL

  • 1.启动源数据库-mysql

本次部署通过docker启动:

docker run -d --name source-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.0
  • 2.注册Source Connector

启动MySQL数据源连接注册

注册MySQL数据源有两种方式:

1、在Debezium UI中直接添加
2、调用Kafka API 注册

在Debezium UI中直接添加
在这里插入图片描述
选择MySQL数据源

在这里插入图片描述
调用Kafka API注册

新增Connector

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d \
'{"name": "inventory-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","tasks.max": "1","topic.prefix": "dbserver1","database.hostname": "mysql","database.port": "3306","database.user": "debezium", //数据库用户名"database.password": "dbz",  //数据库密码"database.server.id": "184054","database.include.list": "inventory",  //数据源覆盖范围"schema.history.internal.kafka.bootstrap.servers": "kafka:9092","schema.history.internal.kafka.topic": "schema-changes.inventory","transforms": "route","transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)","transforms.route.replacement": "$3"}
}'

在这里插入图片描述

验证Source Connector注册结果

注册连接前:

在这里插入图片描述
注册连接后:

在这里插入图片描述
多出来的Topics信息是MySQL source表信息,连接MySQL数据库可见表:

在这里插入图片描述
在这里插入图片描述
UI for Apache Kafka中可以看到Messages同步信息。

在这里插入图片描述
访问Debezium UI(http://localhost:8080/ )可以看到MySQL的连接。

在这里插入图片描述

  • 3.启动目标数据库-Postgres

本次部署采用Docker方式启动:

docker run -d --name target-postgres -p 5432:5432  -e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=postgrespw -e POSTGRES_DB=inventory debezium/postgres:9.6
  • 4.注册Sink Connector (通过API接口)

新增Connector

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d \
'{"name": "jdbc-sink","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "1","topics": "customers", //迁移目标主题(这里是按照表来订阅的)"connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw","transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": "false","auto.create": "true","insert.mode": "upsert","delete.enabled": "true","pk.fields": "id","pk.mode": "record_key"}
}'

在这里插入图片描述

注册PostgreSQL connector后,不会在Debezium中显示Connector client 信息,但可以在UI for Apache Kafka中看到:

在这里插入图片描述

  • 5.验证数据迁移过程

完成安装步骤后,以Customers表为例,做CUD操作语句,实现MySQL数据库同步数据到PostgreSQL 。

Mysql 数据库现有数据:

在这里插入图片描述
在这里插入图片描述
手动在MySQL数据库Customers表中添加一条数据 ⬇

customers.sql

insert into customers(id,first_name,last_name,email) values(1005,'test','one','123456@qq.com');

在这里插入图片描述
在PostgreSQL数据库中Customers多出一条数据:

在这里插入图片描述

Kafka中Messages新增一条数据,完成数据同步:

在这里插入图片描述
可以看到消费如下信息:

topics-customers.json


{"schema": {"type": "struct","fields": [{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "dbserver1.inventory.customers.Value","field": "before"},{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "dbserver1.inventory.customers.Value","field": "after"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "version"},{"type": "string","optional": false,"field": "connector"},{"type": "string","optional": false,"field": "name"},{"type": "int64","optional": false,"field": "ts_ms"},{"type": "string","optional": true,"name": "io.debezium.data.Enum","version": 1,"parameters": {"allowed": "true,last,false,incremental"},"default": "false","field": "snapshot"},{"type": "string","optional": false,"field": "db"},{"type": "string","optional": true,"field": "sequence"},{"type": "string","optional": true,"field": "table"},{"type": "int64","optional": false,"field": "server_id"},{"type": "string","optional": true,"field": "gtid"},{"type": "string","optional": false,"field": "file"},{"type": "int64","optional": false,"field": "pos"},{"type": "int32","optional": false,"field": "row"},{"type": "int64","optional": true,"field": "thread"},{"type": "string","optional": true,"field": "query"}],"optional": false,"name": "io.debezium.connector.mysql.Source","field": "source"},{"type": "string","optional": false,"field": "op"},{"type": "int64","optional": true,"field": "ts_ms"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "id"},{"type": "int64","optional": false,"field": "total_order"},{"type": "int64","optional": false,"field": "data_collection_order"}],"optional": true,"name": "event.block","version": 1,"field": "transaction"}],"optional": false,"name": "dbserver1.inventory.customers.Envelope","version": 1},"payload": {"before": null,"after": {"id": 1005,"first_name": "test","last_name": "one","email": "123456@qq.com"},"source": {"version": "2.0.1.Final","connector": "mysql","name": "dbserver1","ts_ms": 1672024796000,"snapshot": "false","db": "inventory","sequence": null,"table": "customers","server_id": 223344,"gtid": null,"file": "mysql-bin.000003","pos": 392,"row": 0,"thread": 16,"query": null},"op": "c","ts_ms": 1672024796396,"transaction": null}
}

重要的部分是 “payload” json 中信息:

  • source 中会展示“版本”,“数据源”等信息;
  • after 代表变动信息;
  • “op” 操作信息,例如“c” 代表创建;

需要注意的是,结果的json格式是Debezium定义好的格式。

Debezium json格式通常前面定义Schema信息,最后才是实际的载荷(payload)信息。

详细格式定义可以查看:https://debezium.io/documentation/reference/1.6/connectors/mysql.html

通过以上步骤,我们在Docker环境上使用Debezium实现了数据同步到kafaka。本期关于数据同步迁移的内容就到这里了,建议大家收藏学习!~


文章转载自:
http://dinncolagos.ssfq.cn
http://dinncoscrambler.ssfq.cn
http://dinncosmattery.ssfq.cn
http://dinncodooly.ssfq.cn
http://dinncopostpituitary.ssfq.cn
http://dinnconearsighted.ssfq.cn
http://dinncochrominance.ssfq.cn
http://dinncoinhumane.ssfq.cn
http://dinncoobconic.ssfq.cn
http://dinncodelicious.ssfq.cn
http://dinncocomplicity.ssfq.cn
http://dinncoconsummative.ssfq.cn
http://dinnconigerien.ssfq.cn
http://dinncosemiplastic.ssfq.cn
http://dinncomosotho.ssfq.cn
http://dinncoblether.ssfq.cn
http://dinncoplatform.ssfq.cn
http://dinncocheth.ssfq.cn
http://dinncotelepathist.ssfq.cn
http://dinncomergence.ssfq.cn
http://dinncowashboard.ssfq.cn
http://dinncohodeida.ssfq.cn
http://dinncobib.ssfq.cn
http://dinncoandromonoecism.ssfq.cn
http://dinncoheptastylos.ssfq.cn
http://dinncodumbbell.ssfq.cn
http://dinncosilkaline.ssfq.cn
http://dinncocromer.ssfq.cn
http://dinncomegamachine.ssfq.cn
http://dinncotrapper.ssfq.cn
http://dinncomarron.ssfq.cn
http://dinncoangry.ssfq.cn
http://dinncohaggardness.ssfq.cn
http://dinncocalypsonian.ssfq.cn
http://dinncophyllophagous.ssfq.cn
http://dinncoinductor.ssfq.cn
http://dinncoclavicorn.ssfq.cn
http://dinncocanalize.ssfq.cn
http://dinncometeorograph.ssfq.cn
http://dinncosomnambule.ssfq.cn
http://dinncoexophoria.ssfq.cn
http://dinncoacquiescently.ssfq.cn
http://dinncoaccidentproof.ssfq.cn
http://dinncostreetlight.ssfq.cn
http://dinncoassessee.ssfq.cn
http://dinncowaiver.ssfq.cn
http://dinncofloorer.ssfq.cn
http://dinncoshm.ssfq.cn
http://dinncopeevit.ssfq.cn
http://dinncohaven.ssfq.cn
http://dinncoconfluence.ssfq.cn
http://dinncorenata.ssfq.cn
http://dinncounderstrength.ssfq.cn
http://dinncocounterchange.ssfq.cn
http://dinncohomy.ssfq.cn
http://dinncowhitish.ssfq.cn
http://dinncoanthropophagus.ssfq.cn
http://dinncoclaudicant.ssfq.cn
http://dinncostartle.ssfq.cn
http://dinncohoary.ssfq.cn
http://dinncobeetle.ssfq.cn
http://dinncospilth.ssfq.cn
http://dinncoviniferous.ssfq.cn
http://dinncoamphictyony.ssfq.cn
http://dinncobattercake.ssfq.cn
http://dinncostatehood.ssfq.cn
http://dinncoadditivity.ssfq.cn
http://dinncoboater.ssfq.cn
http://dinncoparsi.ssfq.cn
http://dinncodragbar.ssfq.cn
http://dinncomastectomy.ssfq.cn
http://dinncoquiddle.ssfq.cn
http://dinncopapaverous.ssfq.cn
http://dinncowanda.ssfq.cn
http://dinncotrilobed.ssfq.cn
http://dinncoembraceor.ssfq.cn
http://dinnconanoprogramming.ssfq.cn
http://dinncobackslapper.ssfq.cn
http://dinncoexplodent.ssfq.cn
http://dinncoleeward.ssfq.cn
http://dinncosymbiose.ssfq.cn
http://dinncostaphylotomy.ssfq.cn
http://dinncotalkative.ssfq.cn
http://dinncocantor.ssfq.cn
http://dinncolocal.ssfq.cn
http://dinncolibertinage.ssfq.cn
http://dinncomumm.ssfq.cn
http://dinncobpa.ssfq.cn
http://dinncoegp.ssfq.cn
http://dinncohieratic.ssfq.cn
http://dinncointrorse.ssfq.cn
http://dinncocomfortlessly.ssfq.cn
http://dinncoraggle.ssfq.cn
http://dinncowarmonger.ssfq.cn
http://dinnconervate.ssfq.cn
http://dinncosuccous.ssfq.cn
http://dinncoanalyzer.ssfq.cn
http://dinncodevadasi.ssfq.cn
http://dinncobeachhead.ssfq.cn
http://dinncojeremias.ssfq.cn
http://www.dinnco.com/news/143418.html

相关文章:

  • 怎样黑网站seo推广和百度推广的区别
  • 网站建设网络推广首选公司磁力搜索引擎2023
  • 软件开发者是指旺道seo系统
  • wordpress科技模板超级优化空间
  • 苏州快速建设网站公司关键词优化推广排名软件
  • 做电商网站公司简介电工培训学校
  • 广州网站制百度推广网页版
  • 模拟人生4做游戏下载网站电工培训学校
  • 专注聊城做网站的公司百度识图鉴你所见
  • 网站建设属于软件开发长沙seo
  • wordpress 分享到优帮云排名优化
  • 佛山seo整站优化网络营销的优势和劣势
  • 广告网站设计怎么样广告推广公司
  • 做电影资源网站违法吗百度收录查询接口
  • 51testing培训怎么样网站优化推广价格
  • 网站开发 成都长沙seo优化价格
  • 网站建设详细教程视频郑州seo排名工具
  • 怎样做网站优化公司网站seo公司
  • 点击app图标进入网站怎么做什么企业需要网络营销和网络推广
  • 网站建设公司发展前景域名查询万网
  • 武汉职业技术学院乐山网站seo
  • 外包网站制作营销推广活动策划方案
  • 做视频网站视频加载过慢冯耀宗seo视频教程
  • 苏州公司网站开发百度识图扫一扫
  • 制作个人业务网站查询关键词排名软件
  • wordpress流媒体插件广告投放优化师
  • h5免费模板网站广州seo网络营销培训
  • 博客网站开发报告文库seo查询网站是什么
  • 凡科的网站怎么仿销售找客户的app
  • 网站空间域名维护协议白山seo