maxwell+kafka+Spark Streaming构建MySQL Binlog日志采集实时处理方案

https://github.com/zendesk/maxwell
组件下载地址:https://github.com/zendesk/maxwell/releases/download/v1.10.7/maxwell-1.10.7.tar.gz
 给mysql授权(只针对于maxwell库的操作)
其中user01为数据库用户名 666666为数据库密码
GRANT ALL on maxwell.* to ‘user01’@’%’ identified by ‘666666’;
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on . to ‘user01’@’%’;
执行maxwell命令行(注:maxwell默认是把监听的mysql的binlog日志发送到kafka的主题叫maxwell的topic上的)



具体的demo 如下:
bin/maxwell –user=’user01’ –password=’666666’ –host=’127.0.0.1’ –include_dbs=db1 –include_tables=table1,table2 –producer=kafka –kafka.bootstrap.servers=d1:9092,d2:9092,d3:9092 –kafka_topic test  
注:–user是数据库用户名 –password数据库密码 –host表示安装mysql的服务器地址(可以和安装maxwell的服务器不在同一台) –include_dbs表示要筛选具体的数据库 –include_tables表示筛选具体库下的具体表 –kafka.bootstrap.servers表示kafka的Ip地址和端口号 –kafka_topic kafka表示kafka对应的topic
kafka的相关配置(注:d1,d2,d3为每台服务器的hostname,kafka里的配置文件端口号要和命令行里给的端口号一致)
  1) 启动kafka命令行(这里以后台进程方式运行)
   nohup bin/kafka-server-start.sh config/server.properties &



  2)  创建kafka的topic为test主题
   bin/kafka-topics.sh –zookeeper d1:2181,d2:2181,d3:2181 –create –topic test –partitions 20 –replication-factor 1 



  3) 启动消费者窗口
   bin/kafka-console-consumer.sh –bootstrap-server d1:9092,d2:9092,d3:9092  –topic test
spark streaming结合kafka
具体可参考spark官方网站的说明http://spark.apache.org/docs/2.2.1/structured-streaming-kafka-integration.html

解析Insert
#sql insert 3条数据
mysql> insert into user_info(userid,name,age) values (1,’name1’,10),(2,’name2’,20),(3,’name3’,30);
#kafka-console-consumer结果
#userid=1的数据被过滤掉了
{“database”:”test_maxwell”,”table”:”user_info”,”type”:”insert”,”ts”:1533857131,”xid”:10571,”xoffset”:0,”data”:{“userid”:2,”name”:”name2”,”age”:20}}
{“database”:”test_maxwell”,”table”:”user_info”,”type”:”insert”,”ts”:1533857131,”xid”:10571,”commit”:true,”data”:{“userid”:3,”name”:”name3”,”age”:30}}
解析Delete
#sql delete
mysql> delete from user_info where userid=2;



#kafka-console-consumer结果
{“database”:”test_maxwell”,”table”:”user_info”,”type”:”delete”,”ts”:1533857183,”xid”:10585,”commit”:true,”data”:{“userid”:2,”name”:”name2”,”age”:20}}
解析Update
#sql update
mysql> update user_info set name=’name3’,age=23 where userid=3;



#maxwell解析结果
{“database”:”test_maxwell”,”table”:”user_info”,”type”:”update”,”ts”:1533857219,”xid”:10595,”commit”:true,”data”:{“userid”:3,”name”:”name3”,”age”:23},”old”:{“age”:30}}
maxwell健康状态
http://node2:8090/db_test_maxwell/healthcheck
Maxwell优缺点
优点
(1) 相比较canal,配置简单,开箱即用。
(2) 可自定义发送目的地(java 继承类,实现方法),数据处理灵活(js)。
(3) 自带多种监控。
缺点
(1) 需要在待同步的业务库上建schema_database库(默认maxwell),用于存放元数据,如binlog消费偏移量。但按maxwell的意思,这并不是缺点。
(2) 不支持HA。而canal可通过zookeeper实现canal server和canal client的HA,实现failover。



解析数据库binLog日志到Mysql数据库



  • 基本原理:

  • 1.读取日志解析对应的数据表的Log操作,先正则每条日志,找到匹配到的原始日志

  • 2.根据判断表中的主键或者关键字段选择是否解析或者跳过当前的日志

  • 3.将匹配到包含关键字段的日志数据,获取对数据操作的方式insert、update

  • 4.然后再去获取每个字段的含有,再将要执行的SQL语句加到批处理



binlog2sql是大众点评开源的一款用于解析binlog的工具



  1. 提取SQL


  2. 生成回滚SQL
    github操作文档:https://github.com/danfengcao/binlog2sql
    使用该工具的前提




  3. binlog_format为ROW,且binlog_row_image为full或noblog,默认为full。




  4. 必须开启MySQL Server,理由有如下两点:



    1> 它是基于BINLOG_DUMP协议来获取binlog内容



    2> 需要读取server端information_schema.COLUMNS表,获取表结构的元信息,拼接成可视化的sql语句





该工具所需权限如下:



GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO
因为是伪装成slave来获取主的二进制事件,故无需对binlog有可读权限。




  1. 看了下源代码,它本身的核心代码比较少,主要是在pymysqlreplication的基础上进行了二次开发。



    pymysqlreplication实现了MySQL复制协议,可捕捉不同类型的EVENT事件。



    具体可参考:https://github.com/noplay/python-mysql-replication




  2. 个人感觉,直接解析文本格式的binlog,也未尝不是一个好办法。



    理由如下:



    1> binlog2sql强烈依赖于MySQL复制协议,如果复制协议发生改变,则该工具将不可用。



     虽然,复制协议发生改变的可能性很小(一般都会保持向前兼容),但相对而言,自带的mysqlbinlog肯定更懂binlog,基于mysqlbinlog解析后的结果进行处理,

    可完全屏蔽复制协议等底层细节。


    2> 用python来解析文本格式的binlog,本身也不是件难事。



      譬如,update语句在binlog中的对应的文本

    在得到表结构的情况下,基本上可离线解析。



Category storage