Flink的CDC
对于Flink这种数据处理中间件,有一种需求是监测并捕获数据库的变化,然后将这些变化记录下来,写入消息中间件供其他的服务进行订阅和消费,这种操作的全称就是Change Data Capture 变更数据获取
CDC的种类按照获取的方式分为两类,分别为查询 和 Binlog获取
那么Flink提供了 Flink-cdc-connectors组件,可以直接从MySQL PostgreSQL 等数据库直接获取全量数据和增量变更数据的source组件
https://github.com/ververica/flink-cdc-connectors
那么我们的操作就是首先创建一个数据库
然后在对应的mysql config中添加配置,增加binlog的同步数据库
# binlog 格式
binlog_format=ROW # 指定具体要同步的数据库 binlog_do_db=gmall_itdachang server_id=1 |
而对于java项目中的使用
首先导入一个依赖
<dependency>
<groupId>com.alibaba.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>1.2.0</version> </dependency> |
然后编写对应的代码
首先是准备好对应的环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
然后创建Mysql的Source
Properties props = new Properties();
props.setProperty(“scan.startup.mode”,”initial”); SourceFunction<String> sourceFunction = MySQLSource.<String>builder() .hostname(“linux1”) .port(3306) .username(“root”) .password(“123123”) .databaseList(“test”) ///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据 //注意:指定的时候需要使用”db.table”的方式 .tableList(“test.t_user”) .debeziumProperties(props) .deserializer(new StringDebeziumDeserializationSchema()) .build(); |
这时候的输出是一个字符串格式的
如果希望进行序列化的格式变更,可以考虑自己实现
DebeziumDeserializationSchema接口
这就是CDC的基本使用