Dinky 为 Apache Flink 而生,让 Flink SQL 纵享丝滑。本篇含MySQL到MySQL整库同步示例。
一站式 FlinkSQL & SQL DataOps
http://www.dlink.top/
基于 Apache Flink 二次开发,无侵入,开箱即用
实时即未来,批流为一体
书接上篇,我们成功搭建了Zeppelin的FlinkSQL环境。这篇来讲下Dinky这个WebUI框架部署和使用,不得不说确实好用。
- Java 8
- Flink v1.13.6
- Dinky v0.6.6
一、Flink单节点集群
Flink这次使用的是1.13.6,解包启动单节点集群,默认使用8081端口
# 启动集群
./bin/start-cluster.sh
# 停止集群
./bin/stop-cluster.sh
检查是否启动
tail log/flink-*-standalonesession-*.log
二、Dinky配置
官方部署文档:Dinky 简介 | Dinky (dlink.top)
其实已经非常详细了,这里在简单说下环境的配置过程。
1.下载解包:
wget https://github.com/DataLinkDC/dlink/releases/download/v0.6.6/dlink-release-0.6.6.tar.gz
tar -zxvf dlink-release-0.6.6.tar.gz
mv dlink-release-0.6.6 dlink
cd dlink
2.初始化数据库
MySQL8要求先创建用户,再授权。mysql7看官网即可
mysql> create user 'dlink'@'%' identified by 'dlink';
mysql> grant all privileges on *.* to 'dlink'@'%' with grant option;
mysql> flush privileges;
# 此处用 dlink 用户登录
mysql -h fdw1 -udlink -pdlink
mysql> create database dlink;
# 导入
mysql> use dlink;
mysql> source /path/to/dlink/sql/dlink.sql
可以自定义数据库连接信息
# 切换目录
cd /path/to/dlink/config/
vim application.yml
3.加载依赖(重要)
很多问题都会卡在这一步
这里建议先把Flink都配置好,然后把Flink的依赖包jar直接拷贝到Dinky的plugins里即可
可以跳到本页:【四、实操 – 整库同步 1.依赖包jar】 配置依赖 ,再执行如下命令
mkdir -p /path/to/dlink/plugins
cp /path/to/flink1.13.6/lib/* /path/to/dlink/plugins
如下是我本地的依赖(没有的去maven找即可):
flink-sql-connector-mysql-cdc
依赖:Releases · ververica/flink-cdc-connectors (github.com)
flink-connector-jdbc
:Maven Repository: org.apache.flink » flink-connector-jdbc (mvnrepository.com)mysql-connector-java
:Maven Repository: mysql » mysql-connector-java (mvnrepository.com)
plugins/
├── flink-connector-jdbc_2.11-1.13.6.jar
├── flink-csv-1.13.6.jar
├── flink-dist_2.11-1.13.6.jar
├── flink-json-1.13.6.jar
├── flink-shaded-zookeeper-3.4.14.jar
├── flink-sql-connector-mysql-cdc-2.2.1.jar
├── flink-table_2.11-1.13.6.jar
├── flink-table-blink_2.11-1.13.6.jar
├── log4j-1.2-api-2.17.1.jar
├── log4j-api-2.17.1.jar
├── log4j-core-2.17.1.jar
├── log4j-slf4j-impl-2.17.1.jar
└── mysql-connector-java-8.0.29.jar
4.启动 Dinky
默认启用8888端口访问
cd /path/to/dlink
# 启动
$sh auto.sh start
# 停止
$sh auto.sh stop
# 重启
$sh auto.sh restart
# 查看状态
$sh auto.sh status
三、实操 – 单表同步
1.注册集群
页面URI::8888/registration/cluster/clusterInstance
登录Dinky,选择 注册中心 -> 集群管理 -> 集群实例管理 -> 添加
我们使用本地集群进行演示:类型选择 Standalone
(单节点集群)模式,地址:127.0.0.1:8081
把我们第一步启动的集群地址添加上去。
2.编写作业
选择 数据开发 -> 左侧目录 -> 创建作业
作业类型 选择 FlinkSql,名称自己定义
以下为MySQL到MySQL单表进行CDC的流处理实例代码:
-- Flink SQL 开启心跳检查
SET execution.checkpointing.interval = 3s;
-- 源表-日志表,使用mysql-cdc连接器
DROP TABLE IF EXISTS source_log;
CREATE TABLE IF NOT EXISTS source_log (
`id` INT NOT NULL,
`add_time` TIMESTAMP COMMENT '消费时间',
`data` STRING COMMENT '数据标识',
PRIMARY KEY (`id`) NOT ENFORCED
) COMMENT '记录表' WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'flink',
'table-name' = 'log'
);
-- 目标表,使用jdbc连接器
DROP TABLE IF EXISTS log;
CREATE TABLE IF NOT EXISTS log (
`id` INT NOT NULL,
`add_time` TIMESTAMP COMMENT '消费时间',
`data` STRING COMMENT '数据标识',
PRIMARY KEY (`id`) NOT ENFORCED
) COMMENT '记录表' WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/mydb',
'username' = 'root',
'password' = '123456',
'table-name' = 'log'
);
-- 执行同步
INSERT INTO log SELECT * FROM source_log;
更多示例代码参加FlinkCDC官网:快速上手 — CDC Connectors for Apache Flink® documentation (ververica.github.io)
3.执行发布
在页面的工具栏执行作业发布,上线。
上线后即可在运维中心模块看到任务的执行情况。
四、实操 – 整库同步
Dinky 定义了 CDCSOURCE 整库同步的语法
官方介绍及文档:CDCSOURCE 整库同步 | Dinky (dlink.top)
1.依赖包jar
引入包后记得重启集群!
# 将下面 Dinky 根目录下 整库同步依赖包放置 $FLINK_HOME/lib下
jar/dlink-client-base-${version}.jar
jar/dlink-common-${version}.jar
lib/dlink-client-${version}.jar
2.作业示例
同步骤三一致,这里提供MySQL到MySQL的整库同步作业代码示例:
感谢 群友 @星空的沉默 测试提供
EXECUTE CDCSOURCE jobname_sync_all WITH (
-- 心跳检查 ms (毫秒)
'checkpoint' = '3000',
-- CDC启动模式:initial(先全量再增量),latest-offset(直接按最近binlog增量)
'scan.startup.mode' = 'initial',
-- 任务并行度
'parallelism' = '1',
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
-- table-name 参数支持正则
'table-name' = 'flink\.orders,flink\.items',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:mysql://127.0.0.1:3306/mydb',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.sink.db' = 'mydb'
-- 注入原表名${tableName}
'sink.table-name' = '${tableName}',
);
更多整库同步示例:CDCSOURCE 整库同步 | Dinky (dlink.top)
结语:
希望社区发展越来越好,为开源人加油
附:文末大佬的视频讲解:FlinkSQL开发平台——Dinky0.5入门使用_哔哩哔哩_bilibili
参考文章:
Flink CDC 系列(5)—— Flink CDC MySQL Connector 启动模式_白月蓝山的博客-CSDN博客_latest-offset
关联文章:
已经添加群了;
同样的配置用mysql是没有问题的;
我将链接信息换成
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:postgresql://192.188.10.44:6432/datacube’,
‘username’ = ‘root’,
‘password’ = ‘root’,
‘table-name’ = ‘test’
这样有没有问题
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO student_1(Sno, name, gender, age, Sdept, UpdateAt) VALUES (1, ‘1’, ‘1’, 1, ‘1’, ‘2022-12-22 15:20:03+08’) ON CONFLICT (Sno) DO UPDATE SET Sno=EXCLUDED.Sno, name=EXCLUDED.name, gender=EXCLUDED.gender, age=EXCLUDED.age, Sdept=EXCLUDED.Sdept, UpdateAt=EXCLUDED.UpdateAt was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported Call getNextException to see other errors in the batch.
Caused by: org.postgresql.util.PSQLException: ERROR: modification of distribution columns in OnConflictUpdate is not supported
截图可能更清晰一些,但是不能发截图
可以qq1461940404
看报错像是批量更新不支持,建议调整下相关配置:UpdateAt was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported Call getNextException to see other errors in the batch.
官方群里氛围挺好,建议加入Q群:543709668
将sink数据库换成postgresql后出现如下错误
Caused by: org.postgresql.util.PSQLException: ERROR: modification of distribution columns in OnConflictUpdate is not supported