1、使用背景

最近组里想要在现有的环境下搭一套实时数仓,综合分析之后doris将会作为实时数仓落地的数据库,数据源有消息数据,还有业务库的数据。

2、数据源接入

消息数据都好说,无论是pulsar,还是kafka,flink官方都已经提供了现成的source接口,照着官方文档去配置一下就ok,但是由于dba这个神秘组织的存在,他们担心会开启bin-log会增大他们的数据库压力,无法给我们开通访问bin-log的权限,作为大头兵的我只能默默接受,因此无法使用flink-cdc的方式去监控pg数据库的bin-log获取变动数据,只能自己写代码,通过查询数据的operation_time获取最新的数据,自己生成kafka消息。
相关代码如下:

public class MysqlToKafka {

    public static void main(String[] args) throws Exception {
        StudentInfo studentInfo = null;
        Properties pro = new Properties();
        pro.put("bootstrap.servers", "hadoop102:9092");
        pro.put("acks", "all");
        pro.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pro.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pro.put("retries", 3);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(pro);
        MysqlJDBC mysqlJDBC = new MysqlJDBC();
        ResultSet resultSet = null;
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String sql = null;
        mysqlJDBC.init();

        while(true){
            String timeStamp = sdf.format(new Date());
            sql = "select " +
                    "stu_id," +
                    "stu_name," +
                    "course," +
                    "stu_score," +
                    "stu_operation_time " +
                    "from student_info " +
                    "where timestampdiff(minute,stu_operation_time,\""+timeStamp+"\") <= \""+2+"\";";
            resultSet = mysqlJDBC.select(sql);

            while(resultSet.next()){
                studentInfo = new StudentInfo(
                        resultSet.getString("stu_id"),
                        resultSet.getString("stu_name"),
                        resultSet.getString("course"),
                        resultSet.getString("stu_score"),
                        resultSet.getString("stu_operation_time"),
                        ""
                );
                System.out.println(studentInfo);
                kafkaProducer.send(new ProducerRecord<>("MysqlToKafka","mysql", JSON.toJSONString(studentInfo)));
            }
            Thread.sleep(60*1000);
        }
    }
}

3、数据处理和写入

数据处理
上面自己接入数据的逻辑是每过一分钟就去表里查询最近两分钟变化的数据,所以一定会出现重复的数据。
下面需要对数据进行去重处理,去重操作有两个地方可以进行
第一个:通过flink中的状态变量进行精准去重
第二个:在设计doris表的时候设计好字段数据的修改方式为replace,具体的建表语句可以参考doris官方文档

数据写入
往doris写入数据有很多种方式,可以参考doris官方文档,我们开始考虑通过jdbc的方式将数写入到doris中,但是insert into 方式并不适合大量数据的长时间插入,所有只能采用stream load或者使用doris扩展出来的flink-connector-doris,由于这个不是flink官方提供的sink组件所以在maven中央仓库并不能找到相关依赖,按照doris官方介绍可以通过自己编译一个doris的sink,偶然间在这里发现可以通过这种方式添加依赖,然后去调用DorisSink方法去实现,按着他的介绍底层也是Stream load的方式实现的,最后竟然调试成功了。

好奇这个DorisDB企业版文档和官方文档是什么关系??有知道的可以说一下
按照这里的方式添加依赖就可以

<repositories>
    <repository>
        <id>dorisdb-maven-releases</id>
        <url>http://dorisdbvisitor:dorisdbvisitor134@nexus.dorisdb.com/repository/maven-releases/</url>
    </repository>
    <repository>
        <id>dorisdb-maven-snapshots</id>
        <url>http://dorisdbvisitor:dorisdbvisitor134@nexus.dorisdb.com/repository/maven-snapshots/</url>
    </repository>
</repositories>
<dependency>
    <groupId>com.dorisdb.connector</groupId>
    <artifactId>flink-connector-doris</artifactId>
    <version>1.0.32-SNAPSHOT</version>  <!-- for flink-1.11 ~ flink-1.12 -->
    <version>1.0.32_1.13-SNAPSHOT</version>  <!-- for flink-1.13 -->
</dependency>

不知道这一步什么意思,所以没有操作这一步,好像并不影响

com.dorisdb.table.connector.flink.DorisDynamicTableSinkFactory
加入到:
src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

相关代码如下:

public class SinkConnectorToDoris {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(3000);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("group.id", "test");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("auto.offset.reset", "earliest");
        properties.put("max.poll.records", "10000");

        DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer<>("MysqlToKafka", new SimpleStringSchema(), properties));
        dataStream
        		//.map(t->JSON.parseObject(t))(多此一举)
                .keyBy(t->t)
                //RichFlatMapFunction对消息进行去重
                .flatMap(new RichFlatMapFunction<String, String>() {
                    private transient ValueState<Boolean> isExist;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Boolean> vsd = new ValueStateDescriptor<>("isExist", Boolean.class);
                        isExist = getRuntimeContext().getState(vsd);
                    }

                    @Override
                    public void flatMap(String s, Collector<String> collector) throws Exception {
                        if(isExist.value() == null){
                            collector.collect(s);
                            isExist.update(true);
                        }
                    }
                })
                .addSink(
                        DorisSink.sink(
                            DorisSinkOptions.builder()
                                    .withProperty("jdbc-url", "jdbc:mysql://172.17.60.10:19030/doris_qa")
                                    .withProperty("load-url", "172.17.60.10:18030")
                                    .withProperty("username", "root")
                                    .withProperty("password", "")
                                    .withProperty("table-name", "student_info_gxd_test")
                                    .withProperty("database-name", "doris_qa")
                                    .withProperty("sink.properties.format", "json")
                                    .withProperty("sink.properties.strip_outer_array", "true")
                                    .withProperty("sink.buffer-flush.interval-ms","1000")
                            .build()
                    )
                ).setParallelism(1);
            env.execute();
    }
}

注意点:
addSink传入的数据流应该是String泛型的数据流,一开始将数据流进行了.map(t->JSON.parseObject(t)),怎么也插不进去数据,希望大家不要犯同样的错误。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐