一、简要说明
canal的使用需要一个服务端 deployer
和客户端 adapter
。简单来说,服务端来监听源数据库的bin-log
变化并解析为 sql 等待客户端消费;客户端连接服务端来进行sql消费。
二、配置实现
参考官方文档:QuickStart · alibaba/canal Wiki (github.com),多看,多实践
1.准备工作
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
授权 canal 连接 MySQL 的账号具有作为 MySQL slave 的权限(SELECT, REPLICATION SLAVE, REPLICATION CLIENT)
2.配置服务端
下载服务端包:canal.deployer-1.1.5.tar.gz
进入文件 conf/example/instance.properties
,如下修改,其他保持默认即可。
# position info 这里设置源数据库地址(可选增量信息)
canal.instance.master.address=localhost:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# ... 中间省略
# username/password 这里设置源数据库用户名、密码、编码格式
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
启动服务端
sh bin/startup.sh
window下双击 startup.bat
3.配置客户端
下载安装包:canal.adapter-1.1.5.tar.gz
这里客户端启动后会提供REST接口,以供查看监控客户端运行状态
ClientAdapter · alibaba/canal Wiki (github.com)
总配置文件 conf/application.yml
,根据需要配置(这里以MySQL 到 MySQL同步为例),其他保持默认即可
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
#... 中间省略
# 数据源配置
srcDataSources:
defaultDS:
url: jdbc:mysql://localhost:3306/test?useUnicode=true
username: root
password: root
canalAdapters:
- instance: example # canal instance Name or mq topic name这里设置实例名称
groups:
- groupId: g1 # 实例中分组名称
# 目标库(消费端)配置
outerAdapters:
- name: logger # name 可多个消费端配置
- name: rdb
key: mysql1 # 实例中的 key
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://localhost:3306/test2?useUnicode=true
jdbc.username: root
jdbc.password: root
实例配置文件:conf/rdb/user.yml
其中 user.yml
是将默认的mytest_user.yml
重命名
1️⃣实例启动时会自动加载该文件夹下所有的.yml配置,根据需要将默认的配置修改或删除
2️⃣如果多表同步可以在该文件夹下配置多个 *.yml
这里以同步 test.user
到test2.user
为例
dataSourceKey: defaultDS
# cannal的instance或者MQ的topic
destination: example # 这里就是总配置文件的实例名称
groupId: g1 # 实例中分组名称
outerAdapterKey: mysql1 # 实例中的 key
concurrent: true # 是否按主键hash并行同步,并行同步的表必须保证主键不会更改及主键不能为其他同步表的外键!!
dbMapping:
database: test # 数据源的database/shcema
table: user # 数据源表名
targetTable: user # 目标库的表(直接写表名)
targetPk: # 主键映射
id: id # 如果是复合主键可以换行映射多个
mapAll: true # 是否整表映射, 要求源表和目标表字段名一模一样
# targetColumns: # 字段映射(优先级高于mapAll), 格式如,目标表字段: 源表字段
# id:
# name:
# role_id:
# c_time:
# test1:
etlCondition: "where c_time>={}" # 手动同步的条件语句
commitBatch: 3000 # 批量提交的大小
# # Mirror schema synchronize config
# dataSourceKey: defaultDS
# destination: example
# groupId: g1
# outerAdapterKey: mysql1
# concurrent: true
# dbMapping:
# mirrorDb: true
# database: test
1.一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters, 比如例子中logger和hbase
2.目前client adapter数据订阅的方式支持两种,直连canal server 或者 订阅kafka/RocketMQ的消息
启动客户端
sh bin/startup.sh
window下双击 startup.bat
三、MySQL全量同步
执行以下命令调用Client-Adapter服务的方法触发同步任务。此时,canal会先中止增量数据传输,然后同步全量数据。待全量数据同步完成后,canal会自动进行增量数据同步。
命令格式
curl "hostip:port/etl/type/key/task" -X POST
示例:
curl http://127.0.0.1:8081/etl/rdb/mysql1/user.yml -X POST -d "params=2022-04-09 00:00:00"
附:遇到的问题
一、Windows下启动报错: Config dir not found
1,把startup.bat中classpath分号隔开的两个顺序调换:CLASSPATH=%CLASSPATH%;%conf_dir%\..\lib\*
2,确保不是中文文件夹 以及 文件夹命名不能存在空格(Program Files 放在这个文加下也有问题)
二、解决: canal PositionNotFoundException
参考文章:解决canal PositionNotFoundException – 简书 (jianshu.com),摘录如下:
情况一、当你第一次监听的mysql和第二次监听的mysql不一样的时候,就有可能报出这个
PositionNotFoundException
。情况二、mysql的
binlog
被删除,也有可能会报出PositionNotFoundException
。
问题如下:
1.canal
监听mysql
的时候会在conf/{destination}/meta.dat
文件中记录当前binlog
的名字、position
。
2.下次启动canal
的时候就从这个binlog
的position
开始读取数据。
3.meta.dat
记录的是上一次的binlog
信息,当你删除mysql
的binlog
或者监听到另外一台mysql
后,meta.dat
记录的信息就相当于过期信息,所以就会出现PositionNotFoundException
。单机环境解决方案:
找到conf/{destination}/meta.dat
,然后删除meta.dat
,重新启动canal
,这个时候canal
就会重新生成meta.dat
文件。集群环境解决方案:
https://www.jianshu.com/p/e33a918aad09
进入zookeeper
,可以选择删除/otter
整个节点,也可以选择删除/otter/canal/destinations/{destinations}
,目的就是把过期的meta
信息删除。删除成功后,重新启动canal
就可以了
三、同步数据出错:etl failed
1、Duplicate entry
xxx etl failed! ==>Duplicate entry 'test_name' for key 'name'
解决方法:
dbMapping
的主键设置问题,根据自己的业务需求可设置为 UNIQUE KEY
,如果是联合唯一索引可以设置多个
dbMapping:
# 源数据源的database/shcema
database: test
table: user
targetTable: user
targetPk:
name: name
2、Deadlock found
{"succeeded":false,"resultMessage":"导入RDB 数据:1671930 条","errorMessage":"user_logs etl failed! ==>Deadlock found when trying to get lock; try restarting transaction
解决方法:
1️⃣数据库隔离级别调整为RC
2️⃣考虑使用配置concurrent: false
参考:canal-adapter-1.1.5 手动etl报死锁问题 · Issue #4022 · alibaba/canal (github.com)
参考文章:
Home · alibaba/canal Wiki (github.com)
使用canal同步 (aliyun.com)
Canal详细入门实战(使用总结) – CZQ_Darren – 博客园 (cnblogs.com)