Flink的CDC

对于Flink这种数据处理中间件,有一种需求是监测并捕获数据库的变化,然后将这些变化记录下来,写入消息中间件供其他的服务进行订阅和消费,这种操作的全称就是Change Data Capture 变更数据获取

CDC的种类按照获取的方式分为两类,分别为查询 和 Binlog获取

图片

那么Flink提供了 Flink-cdc-connectors组件,可以直接从MySQL PostgreSQL 等数据库直接获取全量数据和增量变更数据的source组件

https://github.com/ververica/flink-cdc-connectors

图片

那么我们的操作就是首先创建一个数据库

然后在对应的mysql config中添加配置,增加binlog的同步数据库

# binlog 格式

binlog_format=ROW

# 指定具体要同步的数据库

binlog_do_db=gmall_itdachang

server_id=1

而对于java项目中的使用

首先导入一个依赖

<dependency>

<groupId>com.alibaba.ververica</groupId>

<artifactId>flink-connector-mysql-cdc</artifactId>

<version>1.2.0</version>

</dependency>

然后编写对应的代码

首先是准备好对应的环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

然后创建Mysql的Source

Properties props = new Properties();

props.setProperty(“scan.startup.mode”,”initial”);

SourceFunction<String> sourceFunction = MySQLSource.<String>builder()

.hostname(“linux1”)

.port(3306)

.username(“root”)

.password(“123123”)

.databaseList(“test”)

///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据

//注意:指定的时候需要使用”db.table”的方式

.tableList(“test.t_user”)

.debeziumProperties(props)

.deserializer(new StringDebeziumDeserializationSchema())

.build();

这时候的输出是一个字符串格式的

如果希望进行序列化的格式变更,可以考虑自己实现

DebeziumDeserializationSchema接口

这就是CDC的基本使用

发表评论

邮箱地址不会被公开。 必填项已用*标注