Dinky+FlinkSQL单机部署实现CDC数据全增量同步

Dinky 为 Apache Flink 而生,让 Flink SQL 纵享丝滑。本篇含MySQL到MySQL整库同步示例。

一站式 FlinkSQL & SQL DataOps
基于 Apache Flink 二次开发,无侵入,开箱即用
实时即未来,批流为一体

http://www.dlink.top/

书接上篇,我们成功搭建了Zeppelin的FlinkSQL环境。这篇来讲下Dinky这个WebUI框架部署和使用,不得不说确实好用👍。

  • Java 8
  • Flink v1.13.6
  • Dinky v0.6.6

一、Flink单节点集群

Flink这次使用的是1.13.6,解包启动单节点集群,默认使用8081端口

# 启动集群
./bin/start-cluster.sh
# 停止集群
./bin/stop-cluster.sh

检查是否启动

tail log/flink-*-standalonesession-*.log

二、Dinky配置

官方部署文档:Dinky 简介 | Dinky (dlink.top)

其实已经非常详细了,这里在简单说下环境的配置过程。

1.下载解包:

wget https://github.com/DataLinkDC/dlink/releases/download/v0.6.6/dlink-release-0.6.6.tar.gz
tar -zxvf dlink-release-0.6.6.tar.gz
mv dlink-release-0.6.6 dlink
cd dlink

2.初始化数据库

MySQL8要求先创建用户,再授权。mysql7看官网即可

mysql> create user 'dlink'@'%' identified by 'dlink';
mysql> grant all privileges on *.* to 'dlink'@'%' with grant option;
mysql> flush privileges;
# 此处用 dlink 用户登录
mysql -h fdw1  -udlink -pdlink
mysql> create database dlink;
# 导入
mysql> use dlink;
mysql> source /path/to/dlink/sql/dlink.sql

可以自定义数据库连接信息

# 切换目录
cd /path/to/dlink/config/
vim application.yml

3.加载依赖(重要)

很多问题都会卡在这一步

这里建议先把Flink都配置好,然后把Flink的依赖包jar直接拷贝到Dinky的plugins里即可

可以跳到本页:【四、实操 – 整库同步 1.依赖包jar】 配置依赖 ,再执行如下命令

mkdir -p /path/to/dlink/plugins
cp /path/to/flink1.13.6/lib/* /path/to/dlink/plugins

如下是我本地的依赖(没有的去maven找即可):

flink-sql-connector-mysql-cdc依赖:Releases · ververica/flink-cdc-connectors (github.com)

flink-connector-jdbcMaven Repository: org.apache.flink » flink-connector-jdbc (mvnrepository.com)
mysql-connector-javaMaven Repository: mysql » mysql-connector-java (mvnrepository.com)

plugins/
├── flink-connector-jdbc_2.11-1.13.6.jar
├── flink-csv-1.13.6.jar
├── flink-dist_2.11-1.13.6.jar
├── flink-json-1.13.6.jar
├── flink-shaded-zookeeper-3.4.14.jar
├── flink-sql-connector-mysql-cdc-2.2.1.jar
├── flink-table_2.11-1.13.6.jar
├── flink-table-blink_2.11-1.13.6.jar
├── log4j-1.2-api-2.17.1.jar
├── log4j-api-2.17.1.jar
├── log4j-core-2.17.1.jar
├── log4j-slf4j-impl-2.17.1.jar
└── mysql-connector-java-8.0.29.jar

4.启动 Dinky

默认启用8888端口访问

cd /path/to/dlink
# 启动
$sh auto.sh start
# 停止
$sh auto.sh stop
# 重启
$sh auto.sh restart
# 查看状态
$sh auto.sh status

三、实操 – 单表同步

1.注册集群

页面URI::8888/registration/cluster/clusterInstance

登录Dinky,选择 注册中心 -> 集群管理 -> 集群实例管理 -> 添加

我们使用本地集群进行演示:类型选择 Standalone (单节点集群)模式,地址:127.0.0.1:8081把我们第一步启动的集群地址添加上去。

2.编写作业

选择 数据开发 -> 左侧目录 -> 创建作业

作业类型 选择 FlinkSql,名称自己定义

以下为MySQL到MySQL单表进行CDC的流处理实例代码:

-- Flink SQL  开启心跳检查
SET execution.checkpointing.interval = 3s;

-- 源表-日志表,使用mysql-cdc连接器
DROP TABLE IF EXISTS source_log;
CREATE TABLE IF NOT EXISTS source_log (
  `id` INT NOT NULL,
  `add_time` TIMESTAMP COMMENT '消费时间',
  `data` STRING COMMENT '数据标识',
  PRIMARY KEY (`id`) NOT ENFORCED
) COMMENT '记录表' WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'flink',
  'table-name' = 'log'
);

-- 目标表,使用jdbc连接器
DROP TABLE IF EXISTS log;
CREATE TABLE IF NOT EXISTS log (
  `id` INT NOT NULL,
  `add_time` TIMESTAMP COMMENT '消费时间',
  `data` STRING COMMENT '数据标识',
  PRIMARY KEY (`id`) NOT ENFORCED
) COMMENT '记录表' WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://127.0.0.1:3306/mydb',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'log'
);

-- 执行同步
INSERT INTO log SELECT * FROM source_log;

更多示例代码参加FlinkCDC官网:快速上手 — CDC Connectors for Apache Flink® documentation (ververica.github.io)

3.执行发布

在页面的工具栏执行作业发布,上线。

上线后即可在运维中心模块看到任务的执行情况。

四、实操 – 整库同步

Dinky 定义了 CDCSOURCE 整库同步的语法

官方介绍及文档:CDCSOURCE 整库同步 | Dinky (dlink.top)

1.依赖包jar

引入包后记得重启集群!

# 将下面 Dinky 根目录下 整库同步依赖包放置 $FLINK_HOME/lib下
jar/dlink-client-base-${version}.jar
jar/dlink-common-${version}.jar
lib/dlink-client-${version}.jar

2.作业示例

步骤三一致,这里提供MySQL到MySQL的整库同步作业代码示例:

感谢 群友 @星空的沉默 测试提供

EXECUTE CDCSOURCE jobname_sync_all WITH (
  -- 心跳检查 ms (毫秒)
  'checkpoint' = '3000',
  -- CDC启动模式:initial(先全量再增量),latest-offset(直接按最近binlog增量)
  'scan.startup.mode' = 'initial',
  -- 任务并行度
  'parallelism' = '1',

  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  -- table-name 参数支持正则
  'table-name' = 'flink\.orders,flink\.items',

  'sink.connector' = 'jdbc',
  'sink.url' = 'jdbc:mysql://127.0.0.1:3306/mydb',
  'sink.username' = 'root',
  'sink.password' = '123456',
  'sink.sink.db' = 'mydb'
  -- 注入原表名${tableName}
  'sink.table-name' = '${tableName}',
);

更多整库同步示例:CDCSOURCE 整库同步 | Dinky (dlink.top)

结语:

希望社区发展越来越好,为开源人加油

附:文末大佬的视频讲解:FlinkSQL开发平台——Dinky0.5入门使用_哔哩哔哩_bilibili

参考文章:

Flink CDC 系列(5)—— Flink CDC MySQL Connector 启动模式_白月蓝山的博客-CSDN博客_latest-offset

关联文章:

Author: thinkwei

4 thoughts on “Dinky+FlinkSQL单机部署实现CDC数据全增量同步

  1. 已经添加群了;
    同样的配置用mysql是没有问题的;
    我将链接信息换成
    ‘connector’ = ‘jdbc’,
    ‘url’ = ‘jdbc:postgresql://192.188.10.44:6432/datacube’,
    ‘username’ = ‘root’,
    ‘password’ = ‘root’,
    ‘table-name’ = ‘test’
    这样有没有问题

  2. java.sql.BatchUpdateException: Batch entry 0 INSERT INTO student_1(Sno, name, gender, age, Sdept, UpdateAt) VALUES (1, ‘1’, ‘1’, 1, ‘1’, ‘2022-12-22 15:20:03+08’) ON CONFLICT (Sno) DO UPDATE SET Sno=EXCLUDED.Sno, name=EXCLUDED.name, gender=EXCLUDED.gender, age=EXCLUDED.age, Sdept=EXCLUDED.Sdept, UpdateAt=EXCLUDED.UpdateAt was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported Call getNextException to see other errors in the batch.
    Caused by: org.postgresql.util.PSQLException: ERROR: modification of distribution columns in OnConflictUpdate is not supported
    截图可能更清晰一些,但是不能发截图
    可以qq1461940404

    1. 看报错像是批量更新不支持,建议调整下相关配置:UpdateAt was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported Call getNextException to see other errors in the batch.
      官方群里氛围挺好,建议加入Q群:543709668

  3. 将sink数据库换成postgresql后出现如下错误
    Caused by: org.postgresql.util.PSQLException: ERROR: modification of distribution columns in OnConflictUpdate is not supported

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注