Canal实现MySQL到MySQL的全量、增量同步

一、简要说明

canal的使用需要一个服务端 deployer 和客户端 adapter 。简单来说,服务端来监听源数据库的bin-log变化并解析为 sql 等待客户端消费;客户端连接服务端来进行sql消费。

二、配置实现

参考官方文档:QuickStart · alibaba/canal Wiki (github.com),多看,多实践

1.准备工作

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 连接 MySQL 的账号具有作为 MySQL slave 的权限(SELECT, REPLICATION SLAVE, REPLICATION CLIENT)

2.配置服务端

下载服务端包:canal.deployer-1.1.5.tar.gz

进入文件 conf/example/instance.properties,如下修改,其他保持默认即可。

# position info 这里设置源数据库地址(可选增量信息)
canal.instance.master.address=localhost:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# ... 中间省略
# username/password 这里设置源数据库用户名、密码、编码格式
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8

启动服务端

sh bin/startup.sh
window下双击 startup.bat

3.配置客户端

下载安装包:canal.adapter-1.1.5.tar.gz

这里客户端启动后会提供REST接口,以供查看监控客户端运行状态

ClientAdapter · alibaba/canal Wiki (github.com)

总配置文件 conf/application.yml,根据需要配置(这里以MySQL 到 MySQL同步为例),其他保持默认即可

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:

  #... 中间省略
  
  # 数据源配置
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://localhost:3306/test?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: example # canal instance Name or mq topic name这里设置实例名称
    groups:
    - groupId: g1     # 实例中分组名称
      # 目标库(消费端)配置
      outerAdapters:
      - name: logger  # name 可多个消费端配置
      - name: rdb
        key: mysql1   # 实例中的 key
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://localhost:3306/test2?useUnicode=true
          jdbc.username: root
          jdbc.password: root

实例配置文件conf/rdb/user.yml 其中 user.yml 是将默认的mytest_user.yml重命名

1️⃣实例启动时会自动加载该文件夹下所有的.yml配置,根据需要将默认的配置修改或删除

2️⃣如果多表同步可以在该文件夹下配置多个 *.yml

这里以同步 test.usertest2.user 为例

dataSourceKey: defaultDS
# cannal的instance或者MQ的topic
destination: example    # 这里就是总配置文件的实例名称
groupId: g1             # 实例中分组名称
outerAdapterKey: mysql1 # 实例中的 key
concurrent: true        # 是否按主键hash并行同步,并行同步的表必须保证主键不会更改及主键不能为其他同步表的外键!!
dbMapping:
  database: test        # 数据源的database/shcema
  table: user           # 数据源表名
  targetTable: user     # 目标库的表(直接写表名)
  targetPk:             # 主键映射
    id: id              # 如果是复合主键可以换行映射多个
  mapAll: true          # 是否整表映射, 要求源表和目标表字段名一模一样
  # targetColumns:      # 字段映射(优先级高于mapAll), 格式如,目标表字段: 源表字段
  #   id:
  #   name:
  #   role_id:
  #   c_time:
  #   test1:
  etlCondition: "where c_time>={}"   # 手动同步的条件语句
  commitBatch: 3000                  # 批量提交的大小


# # Mirror schema synchronize config
# dataSourceKey: defaultDS
# destination: example
# groupId: g1
# outerAdapterKey: mysql1
# concurrent: true
# dbMapping:
#  mirrorDb: true
#  database: test

1.一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters, 比如例子中logger和hbase
2.目前client adapter数据订阅的方式支持两种,直连canal server 或者 订阅kafka/RocketMQ的消息

启动客户端

sh bin/startup.sh
window下双击 startup.bat

三、MySQL全量同步

执行以下命令调用Client-Adapter服务的方法触发同步任务。此时,canal会先中止增量数据传输,然后同步全量数据。待全量数据同步完成后,canal会自动进行增量数据同步。

命令格式

curl "hostip:port/etl/type/key/task" -X POST

示例:

curl http://127.0.0.1:8081/etl/rdb/mysql1/user.yml -X POST -d "params=2022-04-09 00:00:00"

附:遇到的问题

一、Windows下启动报错: Config dir not found

1,把startup.bat中classpath分号隔开的两个顺序调换:CLASSPATH=%CLASSPATH%;%conf_dir%\..\lib\*

2,确保不是中文文件夹 以及 文件夹命名不能存在空格(Program Files 放在这个文加下也有问题)

二、解决: canal PositionNotFoundException

参考文章:解决canal PositionNotFoundException – 简书 (jianshu.com),摘录如下:

情况一、当你第一次监听的mysql和第二次监听的mysql不一样的时候,就有可能报出这个PositionNotFoundException

情况二、mysql的binlog被删除,也有可能会报出PositionNotFoundException
问题如下:
1.canal监听mysql的时候会在conf/{destination}/meta.dat文件中记录当前binlog的名字、position
2.下次启动canal的时候就从这个binlogposition开始读取数据。
3.meta.dat记录的是上一次的binlog信息,当你删除mysqlbinlog或者监听到另外一台mysql后,meta.dat记录的信息就相当于过期信息,所以就会出现PositionNotFoundException

单机环境解决方案:
找到conf/{destination}/meta.dat,然后删除meta.dat,重新启动canal,这个时候canal就会重新生成meta.dat文件。

集群环境解决方案:
进入zookeeper,可以选择删除/otter整个节点,也可以选择删除/otter/canal/destinations/{destinations},目的就是把过期的meta信息删除。删除成功后,重新启动canal就可以了

https://www.jianshu.com/p/e33a918aad09

三、同步数据出错:etl failed

1、Duplicate entry

xxx etl failed! ==>Duplicate entry 'test_name' for key 'name'

解决方法:

dbMapping的主键设置问题,根据自己的业务需求可设置为 UNIQUE KEY,如果是联合唯一索引可以设置多个

dbMapping:
  # 源数据源的database/shcema
  database: test
  table: user
  targetTable: user
  targetPk:
    name: name

2、Deadlock found

{"succeeded":false,"resultMessage":"导入RDB 数据:1671930 条","errorMessage":"user_logs etl failed! ==>Deadlock found when trying to get lock; try restarting transaction

解决方法:

1️⃣数据库隔离级别调整为RC
2️⃣考虑使用配置concurrent: false

参考:canal-adapter-1.1.5 手动etl报死锁问题 · Issue #4022 · alibaba/canal (github.com)

参考文章:
Home · alibaba/canal Wiki (github.com)
使用canal同步 (aliyun.com)
Canal详细入门实战(使用总结) – CZQ_Darren – 博客园 (cnblogs.com)

Author: thinkwei

1 thought on “Canal实现MySQL到MySQL的全量、增量同步

  1. 我在用canal-adapter同步几千条数据时,只同步成功了一条数据。日志也没报错说道:

    我在用canal-adapter全量同步几千条数据时,只同步成功了一条数据。日志也没报错

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注