背景

业务中经常出现一些千万乃至亿级别的大表,此时可能考虑分库分表(Sharding-JDBC、MyCat等方案),也常同步数据进入ES中;同步数据这一业务场景中,Flink CDC是一个很不错的解决方案。

方案

如mysql、postgresql、sqlserver等,flink cdc通过读取binlog日志(注意:请先开启binlog日志),进行数据同步,实时性较好。

对数据的解析和消费进行了二次封装,使用者只需增加简单的配置,实现FlinkConsumerListener接口,关注编写业务代码即可。

代码

show coding

flink: flink cdc 暂时支持mysql

测试demo

创建一个springboot项目

依赖引入(引入上述工程打包后的依赖)

        <dependency>
            <groupId>com.kwin</groupId>
            <artifactId>flink</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

配置文件

flink:
  pipeline-name: flinkCDCTest
  mysqlDataSource:
        - port: 3306
          hostname: 127.0.0.1
          databaseList:
            - flinktest
          tableList:
            - flinktest.student
          username: root
          password: 123456

如上,针对flinktest数据库的student表进行binlog监听。

flinktest.student的消费者

student实体

import lombok.Data;

/**
 * @author kwin
 * @Date 2022/7/25 18:27
 **/
@Data
public class Student {
    private Long id;

    private String name;

    private Integer age;

    private Integer maxInx;
}

 消费者

import com.kwin.demo.server.module.flink.test.entity.Student;
import com.kwin.flink.sink.FlinkConsumerListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author kwin
 * @Date 2022/7/25 18:29
 **/
@Slf4j
@Component
public class StudentConsumerListener implements FlinkConsumerListener<Student> {
    @Override
    public String getDBName() {
        return "flinktest";
    }

    @Override
    public String getTable() {
        return "student";
    }

    @Override
    public void insert(Student data) {
        System.out.println("insert: " + data);
    }

    @Override
    public void update(Student srcData, Student destData) {
        System.out.println("update: \nsrc:" + srcData + "\ndest:" + destData);
    }

    @Override
    public void delete(Student data) {
        System.out.println("delete:"+data);
    }
}

启动项目

flinktest.student修改数据时:

flinktest.student插入数据时:

 flinktest.student删除数据时:

 如上,使用者只需实现FlinkConsumerListener接口,即可对指定表的数据进行消费和业务逻辑操作。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐