FlinkCDC to Doris
flink-doris-connector basic parameters
General configuration options
Key | Default Value | Comment |
---|---|---|
fenodes | -- | Doris FE HTTP address |
table.identifier | -- | Doris table name, e.g. db1.tbl1 |
username | -- | Username for accessing Doris |
password | -- | Password for accessing Doris |
doris.request.retries | 3 | Number of retries for requests sent to Doris |
doris.request.connect.timeout.ms | 30000 | Connection timeout for requests sent to Doris |
doris.request.read.timeout.ms | 30000 | Read timeout for requests sent to Doris |
doris.request.query.timeout.s | 3600 | Timeout for queries against Doris; defaults to 1 hour, -1 means no limit |
doris.request.tablet.size | Integer.MAX_VALUE | Number of Doris tablets per partition. The smaller this value, the more partitions are generated, which raises the parallelism on the Flink side but also puts more pressure on Doris. |
doris.batch.size | 1024 | Maximum number of rows read from a BE in one request. Increasing this value reduces how often Flink establishes connections to Doris, and thus the extra overhead caused by network latency. |
doris.exec.mem.limit | 2147483648 | Memory limit for a single query, in bytes; defaults to 2 GB |
doris.deserialize.arrow.async | false | Whether to asynchronously convert the Arrow format into the RowBatch that flink-doris-connector iterates over |
doris.deserialize.queue.size | 64 | Size of the internal queue used for asynchronous Arrow conversion; takes effect when doris.deserialize.arrow.async is true |
doris.read.field | -- | Comma-separated list of column names to read from the Doris table |
doris.filter.query | -- | Filter expression for reading data; passed through to Doris, which uses it to filter data on the source side |
sink.batch.size | 10000 | Maximum number of rows per write to a BE |
sink.max-retries | 1 | Number of retries after a failed write to a BE |
sink.batch.interval | 10s | Flush interval; once it elapses, the asynchronous thread writes the cached data to a BE. Defaults to 10 s; supported time units are ms, s, min, h and d. Setting it to 0 disables periodic flushing. |
sink.properties.* | -- | Stream Load import parameters, e.g. 'sink.properties.column_separator' = ',' to define the column separator; 'sink.properties.escape_delimiters' = 'true' to allow special characters as separators ('\x01' is converted to the binary byte 0x01); 'sink.properties.format' = 'json' together with 'sink.properties.strip_outer_array' = 'true' for JSON-format loads |
sink.enable-delete | true | Whether to enable deletes. Requires the batch delete feature on the Doris table (enabled by default since 0.15); only the Unique model is supported. |
sink.batch.bytes | 10485760 | Maximum amount of data per write to a BE; when the data buffered in a batch exceeds this threshold, the cache is flushed to the BE. Defaults to 10 MB. |
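Most of the sink-side options above map directly onto the connector's Java builders: DorisOptions carries the connection settings, while DorisExecutionOptions carries the batching, retry and Stream Load settings (the same builders appear in the commented-out code further down). The following is only a minimal sketch; the addresses, table name and values are placeholders, and the import paths should be verified against your connector version.

import java.util.Properties;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;

// Stream Load parameters, i.e. the sink.properties.* keys
Properties streamLoadProps = new Properties();
streamLoadProps.setProperty("format", "json");             // sink.properties.format
streamLoadProps.setProperty("strip_outer_array", "true");  // sink.properties.strip_outer_array

// Batching and retry behaviour, i.e. sink.batch.size / sink.batch.interval / sink.max-retries
DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
        .setBatchSize(10000)
        .setBatchIntervalMs(10000L)
        .setMaxRetries(1)
        .setStreamLoadProp(streamLoadProps)
        .build();

// Connection settings, i.e. fenodes / table.identifier / username / password
DorisOptions dorisOptions = DorisOptions.builder()
        .setFenodes("xxx.xxx.xxx.xxx:8030")
        .setTableIdentifier("db1.tbl1")
        .setUsername("root")
        .setPassword("")
        .build();

// DorisSink.sink(DorisReadOptions.builder().build(), executionOptions, dorisOptions)
// can then be passed to DataStream.addSink(...), as in the jobs below.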
Doris base table
CREATE TABLE table1 (
    id INT,
    student VARCHAR(32),
    phone VARCHAR(32)
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES("replication_num" = "1");
FlinkCDC-MySQL
The MySQL binlog must be enabled, with binlog_format set to ROW, since Flink CDC reads change events from the binlog (you can verify this with SHOW VARIABLES LIKE 'log_bin';).
Add the Maven dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hzxt</groupId>
    <artifactId>flinkcdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.6</flink.version>
        <debezium.version>1.5.4.Final</debezium.version>
        <geometry.version>2.2.0</geometry.version>
        <java.version>8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <slf4j.version>1.7.25</slf4j.version>
        <log4j.version>2.16.0</log4j.version>
        <spotless.version>2.4.2</spotless.version>
        <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
        <flink.forkCount>1</flink.forkCount>
        <flink.reuseForks>true</flink.reuseForks>
        <log4j.configuration>log4j2-test.properties</log4j.configuration>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <!-- add the dependency matching your database -->
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <!-- the dependency is available only for stable releases. -->
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <type>test-jar</type>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.13_2.12</artifactId>
            <version>1.0.3</version>
        </dependency>
    </dependencies>
</project>
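Note that the connector artifacts are version-specific: flink-doris-connector-1.13_2.12 is built for Flink 1.13 with Scala 2.12, matching the flink.version (1.13.6) and scala.binary.version (2.12) properties above, and flink-connector-mysql-cdc / flink-connector-mongodb-cdc 2.2.0 are the Flink CDC 2.2 releases used here. If you change the Flink or Scala version, pick the matching connector artifacts.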
FlinkCDCMysql.java
package flinkcdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import flinkcdc.util.MyDeserialization;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * @Author: M.J
 * @Date: 2022/4/12 0012 14:40
 */
public class FlinkCDCMysql {

    public static void main(String[] args) throws Exception {
        // Stream Load properties (used by the alternative sink configuration commented out below)
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");

        // Build the MySQL CDC source, starting from the latest binlog position
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("xxx.xxx.xxx.xxx")
                .port(3306)
                .databaseList("gmall_flink")
                .tableList("gmall_flink.z_user_info")
                .username("")
                .password("")
                .deserializer(new MyDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();

        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Write the CDC records from the source into Doris
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .addSink(
                        DorisSink.sink(
                                DorisOptions.builder()
                                        .setFenodes("xxx.xxx.xxx.xxx:8030")
                                        .setTableIdentifier("kafkaDatabase.z_user_info")
                                        .setUsername("root")
                                        .setPassword("").build()
                        ));
        // Alternative: configure read and execution options explicitly
        // .addSink(
        //         DorisSink.sink(
        //                 DorisReadOptions.builder().build(),
        //                 DorisExecutionOptions.builder()
        //                         .setBatchSize(3)
        //                         .setBatchIntervalMs(0L)
        //                         .setMaxRetries(3)
        //                         .setStreamLoadProp(pro).build(),
        //                 DorisOptions.builder()
        //                         .setFenodes("xxx.xxx.xxx.xxx:8030")
        //                         .setTableIdentifier("kafkaDatabase.z_user_info")
        //                         .setUsername("root")
        //                         .setPassword("").build()
        //         ));

        env.execute("MySqlCdcPrint");
    }
}
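For anything beyond a quick test it is common to enable Flink checkpointing so that the MySQL CDC source persists its binlog offsets in state and can recover after a restart. A minimal sketch; the interval is an arbitrary example value:

// Enable checkpointing so the CDC source's binlog offsets are stored in state
// and can be restored after a failure (5 s interval chosen only for illustration).
env.enableCheckpointing(5000L);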
MyDeserialization.java
package flinkcdc.util;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

/**
 * Custom deserialization schema that flattens a Debezium change record into
 * a plain JSON string, e.g. {"id":41,"student":"Math23","phone":"132222123"}
 *
 * @Author: M.J
 * @Date: 2022/4/12 0012 16:12
 */
public class MyDeserialization implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject res = new JSONObject();
        Struct value = (Struct) sourceRecord.value();

        // Extract the "after" image of the row (null for delete events)
        Struct after = value.getStruct("after");
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                res.put(field.name(), afterValue);
            }
        }

        // Emit the flattened record
        collector.collect(res.toString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
FlinkCDC-MongoDB
The MongoDB oplog must be available: the MongoDB CDC connector is based on change streams, so MongoDB has to run as a replica set (or sharded cluster) rather than a standalone instance.
FlinkCDCMongodb.java
package flinkcdc;

import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import flinkcdc.util.MongodbDeserialization;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

/**
 * @Author: M.J
 * @Date: 2022/4/13 0013 14:25
 */
public class FlinkCDCMongodb {

    public static void main(String[] args) throws Exception {
        // Build the MongoDB CDC source (change-stream based)
        DebeziumSourceFunction<String> bigdata = MongoDBSource.<String>builder()
                .hosts("xxx.xxx.xxx.xxx:27017")
                .databaseList("xxx")
                .collectionList("xxx.xxx")
                .deserializer(new MongodbDeserialization())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stringDataStreamSource = env.addSource(bigdata);

        // Target columns and types of the Doris table
        String[] fields = {"id", "student", "phone"};
        LogicalType[] types = {new IntType(), new VarCharType(), new VarCharType()};

        stringDataStreamSource.print();
        // stringDataStreamSource.addSink(
        //         DorisSink.sink(
        //                 fields,
        //                 types,
        //                 DorisOptions.builder()
        //                         .setFenodes("xxx.xxx.xxx.xxx:8030")
        //                         .setTableIdentifier("kafkaDatabase.z_user_info")
        //                         .setUsername("root")
        //                         .setPassword("").build()
        //         ));
        // .addSink(
        //         DorisSink.sink(
        //                 DorisReadOptions.builder().build(),
        //                 DorisExecutionOptions.builder()
        //                         .setBatchSize(3)
        //                         .setBatchIntervalMs(0L)
        //                         .setMaxRetries(3)
        //                         .setStreamLoadProp(pro).build(),
        //                 DorisOptions.builder()
        //                         .setFenodes("xxx.xxx.xxx.xxx:8030")
        //                         .setTableIdentifier("kafkaDatabase.z_user_info")
        //                         .setUsername("root")
        //                         .setPassword("").build()
        //         ));

        env.execute("MONGODBCdcPrint");
    }
}
MongodbDeserialization.java
package flinkcdc.util;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.ConverterType;

import java.util.HashMap;
import java.util.Map;

/**
 * Deserialization schema for MongoDB change records: extracts the fullDocument,
 * strips the _id field and emits the remaining fields as a JSON string.
 *
 * @Author: M.J
 * @Date: 2022/4/14 0014 9:30
 */
public class MongodbDeserialization implements DebeziumDeserializationSchema<String> {

    private static final long serialVersionUID = 1L;

    private transient JsonConverter jsonConverter;
    private final Boolean includeSchema;
    private Map<String, Object> customConverterConfigs;

    public MongodbDeserialization() {
        this(false);
    }

    public MongodbDeserialization(Boolean includeSchema) {
        this.includeSchema = includeSchema;
    }

    public MongodbDeserialization(Boolean includeSchema, Map<String, Object> customConverterConfigs) {
        this.includeSchema = includeSchema;
        this.customConverterConfigs = customConverterConfigs;
    }

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        if (this.jsonConverter == null) {
            this.initializeJsonConverter();
        }
        // Convert the Connect record into a JSON string
        byte[] bytes = this.jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
        String s = new String(bytes);
        JSONObject jsonObject = JSONObject.parseObject(s);
        String fullDocument = jsonObject.getString("fullDocument");
        // Remove the extended-JSON ObjectId wrapper
        String s1 = fullDocument.replaceAll(" \\{\"\\$oid\": ", "").replaceAll("\\},", ",");
        // Drop the _id field (the first comma-separated segment) and rebuild the JSON string
        StringBuffer stringBuffer = new StringBuffer("{");
        String[] split = s1.split(",");
        for (int i = 1; i < split.length; i++) {
            stringBuffer.append(split[i]);
            if (i != split.length - 1) {
                stringBuffer.append(",");
            }
        }
        out.collect(stringBuffer.toString());
    }

    private void initializeJsonConverter() {
        this.jsonConverter = new JsonConverter();
        HashMap<String, Object> configs = new HashMap<>(2);
        configs.put("converter.type", ConverterType.VALUE.getName());
        configs.put("schemas.enable", this.includeSchema);
        if (this.customConverterConfigs != null) {
            configs.putAll(this.customConverterConfigs);
        }
        this.jsonConverter.configure(configs);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
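The string splitting in deserialize() is tightly coupled to the exact extended-JSON layout of fullDocument. A possible simplification, sketched here under the assumption that fullDocument parses as a flat JSON object (nested extended-JSON wrappers such as {"$numberLong": ...} would still need extra handling), is to let fastjson parse the document and drop _id directly:

// Hypothetical alternative body for deserialize(): parse fullDocument with fastjson
// and remove _id instead of rebuilding the JSON string by hand.
JSONObject doc = JSONObject.parseObject(jsonObject.getString("fullDocument"));
doc.remove("_id");                  // drop the MongoDB ObjectId
out.collect(doc.toJSONString());    // emit the remaining fields as a JSON string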