FlinkCDC-Doris basic parameters

Common configuration options

Key | Default Value | Comment
fenodes | -- | Doris FE HTTP address
table.identifier | -- | Doris table name, e.g. db1.tbl1
username | -- | Username for accessing Doris
password | -- | Password for accessing Doris
doris.request.retries | 3 | Number of retries for requests sent to Doris
doris.request.connect.timeout.ms | 30000 | Connection timeout for requests sent to Doris
doris.request.read.timeout.ms | 30000 | Read timeout for requests sent to Doris
doris.request.query.timeout.s | 3600 | Timeout for queries against Doris; defaults to 1 hour, -1 means no timeout limit
doris.request.tablet.size | Integer.MAX_VALUE | Number of Doris tablets per partition. The smaller this value, the more partitions are generated, which raises parallelism on the Flink side but puts more pressure on Doris.
doris.batch.size | 1024 | Maximum number of rows read from a BE per fetch. Increasing it reduces how often Flink and Doris establish connections and thus the extra overhead caused by network latency.
doris.exec.mem.limit | 2147483648 | Memory limit for a single query, in bytes; defaults to 2 GB
doris.deserialize.arrow.async | false | Whether to asynchronously convert Arrow data into the RowBatch that flink-doris-connector iterates over
doris.deserialize.queue.size | 64 | Internal queue size for asynchronous Arrow conversion; takes effect when doris.deserialize.arrow.async is true
doris.read.field | -- | Comma-separated list of column names to read from the Doris table
doris.filter.query | -- | Filter expression for the read; passed through to Doris, which uses it to filter data on the source side
sink.batch.size | 10000 | Maximum number of rows per write to a BE
sink.max-retries | 1 | Number of retries after a failed write to a BE
sink.batch.interval | 10s | Flush interval after which the asynchronous thread writes buffered data to the BE. Defaults to 10 seconds; supports the time units ms, s, min, h and d. 0 disables periodic flushing.
sink.properties.* | -- | Stream Load import parameters, e.g.: 'sink.properties.column_separator' = ',' sets the column separator; 'sink.properties.escape_delimiters' = 'true' treats escape sequences as delimiters ('\x01' is converted to binary 0x01); 'sink.properties.format' = 'json' together with 'sink.properties.strip_outer_array' = 'true' enables JSON import
sink.enable-delete | true | Whether to enable deletes. Requires batch delete to be enabled on the Doris table (enabled by default since 0.15) and only supports the Unique model.
sink.batch.bytes | 10485760 | Maximum number of bytes per write to a BE; when the buffered data of a batch exceeds this threshold it is flushed to the BE. Defaults to 10 MB.
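
When the connector is used through the DataStream API, as in the examples below, the sink-side options in this table map onto the connector's builder classes. The following is only a minimal sketch of that mapping: the FE address, table identifier and credentials are placeholders, and it assumes that DorisExecutionOptions and DorisReadOptions live in the same org.apache.doris.flink.cfg package as the DorisOptions and DorisSink classes imported in FlinkCDCMysql.java below.

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;

import java.util.Properties;

public class DorisSinkOptionsSketch {
    public static void main(String[] args) {
        // sink.properties.* -> Stream Load properties (JSON import in this example)
        Properties streamLoadProp = new Properties();
        streamLoadProp.setProperty("format", "json");
        streamLoadProp.setProperty("strip_outer_array", "true");

        // sink.batch.size / sink.batch.interval / sink.max-retries
        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                .setBatchSize(10000)
                .setBatchIntervalMs(10000L)
                .setMaxRetries(1)
                .setStreamLoadProp(streamLoadProp)
                .build();

        // fenodes / table.identifier / username / password
        DorisOptions dorisOptions = DorisOptions.builder()
                .setFenodes("xxx.xxx.xxx.xxx:8030")
                .setTableIdentifier("db1.tbl1")
                .setUsername("root")
                .setPassword("")
                .build();

        // The option objects are combined as DorisSink.sink(DorisReadOptions.builder().build(), executionOptions, dorisOptions)
        // and passed to DataStream.addSink(...), exactly as in the commented-out block of FlinkCDCMysql.java below.
    }
}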

Doris base table

CREATE TABLE table1(
id INT,
student VARCHAR(32),
phone VARCHAR(32)
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES("replication_num" = "1");

FlinkCDC-MySQL

Binlog must be enabled on the MySQL server; the MySQL CDC source reads row-based binlog events, so binlog_format should be ROW.
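
Concretely, a minimal sketch of the server settings involved, with placeholder values (server id, log file base name); the my.cnf location and the exact privileges granted to the CDC account depend on your environment:

# my.cnf (placeholder values; restart mysqld after changing)
[mysqld]
server-id        = 1
log-bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL

-- verify from a MySQL client
SHOW VARIABLES LIKE 'log_bin';         -- expect ON
SHOW VARIABLES LIKE 'binlog_format';   -- expect ROW
-- the account used by Flink CDC also needs at least SELECT, REPLICATION SLAVE and REPLICATION CLIENT privileges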

Add the Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hzxt</groupId>
    <artifactId>flinkcdc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <flink.version>1.13.6</flink.version>
        <debezium.version>1.5.4.Final</debezium.version>
        <geometry.version>2.2.0</geometry.version>
        <java.version>8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <slf4j.version>1.7.25</slf4j.version>
        <log4j.version>2.16.0</log4j.version>
        <spotless.version>2.4.2</spotless.version>
        <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
        <flink.forkCount>1</flink.forkCount>
        <flink.reuseForks>true</flink.reuseForks>
        <log4j.configuration>log4j2-test.properties</log4j.configuration>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <!-- add the dependency matching your database -->
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <!-- the dependency is available only for stable releases. -->
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <type>test-jar</type>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.13_2.12</artifactId>
            <version>1.0.3</version>
        </dependency>
    </dependencies>
</project>

FlinkCDCMysql.java

package flinkcdc;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import flinkcdc.util.MyDeserialization;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
/**
 * @Author: M.J
 * @Date: 2022/4/12 14:40
 */
public class FlinkCDCMysql {
    public static void main(String[] args) throws Exception {
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");
        // Build the MySQL CDC source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("xxx.xxx.xxx.xxx")
                .port(3306)
                .databaseList("gmall_flink")
                .tableList("gmall_flink.z_user_info")
                .username("")
                .password("")
                .deserializer(new MyDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
        // Write the data from the source into Doris
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .addSink(
                        DorisSink.sink(
                                DorisOptions.builder()
                                .setFenodes("xxx.xxx.xxx.xxx:8030")
                                .setTableIdentifier("kafkaDatabase.z_user_info")
                                .setUsername("root")
                                .setPassword("").build()
                        ));
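//                Alternative: configure DorisReadOptions / DorisExecutionOptions explicitly, reusing the Stream Load properties 'pro' defined above: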
//                .addSink(
//                DorisSink.sink(
//                        DorisReadOptions.builder().build(),
//                        DorisExecutionOptions.builder()
//                                .setBatchSize(3)
//                                .setBatchIntervalMs(0L)
//                                .setMaxRetries(3)
//                                .setStreamLoadProp(pro).build(),
//                        DorisOptions.builder()
//                                .setFenodes("xxx.xxx.xxx.xxx:8030")
//                                .setTableIdentifier("kafkaDatabase.z_user_info")
//                                .setUsername("root")
//                                .setPassword("").build()
//                ));
        env.execute("MySqlCdcPrint");
    }
}

MyDeserialization.java

package flinkcdc.util;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
/**
 * Custom deserialization schema
 * @Author: M.J
 * @Date: 2022/4/12 16:12
 */
public class MyDeserialization implements DebeziumDeserializationSchema<String> {
    /**
     * Wraps each change record as
     * {"id":41,"student":"Math23","phone":"132222123"}
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject res = new JSONObject();
        Struct value = (Struct)sourceRecord.value();
        // Extract the "after" image of the row; it is null for delete events, which then emit an empty JSON object
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if(after != null){
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                res.put(field.name(),afterValue);
            }
        }
        // Emit the JSON string
        collector.collect(res.toString());
    }
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

FlinkCDC-MongoDB

The MongoDB oplog must be available, which means MongoDB has to run as a replica set (or sharded cluster); a standalone mongod has no oplog.
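
Concretely, a minimal single-node replica set sketch; the replica set name rs0 is a placeholder, and production deployments normally run three or more members:

# mongod.conf (placeholder replica set name; restart mongod after changing)
replication:
  replSetName: rs0

// then, once, in the mongo shell
rs.initiate()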

FlinkCDCMongodb.java

package flinkcdc;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import flinkcdc.util.MongodbDeserialization;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
/**
 * @Author: M.J
 * @Date: 2022/4/13 14:25
 */
public class FlinkCDCMongodb {
    public static void main(String[] args) throws Exception {
        DebeziumSourceFunction<String> bigdata = MongoDBSource.<String>builder()
                .hosts("xxx.xxx.xxx.xxx:27017")
                .databaseList("xxx")
                .collectionList("xxx.xxx")
                .deserializer(new MongodbDeserialization())
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stringDataStreamSource = env.addSource(bigdata);
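        // Column names and logical types used by the commented-out Doris sink below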
        String[] fields = {"id", "student", "phone"};
        LogicalType[] types = {new IntType(), new VarCharType(), new VarCharType()};
        stringDataStreamSource.print();
//        stringDataStreamSource.addSink(
//                DorisSink.sink(
//                        fields,
//                        types,
//                        DorisOptions.builder()
//                                .setFenodes("xxx.xxx.xxx.xxx:8030")
//                                .setTableIdentifier("kafkaDatabase.z_user_info")
//                                .setUsername("root")
//                                .setPassword("").build()
//                ));
//                .addSink(
//                DorisSink.sink(
//                        DorisReadOptions.builder().build(),
//                        DorisExecutionOptions.builder()
//                                .setBatchSize(3)
//                                .setBatchIntervalMs(0L)
//                                .setMaxRetries(3)
//                                .setStreamLoadProp(pro).build(),
//                        DorisOptions.builder()
//                                .setFenodes("xxx.xxx.xxx.xxx:8030")
//                                .setTableIdentifier("kafkaDatabase.z_user_info")
//                                .setUsername("root")
//                                .setPassword("").build()
//                ));
        env.execute("MONGODBCdcPrint");
    }
}

MongodbDeserialization.java

package flinkcdc.util;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.ConverterType;
import java.util.HashMap;
import java.util.Map;
/**
 * @Author: M.J
 * @Date: 2022/4/14 9:30
 */
public class MongodbDeserialization implements DebeziumDeserializationSchema<String> {
    private static final long serialVersionUID = 1L;
    private transient JsonConverter jsonConverter;
    private final Boolean includeSchema;
    private Map<String, Object> customConverterConfigs;
    public MongodbDeserialization() {
        this(false);
    }
    public MongodbDeserialization(Boolean includeSchema) {
        this.includeSchema = includeSchema;
    }
    public MongodbDeserialization(Boolean includeSchema, Map<String, Object> customConverterConfigs) {
        this.includeSchema = includeSchema;
        this.customConverterConfigs = customConverterConfigs;
    }
    @Override
    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        if (this.jsonConverter == null) {
            this.initializeJsonConverter();
        }
        // Serialize the Debezium record value to a JSON string with Kafka's JsonConverter
        byte[] bytes = this.jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
        String s = new String(bytes);
        JSONObject jsonObject = JSONObject.parseObject(s);

        // "fullDocument" holds the complete document; it is absent for delete events
        String fullDocument = jsonObject.getString("fullDocument");
        if (fullDocument == null) {
            return;
        }
        // Strip the extended-JSON ObjectId wrapper ({"$oid": "..."}) around the _id value
        String s1 = fullDocument.replaceAll(" \\{\"\\$oid\": ", "").replaceAll("\\},", ",");

        // Drop the leading _id field and rebuild the JSON object from the remaining fields
        StringBuffer stringBuffer = new StringBuffer("{");
        String[] split = s1.split(",");
        for (int i = 1; i < split.length; i++) {
            stringBuffer.append(split[i]);
            if (i != split.length - 1) {
                stringBuffer.append(",");
            }
        }
        out.collect(stringBuffer.toString());
    }
    private void initializeJsonConverter() {
        this.jsonConverter = new JsonConverter();
        HashMap<String, Object> configs = new HashMap(2);
        configs.put("converter.type", ConverterType.VALUE.getName());
        configs.put("schemas.enable", this.includeSchema);
        if (this.customConverterConfigs != null) {
            configs.putAll(this.customConverterConfigs);
        }
        this.jsonConverter.configure(configs);
    }
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
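
To make the string handling in deserialize() concrete, here is a hypothetical example of one change event; the ObjectId value and the spacing are made up to match the pattern the regular expressions expect:

fullDocument as received:
{"_id": {"$oid": "62578e4f9a1b2c3d4e5f6a7b"}, "id": 41, "student": "Math23", "phone": "132222123"}

string emitted to the collector (the _id field is dropped):
{ "id": 41, "student": "Math23", "phone": "132222123"}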
