https://github.com/zendesk/maxwell
组件下载地址:https://github.com/zendesk/maxwell/releases/download/v1.10.7/maxwell-1.10.7.tar.gz
给mysql授权(只针对于maxwell库的操作)
其中user01为数据库用户名 666666为数据库密码
GRANT ALL on maxwell.* to ‘user01’@’%’ identified by ‘666666’;
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on . to ‘user01’@’%’;
执行maxwell命令行(注:maxwell默认是把监听的mysql的binlog日志发送到kafka的主题叫maxwell的topic上的)
具体的demo 如下:
bin/maxwell –user=’user01’ –password=’666666’ –host=’127.0.0.1’ –include_dbs=db1 –include_tables=table1,table2 –producer=kafka –kafka.bootstrap.servers=d1:9092,d2:9092,d3:9092 –kafka_topic test
注:–user是数据库用户名 –password数据库密码 –host表示安装mysql的服务器地址(可以和安装maxwell的服务器不在同一台) –include_dbs表示要筛选具体的数据库 –include_tables表示筛选具体库下的具体表 –kafka.bootstrap.servers表示kafka的Ip地址和端口号 –kafka_topic kafka表示kafka对应的topic
kafka的相关配置(注:d1,d2,d3为每台服务器的hostname,kafka里的配置文件端口号要和命令行里给的端口号一致)
1) 启动kafka命令行(这里以后台进程方式运行)
nohup bin/kafka-server-start.sh config/server.properties &
2) 创建kafka的topic为test主题
bin/kafka-topics.sh –zookeeper d1:2181,d2:2181,d3:2181 –create –topic test –partitions 20 –replication-factor 1
3) 启动消费者窗口
bin/kafka-console-consumer.sh –bootstrap-server d1:9092,d2:9092,d3:9092 –topic test
spark streaming结合kafka
具体可参考spark官方网站的说明http://spark.apache.org/docs/2.2.1/structured-streaming-kafka-integration.html
解析Insert
#sql insert 3条数据
mysql> insert into user_info(userid,name,age) values (1,’name1’,10),(2,’name2’,20),(3,’name3’,30);
#kafka-console-consumer结果
#userid=1的数据被过滤掉了
{“database”:”test_maxwell”,”table”:”user_info”,”type”:”insert”,”ts”:1533857131,”xid”:10571,”xoffset”:0,”data”:{“userid”:2,”name”:”name2”,”age”:20}}
{“database”:”test_maxwell”,”table”:”user_info”,”type”:”insert”,”ts”:1533857131,”xid”:10571,”commit”:true,”data”:{“userid”:3,”name”:”name3”,”age”:30}}
解析Delete
#sql delete
mysql> delete from user_info where userid=2;
#kafka-console-consumer结果
{“database”:”test_maxwell”,”table”:”user_info”,”type”:”delete”,”ts”:1533857183,”xid”:10585,”commit”:true,”data”:{“userid”:2,”name”:”name2”,”age”:20}}
解析Update
#sql update
mysql> update user_info set name=’name3’,age=23 where userid=3;
#maxwell解析结果
{“database”:”test_maxwell”,”table”:”user_info”,”type”:”update”,”ts”:1533857219,”xid”:10595,”commit”:true,”data”:{“userid”:3,”name”:”name3”,”age”:23},”old”:{“age”:30}}
maxwell健康状态
http://node2:8090/db_test_maxwell/healthcheck
Maxwell优缺点
优点
(1) 相比较canal,配置简单,开箱即用。
(2) 可自定义发送目的地(java 继承类,实现方法),数据处理灵活(js)。
(3) 自带多种监控。
缺点
(1) 需要在待同步的业务库上建schema_database库(默认maxwell),用于存放元数据,如binlog消费偏移量。但按maxwell的意思,这并不是缺点。
(2) 不支持HA。而canal可通过zookeeper实现canal server和canal client的HA,实现failover。
解析数据库binLog日志到Mysql数据库
binlog2sql是大众点评开源的一款用于解析binlog的工具
生成回滚SQL
github操作文档:https://github.com/danfengcao/binlog2sql
使用该工具的前提
binlog_format为ROW,且binlog_row_image为full或noblog,默认为full。
必须开启MySQL Server,理由有如下两点:
1> 它是基于BINLOG_DUMP协议来获取binlog内容
2> 需要读取server端information_schema.COLUMNS表,获取表结构的元信息,拼接成可视化的sql语句
该工具所需权限如下:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO
因为是伪装成slave来获取主的二进制事件,故无需对binlog有可读权限。
看了下源代码,它本身的核心代码比较少,主要是在pymysqlreplication的基础上进行了二次开发。
pymysqlreplication实现了MySQL复制协议,可捕捉不同类型的EVENT事件。
具体可参考:https://github.com/noplay/python-mysql-replication
个人感觉,直接解析文本格式的binlog,也未尝不是一个好办法。
理由如下:
1> binlog2sql强烈依赖于MySQL复制协议,如果复制协议发生改变,则该工具将不可用。
虽然,复制协议发生改变的可能性很小(一般都会保持向前兼容),但相对而言,自带的mysqlbinlog肯定更懂binlog,基于mysqlbinlog解析后的结果进行处理,
可完全屏蔽复制协议等底层细节。
2> 用python来解析文本格式的binlog,本身也不是件难事。
譬如,update语句在binlog中的对应的文本
在得到表结构的情况下,基本上可离线解析。