canal使用小结

摘要:
canal,基于java开发,伪装成一个slave,去监听获取增量的binlog日志文件,然后解析处理获得的相关数据,利用获得的数据,可以用其他不同用途,比如同步到es中做搜索相关。进入mysql解压后目录,新增data文件夹。

一、基本概念

mysql本身支持主从备份,原理就是主库master生成的binlog文件记录了所有的增删改操作语句,然后slave向master发送dump协议,master将binlog日志文件推送给从库slave解析执行,达到数据一致备份的目的。

canal,基于java开发,伪装成一个slave,去监听获取增量的binlog日志文件,然后解析处理获得的相关数据(过程中可以加入自由的加入一些额外的功能性代码需求),利用获得的数据,可以用其他不同用途,比如同步到es中做搜索相关。

二、canal基本配置使用

测试环境:windows、mysql 5.7.26、canal 1.1.3、Navicat for MySQL。

1、mysql安装和配置

1.1、下载安装解压忽略。进入mysql解压后目录,新增data文件夹。

1.2、新增my.ini文件,添加配置:

[client]
# 设置mysql客户端连接服务端时默认使用的端口
port=3311
[mysql]
default-character-set=utf8
[mysqld]
character-set-server=utf8
port=3311
# 默认存储引擎innoDB
default-storage-engine=INNODB
# Server Id.数据库服务器id,这个id用来在主从服务器中标记唯一mysql服务器
server-id=1
datadir=E:\soft\mysql2\data
bind-address=0.0.0.0
# 开启binlog日志
log-bin=mysql-bin
binlog_format = ROW

1.3、cmd进入并目录,启动/关闭 mysql:

//启动
net start mysql
//关闭
net stop mysql

1.4、连接mysql并设置密码

连接:mysql -uroot -p,初始密码为空,一直按enter即可进入mysql命令行。

进入后设置密码:

// 切换库
use mysql;
// 设置密码
update user set authentication_string=PASSWORD("123456") where user="root";
// 刷新生效
 flush privileges;

设置成功后,quit退出重进,输入密码123456。

1.5、新增个canal的访问账户

// 新增用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
// 授权
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
// 刷新
FLUSH PRIVILEGES;

2、canal安装配置

下载canal包(https://github.com/alibaba/canal/releases),解压本地目录。

2.1、目录结构

canal使用小结第1张

其中cfang是拷贝example,需要多个instance可继续拷贝,再修改每个instance中的配置文件。

2.1、配置canal.properties

canal使用小结第2张

port可自定义,用于canal对外服务接口。destinations配置instance列表(连接db)。

2.2、配置instance.properties

canal使用小结第3张

其中canal.instance.defaultDatabaseName可不配置,全库扫描。

2.3、启动

bin目录,点击startup.bat,查看/logs/canal/canal.log日志文件,出现以下则为开启成功:

canal使用小结第4张

2.4、canal数据格式:

Entry
    Header
        logfileName [binlog文件名]
        logfileOffset [binlog position]
        executeTime [发生的变更]
        schemaName
        tableName
        eventType [insert/update/delete类型]
    entryType   [事务头BEGIN/事务尾END/数据ROWDATA]
    storeValue  [byte数据,可展开,对应的类型为RowChange]   
RowChange
    isDdl       [是否是ddl变更操作,比如create table/drop table]
    sql     [具体的ddl sql]
    rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
        beforeColumns [Column类型的数组]
        afterColumns [Column类型的数组]     
Column
    index      
    sqlType     [jdbc type]
    name        [column name]
    isKey       [是否为主键]
    updated     [是否发生过变更]
    isNull      [值是否为null]
    value       [具体的内容,注意为文本]

2.5、java程序测试

pom导入:

		<dependency>
		    <groupId>com.alibaba.otter</groupId>
		    <artifactId>canal.client</artifactId>
		    <version>1.1.3</version>
		</dependency>

java测试:

package com.cfang.prebo;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Collectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
public class CanalTest {
	public static void main(String[] args) throws Exception {
		CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "cfang", "", "");
		connector.connect();
		connector.subscribe(".*\..*");
		connector.rollback();
		while (true) {
            Message message = connector.getWithoutAck(100);  // 获取指定数量的数据
            long batchId = message.getId();
            if (batchId == -1 || message.getEntries().isEmpty()) {
                Thread.sleep(1000);
                continue;
            }
           // System.out.println(message.getEntries());
            printEntries(message.getEntries());
            connector.ack(batchId);// 提交确认,消费成功,通知server删除数据
//            connector.rollback(batchId);// 处理失败, 回滚数据,后续重新获取数据
        }
	}
	private static void printEntries(List<Entry> entries) throws Exception {
        for (Entry entry : entries) {
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                switch (rowChange.getEventType()) {
	                case INSERT:
	                	 System.out.println("INSERT ");
	                     printColumns(rowData.getAfterColumnsList());
	                     break;
	                case UPDATE:
	                    System.out.println("UPDATE ");
	                    printColumns(rowData.getAfterColumnsList());
	                    break;
	                case DELETE:
	                    System.out.println("DELETE ");
	                    printColumns(rowData.getBeforeColumnsList());
	                    break;
	                default:
	                    break;
	             }
            }
        }
    }
	private static void printColumns(List<Column> columns) {
        for(Column column : columns) {
        	System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

Navicat中进行相关操作的时候,可在控台看到输出,例如:

canal使用小结第5张

免责声明:文章转载自《canal使用小结》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇android 实时获取 麦克风 音量大小mybatis-plus 条件构造器 Wrapper下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Hive的安装与部署(MySQL作为元数据库)

Hive的安装与部署(MySQL作为元数据) (开始之前确保Hadoop环境已经启动,确保Linux下的MySQL已经安装好) 1.     安装Hive (1)下载安装包 可从apache上下载hive安装包:http://mirror.bit.edu.cn/apache/hive/ (2)压缩包解压:   (2)移动至指定目录:(这里是移动至/us...

MAC 忘记Homebrew安装的mysql密码

@ 目录 MAC 忘记Homebrew安装的mysql密码 一、依照百度其他的方法, 二、简单粗暴 1.直接在my.cnf 修改,跳过mysql密码验证 2.修改密码 3.恢复my.cnf 4.重启mysql 登录测试 MAC 忘记Homebrew安装的mysql密码 一、依照百度其他的方法, 先停掉MySQL服务,再用mysqld_saf...

spring-boot整合mybatis(web mysql logback配置)

pom.xml相关的配置说明。 配置文件看着比价多,在创建spring-boot项目的时候,自需要添加web,mysql,mybatis三个选项即可 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns...

Linux搭建lamp(Apache+PHP+Mysql环境)centos7.2版详细教程

一、 检查系统环境 1、确认centos版本 [root@localhost ~]# cat /etc/redhat-release CentOS Linux release 7.2.1511 (Core) 2、检查是否安装过apache rpm -qa | grep httpd 或者: apachectl -v 或者: httpd -v...

Zabbix 系统概述与部署

Zabbix是一个非常强大的监控系统,是企业级的软件,来监控IT基础设施的可用性和性能.它是一个能够快速搭建起来的开源的监控系统,Zabbix能监视各种网络参数,保证服务器系统的安全运营,并提供灵活的通知机制以让系统管理员快速定位解决存在的各种问题,Zabbix系统几乎可用于任何系统的监控过程,它可以运行在Linux,Solaris,HP-UX,AIX,F...

【MySQL】MySQL层级数据的递归遍历

层级的业务数据在系统中很常见,如组织机构、商品品类等。如果要获取层级数据的全路径,除了缓存起来,就是递归访问的方式了: 将层级数据缓存在redis中,用redis递归获取层级结构。此方法效率高。 在MySQL中做递归遍历,(Oracle有递归的语法支持,而MySQL并没有),需要自己写函数去递归。此方法效率低。 程序运行基于效率要求,一般使用Redis...