Here is the scenario: a large volume of data sits in MongoDB, the data changes frequently but is almost never deleted, and it has to drive a dashboard that shows users statistics. The previous design ran a scheduled job every 10 minutes that computed the statistics directly against MongoDB; besides being slow, that approach could also affect the business workload. So the goal is to use a big-data pipeline to make the dashboard more real-time while reducing the impact on the business. I propose the following solution:

Sync the MongoDB data to Kafka via CDC, then land the Kafka data into both Hudi and ClickHouse. If the real-time requirements are loose, Kafka can be dropped and Hudi used directly, since Flink can read Hudi as a stream. Later, if the data in ClickHouse ever goes wrong, it can also be rebuilt directly from Hudi instead of pulling a full snapshot from MongoDB again.

To test the feasibility of the design, the demo here syncs MongoDB straight to ClickHouse through flink-cdc.

The MongoDB and ClickHouse images are pulled from Docker Hub; the commands used along the way are recorded here.

docker pull mongo:5.0.11
docker pull clickhouse/clickhouse-server:21.7.11.3

Write the docker-compose file:

version: '3'
services:
  mongo5:
    image: mongo:5.0.11
    container_name: mongo5
    hostname: mongo5
    volumes:
      - mongo5-db:/data/db
      - mongo5-configdb:/data/configdb
      - ./mongod.conf:/etc/mongod.conf
    restart: always
    ports:
      - "27017:27017"
    networks:
      test:
        ipv4_address: 10.10.0.8
    command: ['mongod', '--config', '/etc/mongod.conf']

  clickhouse21:
    image: clickhouse/clickhouse-server:21.7.11.3
    container_name: clickhouse-server21
    hostname: clickhouse-server21
    volumes:
      - clickhouse-server21:/var/lib/clickhouse
    restart: always
    ports:
      - "8123:8123"
    networks:
      test:
        ipv4_address: 10.10.0.9

volumes:
  mongo5-db:
  mongo5-configdb:
  clickhouse-server21:

networks:
  test:
    external: true

The test network (networks: test) is an external network that I created in advance with docker network create, and the ipv4_address values above are addresses in my own test subnet; change them to match your own network.

mongod.conf contains the following. Since this is only a demo, there is no need to start multiple mongod processes; a single mongod is enough:

replication:
  replSetName: "rs0"

After starting the containers with docker-compose up -d, enter the MongoDB and ClickHouse containers respectively.

# enter the mongo container
docker exec -it mongo5 bash
# open the mongo shell
mongosh
# initialize the replica set
rs.initiate()
# create the user for CDC sync
use admin;
db.createUser({
  user: "flinkuser",
  pwd: "flinkpw",
  roles: [
    { role: "read", db: "admin" }, //read role includes changeStream privilege
    { role: "readAnyDatabase", db: "admin" } //for snapshot reading
  ]
});
# create the collection that will be synced
use test;
db.createCollection('users');
# enter the clickhouse container
docker exec -it clickhouse-server21 bash

# log in to clickhouse as the default user (no password by default after installation)
clickhouse-client --user default

# create the test database
create database test;

# create the users table
create table users(
  id UInt32,
  name String,
  age Int8,
  create_time String,
  modify_time String,
  _modify_time DateTime MATERIALIZED toDateTime(modify_time, 'Asia/Shanghai')
) engine = ReplacingMergeTree(_modify_time)
partition by (toDate(create_time, 'Asia/Shanghai'))
order by id;

# _modify_time is the version column for ReplacingMergeTree: when parts are merged, only the most recently updated row (by modify_time) is kept for each key.
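
Since ReplacingMergeTree only deduplicates during background merges, the table may temporarily hold several versions of the same row. For an ad-hoc check, ClickHouse's FINAL modifier applies the merge logic at read time (a small example; it is slower than a plain select, so it suits spot checks better than dashboard queries):

# force ReplacingMergeTree deduplication at read time
select * from users final;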

With the preparation done, what remains is writing data into ClickHouse over JDBC. Flink's official jdbc-connector does not support ClickHouse, so a simple implementation is put together here: one Flink class is modified and a new ClickHouse dialect is added, as follows.

Add a ClickhouseDialect class. It is only meant for this demo; getRowConverter and unsupportedTypes are not fully adapted. The core change is rewriting UPDATE into INSERT statements: in ClickHouse, UPDATE and DELETE are mutation operations and modifying data in place is discouraged, but the business data changes frequently. Updates are therefore implemented by inserting new rows, and a view is used in the end to filter out the stale ones.

package org.apache.flink.connector.jdbc.dialect;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class ClickhouseDialect extends AbstractDialect {

    @Override
    public int maxDecimalPrecision() {
        return 76;
    }

    @Override
    public int minDecimalPrecision() {
        return 1;
    }

    @Override
    public int maxTimestampPrecision() {
        return 9;
    }

    @Override
    public int minTimestampPrecision() {
        return 0;
    }

    @Override
    public List<LogicalTypeRoot> unsupportedTypes() {
        return Collections.emptyList();
    }

    @Override
    public String dialectName() {
        return "Clickhouse";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:ch:") || url.startsWith("jdbc:clickhouse:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new AbstractJdbcRowConverter(rowType) {
            @Override
            public String converterName() {
                return "Clickhouse";
            }
        };
    }

    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("com.clickhouse.jdbc.ClickHouseDriver");
    }

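    // ClickHouse treats UPDATE as a heavyweight mutation, so updates are rewritten as plain INSERTs;
    // deduplication is left to ReplacingMergeTree (or a view on top of it) on the ClickHouse side.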
    @Override
    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        return super.getInsertIntoStatement(tableName, fieldNames);
    }

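    // likewise, upserts are downgraded to plain INSERTs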
    @Override
    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return Optional.of(getInsertIntoStatement(tableName, fieldNames));
    }
}

Modify the JdbcDialects class by adding ClickhouseDialect to DIALECTS:

package org.apache.flink.connector.jdbc.dialect;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class JdbcDialects {

    private static final List<JdbcDialect> DIALECTS =
            Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect(), new ClickhouseDialect());

    /** Fetch the JdbcDialect class corresponding to a given database url. */
    public static Optional<JdbcDialect> get(String url) {
        for (JdbcDialect dialect : DIALECTS) {
            if (dialect.canHandle(url)) {
                return Optional.of(dialect);
            }
        }
        return Optional.empty();
    }
}

Write the Flink program and start it:

package com.catcher92.demo.flink.cdc;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkMongoCdcToClickhouse {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.getCheckpointConfig().setCheckpointInterval(10 * 1000L);
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/checkpoints/FlinkMongoCdcToClickhouse");
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);

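        // RocksDB state backend; the 'true' flag enables incremental checkpoints, which helps when the ChangelogNormalize state grows large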
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        final EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
        builder.useBlinkPlanner().inStreamingMode();
        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, builder.build());

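        // source table backed by the mongodb-cdc connector; the sink table below uses the Flink JDBC connector,
        // which resolves to the custom Clickhouse dialect via the jdbc:ch:// URL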
        tEnv.executeSql("CREATE TABLE mongo_test_users (\n" +
                "  _id INT, \n" +
                "  name STRING,\n" +
                "  age INT,\n" +
                "  create_time STRING,\n" +
                "  modify_time STRING,\n" +
                "  PRIMARY KEY(_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mongodb-cdc',\n" +
                "  'hosts' = 'localhost:27017',\n" +
                "  'username' = 'flinkuser',\n" +
                "  'password' = 'flinkpw',\n" +
                "  'database' = 'test',\n" +
                "  'collection' = 'users'\n" +
                ")");
        tEnv.executeSql("create table clickhouse_test_users(\n" +
                "  id INT, \n" +
                "  name STRING,\n" +
                "  age INT,\n" +
                "  create_time STRING,\n" +
                "  modify_time STRING,\n" +
                "  PRIMARY KEY(id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:ch://localhost:8123/test',\n" +
                "  'username' = 'default',\n" +
                "  'password' = '',\n" +
                "  'table-name' = 'users'\n" +
                ")");
        try {
            tEnv.executeSql("insert into clickhouse_test_users\n" +
                    "select * from mongo_test_users").await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

Once the job is running, insert some data through mongosh:

db.getCollection('users').insertOne({'_id':1, 'name':'zs1', 'age':1, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
db.getCollection('users').insertOne({'_id':2, 'name':'zs2', 'age':2, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
db.getCollection('users').insertOne({'_id':3, 'name':'zs3', 'age':3, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
db.getCollection('users').insertOne({'_id':4, 'name':'zs4', 'age':4, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
db.getCollection('users').insertOne({'_id':5, 'name':'zs5', 'age':5, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});

Because mongodb-cdc relies on the ChangelogNormalize operator to reconstruct the UPDATE_BEFORE (-U) records, checkpoints can become very large. That is why RocksDB is used as the state backend here, and incremental checkpoints are recommended so the job does not get killed for exceeding its memory.

Then query the data in ClickHouse:

select * from users;

┌─id─┬─name─┬─age─┬─create_time─────────┬─modify_time─────────┐
│  1 │ zs1  │   1 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│  2 │ zs2  │   2 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│  3 │ zs3  │   3 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│  4 │ zs4  │   4 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│  5 │ zs5  │   5 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
└────┴──────┴─────┴─────────────────────┴─────────────────────┘

5 rows in set. Elapsed: 0.004 sec.

Now update a document in MongoDB:

db.getCollection('users').updateOne({'_id':1}, {$set:{'name':'zs1_1', 'modify_time':'2022-09-04 16:47:00'}})

Query ClickHouse again:

select * from users;

Query id: b406561e-1e6d-4bfe-a3ae-e852516b11e0

┌─id─┬─name─┬─age─┬─create_time─────────┬─modify_time─────────┐
│  1 │ zs1  │   1 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│  2 │ zs2  │   2 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│  3 │ zs3  │   3 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│  4 │ zs4  │   4 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│  5 │ zs5  │   5 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
└────┴──────┴─────┴─────────────────────┴─────────────────────┘
┌─id─┬─name──┬─age─┬─create_time─────────┬─modify_time─────────┐
│  1 │ zs1_1 │   1 │ 2022-09-04 15:39:00 │ 2022-09-04 16:47:00 │
└────┴───────┴─────┴─────────────────────┴─────────────────────┘

6 rows in set. Elapsed: 0.005 sec.

As you can see there are now 6 rows, which matches the expected result. When ReplacingMergeTree will actually merge them back down to 5 rows is anyone's guess, so just create a view yourself to filter the data.
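
A minimal sketch of such a view over the demo table above (the view name users_latest is only an example); argMax keeps, for each id, the column values of the row with the largest _modify_time:

create view users_latest as
select
  id,
  argMax(name, _modify_time) as name,
  argMax(age, _modify_time) as age,
  argMax(create_time, _modify_time) as create_time,
  argMax(modify_time, _modify_time) as modify_time
from users
group by id;

Pointing the dashboard queries at users_latest instead of users then always returns exactly one row per id, no matter when the background merge happens.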
