SpringBoot集成canal实现示例解析

目录
  • 什么是 Canal
  • MySQL 的 Binlog
    • Binlog 的分类
  • Canal 的工作原理 
    • MySQL 主从复制过程
    • Canal 的工作原理
  • Canal使用场景
    • Canal使用实战 
      • 检查Mysql binlog功能是否有开启
      • 创建具有作为 MySQL slave的MySQL 账号
      • 下载安装Canal服务端
      • 监听多个Mysql实例配置
      • Springboot集成Canal客户端
    • 总结

      什么是 Canal

              阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

              Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。

      MySQL 的 Binlog

              MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。

              一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:

              其一:MySQL Replication 在 Master 端开启 Binlog,Master 把它的二进制日志传递给Slaves来达到 Master-Slave 数据一致的目的。

              其二:自然就是数据恢复了,通过使用 MySQL Binlog 工具来使恢复数据。

              二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件。

      Binlog 的分类

      MySQL Binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配置 binlog_format= statement|mixed|row。三种格式的区别:

              1)statement:语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空间  但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。

              优点:节省空间。

              缺点:有可能造成数据不一致。

              2)row:行级, binlog 会记录每次操作后每行记录的变化。

              优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。

              缺点:占用较大空间。

              3)mixed:statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement模式不一致问题,默认还是 statement。

                   statement在某些情况下 会按照ROW 的方式进行处理譬如:

                    1:当函数中包含 UUID() 时;

                    2: 包含AUTO_INCREMENT 字段的表被更新时;

                    3: 执行 INSERT DELAYED 语句时;

                    4: 用 UDF 时;

              优点:节省空间,同时兼顾了一定的一致性。

              缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 的监控的情况都不方便。
      综合上面对比,Canal 想做监控分析,选择 row 格式比较合适。 

      Canal 的工作原理 

      MySQL 主从复制过程

              1)Master 主库将改变记录,写到二进制日志(Binary Log)中;

              2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷到它的中继日志(relay log);

              3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。 

      Canal 的工作原理

      原理相对比较简单:

      • canal 模拟 mysql slave 的交互协议,伪装自己为 mysql slave,向 mysql master 发送 dump 协议

      • mysql master 收到 dump 请求,开始推送 binary log 给 slave (也就是 canal)

      • canal 解析 binary log 对象 (原始为 byte 流)。

      Canal使用场景

      1)原始场景: 阿里 Otter 中间件的一部分 Otter 是阿里用于进行异地数据库之间的同步框架,Canal 是其中一部分。

       2)常见场景 1:更新缓存

      3)常见场景 2:抓取业务表的新增变化数据,用于制作实时统计。 

      Canal使用实战 

      检查Mysql binlog功能是否有开启

      mysql> show variables like 'log_bin';
      +---------------+-------+
      | Variable_name | Value |
      +---------------+-------+
      | log_bin       | ON    |
      +---------------+-------+
      1 row in set (0.10 sec)

       如果显示状态为OFF表示该功能未开启,开启binlog功能,修改 mysql 的配置文件my.ini,追加内容:

      log-bin=mysql-bin #binlog文件名
      binlog_format=ROW #选择row模式
      server_id=1 #mysql实例id,不能和canal的slaveId重复

      service mysql restart 重启 mysql。

      创建具有作为 MySQL slave的MySQL 账号

      CREATE USER canal IDENTIFIED BY 'canal';  
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
      -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
      FLUSH PRIVILEGES;
      

      下载安装Canal服务端

       https://github.com/alibaba/canal/releases

      • canal-adapter(canal-client)

              相当于canal的客户端,会从canal-server中获取数据(需要配置为tcp方式),然后对数据进行同步,可以同步到MySQL、Elasticsearch和HBase等存储中去。相较于canal-server自带的canal.serverMode,canal-adapter提供的下游数据接受更为广泛。

      • canal-admin

              为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

      • canal-deployer(canal-server)

              可以直接监听MySQL的binlog,把自己伪装成MySQL的从库,只负责接收数据,并不做处理。接收到MySQL的binlog数据后可以通过配置canal.serverMode:tcp, kafka, RocketMQ, RabbitMQ连接方式发送到对应的下游。其中tcp方式可以自定义canal客户端进行接受数据,较为灵活。

      修改 instance.properties配置文件,

      #需要改成数据源mysql数据库的信息
      canal.instance.master.address=127.0.0.1:3306
      #需要改成自己的数据库创建的从库用户名与密码
      canal.instance.dbUsername=canal
      canal.instance.dbPassword=canal
      #需要改成同步的数据库表规则
      canal.instance.filter.regex=.*\\..*
      

      常见的匹配规则:

              所有表:.* or .\…

              canal 数据库下所有表: canal\…*

              canal数据库下的以canal打头的表:canal.canal.*

              canal 数据库下的一张表:canal.test1

              多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔)

      监听多个Mysql实例配置

              如果需要监听多个Mysql实例,通过前面 canal 架构,我们可以知道,一个 canal 服务 中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的 配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直 接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改 canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。

      运行Canal服务端 sh bin/startup.sh(win下是运行 startup.bat)

      Springboot集成Canal客户端

      创建canal-clint SpringBoot工程

       在canal-clint 模块中配置 pom.xml

      <dependency>
          <groupId>com.alibaba.otter</groupId>
          <artifactId>canal.client</artifactId>
          <version>1.1.2</version>
      </dependency>

       创建单机版Canal客户端SimpleCanalClientExampleTest 

      package com.canal.clint.clint;
      
      /**
       * <p>
       * </p>
       * @since 2025-03-30 17:13
       */
      
      import java.net.InetSocketAddress;
      import java.util.List;
      
      import com.alibaba.fastjson.JSONObject;
      import com.alibaba.otter.canal.client.CanalConnector;
      import com.alibaba.otter.canal.client.CanalConnectors;
      import com.alibaba.otter.canal.protocol.CanalEntry;
      import com.alibaba.otter.canal.protocol.Message;
      import com.google.protobuf.ByteString;
      import com.google.protobuf.InvalidProtocolBufferException;
      
      public class SimpleCanalClientExampleTest {
          public static void main(String[] args) throws InvalidProtocolBufferException {
              // 1.获取 canal 连接对象
              CanalConnector canalConnector =
                  CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
              while (true) {
                  // 2.获取连接
                  canalConnector.connect();
                  //  3.指定要监控的数据库,此处指定了要监听的库,会覆盖instance.properties配置的数据库表规则
                  canalConnector.subscribe("intl.*");
                  // 4.获取 Message
                  Message message = canalConnector.get(100);
                  List<CanalEntry.Entry> entries = message.getEntries();
                  if (entries.size() <= 0) {
                      System.out.println("没有数据,休息一会");
                      try {
                          Thread.sleep(1000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  } else {
                      for (CanalEntry.Entry entry : entries) {
                          // TODO 获取表名
                          String tableName = entry.getHeader().getTableName();
                          // TODO Entry 类型
                          CanalEntry.EntryType entryType = entry.getEntryType();
                          // TODO 判断 entryType 是否为 ROWDATA
                          if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                              // TODO 序列化数据
                              ByteString storeValue = entry.getStoreValue();
                              // TODO 反序列化
                              CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                              // TODO 获取事件类型
                              CanalEntry.EventType eventType = rowChange.getEventType();
                              // TODO 获取具体的数据
                              List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                              // TODO 遍历并打印数据
                              for (CanalEntry.RowData rowData : rowDataList) {
                                  List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                  JSONObject beforeData = new JSONObject();
                                  for (CanalEntry.Column column : beforeColumnsList) {
                                      beforeData.put(column.getName(), column.getValue());
                                  }
                                  JSONObject afterData = new JSONObject();
                                  List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                  for (CanalEntry.Column column : afterColumnsList) {
                                      afterData.put(column.getName(), column.getValue());
                                  }
                                  System.out.println("TableName:" + tableName + ",EventType:" + eventType + ",After:"
                                      + beforeData + ",After:" + afterData);
                              }
                          }
                      }
                  }
              }
          }
      }

       创建数据库user表

      CREATE TABLE `user` (
        `id` int(11) NOT NULL,
        `name` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
        `remark` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
        PRIMARY KEY (`id`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

      插入数据

      INSERT INTO `intl`.`user`(`id`, `name`, `remark`) VALUES (1, '哈喽', 'Canal测试');
      

      输出结果

      注意坑:

      • 如果是基于阿里云服务器安装的Canal,记得开放11111端口(Canal的默认端口号);
      • 如果客户端调用了connector.subscribe("intl.*")方法,指定要监听的库,会覆盖instance.properties配置的数据库表规则;
      • 如果Mysq binlog日志类型设置为mixed可能会导致connector.subscribe("intl.*")方法失效,进而监听整个Mysql实例。

      Canal的部署也是支持集群的,需要配合ZooKeeper进行集群管理。Canal还有一个Web管理界面。

      总结

      本文转自网络,如有侵权请联系客服删除。