Batch Writes to ClickHouse for Streaming Data

Use case

Developers who have used ClickHouse know that it performs best when data is written in batches. In streaming scenarios, however, the amount of data per batch is not under our control: the number of messages pulled from Kafka varies from poll to poll, and if Flink writes every streamed record to ClickHouse individually, insert throughput becomes very poor. So I wrote a demo that buffers records and inserts them in batches; for production use it would still need further tuning.

Approach

My idea is to maintain a queue as a buffer. When the number of buffered records reaches a threshold, or records have been waiting in the buffer for longer than a maximum delay, a batch commit to ClickHouse is triggered.
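
As a minimal sketch of that trigger (the class name FlushPolicy and method shouldFlush are illustrative only; the real implementation below keeps this check inside its worker threads, but uses the same two constants):

// Illustrative only: commit the batch when either the size threshold or
// the waiting-time threshold is reached.
public class FlushPolicy {

    private static final int MAXSIZE = 10000;       // records per batch
    private static final long MAX_WAIT_TIME = 3000; // milliseconds

    public boolean shouldFlush(int buffered, long lastCommitMillis) {
        return buffered >= MAXSIZE
                || (buffered > 0 && System.currentTimeMillis() - lastCommitMillis >= MAX_WAIT_TIME);
    }
}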

Code implementation

The Flink sink that writes to ClickHouse

package com.su.data.sink;

import com.su.data.pojo.RouteInfoPoJO;
import com.su.data.util.ClickhouseTasK;
import com.su.data.util.ClickhouseUtil;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;


import java.io.Serializable;


/**
 * @ClassName:ClickhouseSink
 * @Author: sz
 * @Date: 2022/7/8 10:44
 * @Description:
 */

public class ClickhouseSink extends RichSinkFunction<RouteInfoPoJO> implements  Serializable {


    String sql;


    public ClickhouseSink(String sql) {
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ClickhouseTasK.connection  = ClickhouseUtil.getConn();
    }

    @Override
    public void close() throws Exception {
        super.close();
        ClickhouseTasK.connection.close();
    }

    @Override
    public void invoke(RouteInfoPoJO routeInfoPoJO, Context context) throws Exception {
        // hand the streaming record over to the buffer
        ClickhouseTasK.getInstance(sql).totalAdd(routeInfoPoJO);
    }
}
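
The ClickhouseUtil helper referenced above is not shown in the original post. A minimal sketch, assuming the classic ClickHouse JDBC driver and placeholder connection settings (URL, user and password below are made up), could look like this:

package com.su.data.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Hypothetical sketch of the connection helper used by ClickhouseSink.
 * Host, port, database and credentials are placeholders.
 */
public class ClickhouseUtil {

    private static final String URL = "jdbc:clickhouse://xxxx:8123/sz";

    public static Connection getConn() throws SQLException {
        try {
            // classic driver class; newer setups may use com.clickhouse.jdbc.ClickHouseDriver instead
            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        } catch (ClassNotFoundException e) {
            throw new SQLException("ClickHouse JDBC driver not found on the classpath", e);
        }
        return DriverManager.getConnection(URL, "default", "");
    }
}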

The buffering and batch-commit module

package com.su.data.util;

import com.su.data.pojo.RouteInfoPoJO;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @ClassName:ClickhouseTasK
 * @Author: sz
 * @Date: 2022/7/8 16:24
 * @Description:
 */

public class ClickhouseTasK {

    /**
     * Maximum capacity of the bounded queue, i.e. the number of records committed to ClickHouse per batch
     */
    private static final int MAXSIZE = 10000;

    /**
     * Bounded commit queue
     */
    private static Queue<RouteInfoPoJO> queue = new LinkedBlockingQueue<>(MAXSIZE);

    /**
     * Unbounded buffer queue: it receives records from the data stream, and the bounded queue pulls from it, so the stream itself is never blocked
     */
    private static Queue<RouteInfoPoJO> TOTAL_QUEUE = new ConcurrentLinkedQueue<>();

    /**
     * Lock guarding the commit queue
     */
    private static ReentrantLock queenLock = null;

    /**
     * Singleton instance
     */
    private static volatile ClickhouseTasK clickhouseTasK = null;

    /**
     * Signalled when the queue is full
     */
    private static Condition FULL = null;

    /**
     * Signalled when the queue is no longer full
     */
    private static Condition UN_FULL = null;

    /**
     * ClickHouse connection
     */
    public static Connection connection = null;

    /**
     * Maximum time (3000 ms) the bounded queue may wait before an automatic commit is triggered
     */
    private static final long MAX_WAIT_TIME = 3000;

    /**
     * Thread that commits the bounded queue
     */
    private static Thread dataThread = null;

    /**
     * Thread that moves records from the unbounded queue into the bounded queue
     */
    private static Thread moveThread = null;

    /**
     * Timer thread: triggers an automatic commit once the maximum wait time has elapsed
     */
    private static Thread timeThread = null;

    /**
     * Timestamp (in milliseconds) of the last commit
     */
    static AtomicLong atomicLong = null;

    /**
     * Total number of records received by the unbounded queue
     */
    static AtomicLong count = null;

    /**
     * Static initialisation at class-load time
     */
    static {
        count = new AtomicLong(0);
        // the bounded queue is committed once every 10000 records
        queenLock = new ReentrantLock();
        FULL = queenLock.newCondition();
        UN_FULL = queenLock.newCondition();
    }

    /**
     * Singleton initialisation (double-checked locking)
     */
    public static ClickhouseTasK getInstance(String sql) throws  InterruptedException {
        if( null == clickhouseTasK){
            synchronized (ClickhouseTasK.class){
                if(null == clickhouseTasK){
                    clickhouseTasK = new ClickhouseTasK(sql);
                }
            }
        }
        return  clickhouseTasK;
    }

    /**
     * The constructor records the start timestamp and starts the worker threads
     */
    public ClickhouseTasK(String sql) throws InterruptedException {
        atomicLong = new AtomicLong(System.currentTimeMillis());
        CountDownLatch countDownLatch = new CountDownLatch(2);
        // timer thread
        timeThread = new Thread(() -> {
            while (true){
                queenLock.lock();
                try {
                    // auto-commit once the maximum wait time has elapsed
                    if(System.currentTimeMillis() - atomicLong.get() >= MAX_WAIT_TIME && queue.size() > 0){
                        System.out.println("Time-based auto commit, queue size: " + queue.size());
                        // commit the data
                        commitData(queue,connection,sql);
                        // record the time of this commit
                        atomicLong.set(System.currentTimeMillis());
                        // the commit queue is empty again, wake up the thread blocked on UN_FULL
                        UN_FULL.signal();
                    }
                } catch (SQLException e) {
                    System.out.println("Exception during timed commit: " + e);
                } finally {
                    queenLock.unlock();
                }
            }
        });
        timeThread.setName("timeThread");
        // transfer thread: moves records from the unbounded queue into the bounded queue
        moveThread = new Thread(() ->{
            countDownLatch.countDown();
            while (true){
                if(!TOTAL_QUEUE.isEmpty()){
                    add(TOTAL_QUEUE.poll());
                }
            }
        });
        moveThread.setName("moveThread");
        dataThread = new Thread(() ->{
            System.out.println(Thread.currentThread().getName() + " started");
            countDownLatch.countDown();
            while (true){
                queenLock.lock();
                try {
                    // wait until the queue is full: being woken up on FULL means the queue has filled up
                    FULL.await();
                    // commit and drain the queue
                    if(!queue.isEmpty()){
                        commitData(queue,connection,sql);
                    }

                    // reset the commit timestamp
                    atomicLong.set(System.currentTimeMillis());
                    // the commit queue is empty again, wake up the thread blocked on UN_FULL
                    UN_FULL.signal();
                } catch (SQLException | InterruptedException exception) {
                    System.out.println("Exception in commit thread: " + exception);
                }finally {
                    queenLock.unlock();
                }
            }
        });
        dataThread.setName("dataQueenThread");
        moveThread.start();
        dataThread.start();
        timeThread.start();
        // make sure the worker threads have started before the constructor returns
        countDownLatch.await();
        System.out.println("Initialisation finished");

    }
    public void commitData(Queue<RouteInfoPoJO> queue, Connection connection, String sql) throws SQLException {
        System.out.println("About to commit, current queue size: " + queue.size());
        // batch insert
        try(PreparedStatement preparedStatement = connection.prepareStatement(sql)){
            long startTime = System.currentTimeMillis();
            while (!queue.isEmpty()){
                RouteInfoPoJO routeInfoPoJO = queue.poll();
                preparedStatement.setString(1, routeInfoPoJO.getDeal_date());
                preparedStatement.setString(2, routeInfoPoJO.getClose_date());
                preparedStatement.setString(3, routeInfoPoJO.getCard_no());
                preparedStatement.setString(4, routeInfoPoJO.getDeal_value());
                preparedStatement.setString(5, routeInfoPoJO.getDeal_type());
                preparedStatement.setString(6, routeInfoPoJO.getCompany_name());
                preparedStatement.setString(7, routeInfoPoJO.getCar_no());
                preparedStatement.setString(8, routeInfoPoJO.getStation());
                preparedStatement.setString(9, routeInfoPoJO.getConn_mark());
                preparedStatement.setString(10, routeInfoPoJO.getDeal_money());
                preparedStatement.setString(11, routeInfoPoJO.getEqu_no());
                preparedStatement.addBatch();
            }
            // ClickHouse has no transactions: once the batch is executed, the data is written
            int[] ints = preparedStatement.executeBatch();
            long endTime = System.currentTimeMillis();
            System.out.println("Batch insert finished in " + (endTime - startTime) + " ms -- rows inserted = " + ints.length);
            System.out.println("Total records received so far: " + count.get());
        }
        // TODO In a real deployment data must not be lost: failed batches need handling, e.g. log them, collect the logs and re-insert the records
        // TODO To keep ClickHouse free of duplicates, create the table with a ReplacingMergeTree engine so that duplicate rows are merged automatically
    }

    public void  add (RouteInfoPoJO routeInfoPoJO){
        // size-based commit condition
        queenLock.lock();
        try {
            // the commit queue is full
            if (queue.size() >= MAXSIZE){
                // wake up the thread blocked on FULL
                FULL.signal();
                // block on UN_FULL until the queue has been drained
                UN_FULL.await();
            }
            if(routeInfoPoJO != null){
                // enqueue into the commit queue
                queue.offer(routeInfoPoJO);
            }
        } catch (InterruptedException exception) {
            exception.printStackTrace();
        } finally {
            queenLock.unlock();
        }
    }

    /**
     * Enqueue into the unbounded buffer queue. In a real deployment, estimate the available memory, cap the number of buffered messages and watch the consumption rate to avoid running out of memory.
     */
    public void totalAdd(RouteInfoPoJO routeInfoPoJO){
        TOTAL_QUEUE.add(routeInfoPoJO);
        count.incrementAndGet();
    }
}

The RouteInfoPoJO entity

package com.su.data.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * @ClassName:RouteInfoPoJO
 * @Author: sz
 * @Date: 2022/7/4 17:11
 * @Description:
 */

@AllArgsConstructor
@Data
public class RouteInfoPoJO {
    private String deal_date;
    private String close_date;
    private String card_no;
    private String deal_value;
    private String deal_type;
    private String company_name;
    private String car_no;
    private String station;
    private String conn_mark;
    private String deal_money;
    private String equ_no;
}

The Flink job that reads from Kafka and writes to ClickHouse

package com.su.data;

import com.su.data.pojo.RouteInfoPoJO;
import com.su.data.serializer.MyDeserialization;
import com.su.data.sink.ClickhouseSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * @ClassName:KakfaToCK
 * @Author: sz
 * @Date: 2022/7/8 10:35
 * @Description:
 */

public class KakfaToCK {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"ckcomsumer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"ck-node");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxxx:9092");


        DataStreamSource<RouteInfoPoJO> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<RouteInfoPoJO>("szt",new MyDeserialization(),properties));

        String sql = "INSERT INTO sz.ods_szt_data(deal_date, close_date, card_no, deal_value, deal_type, company_name, car_no, station, conn_mark, deal_money, equ_no) values (?,?,?,?,?,?,?,?,?,?,?)";
        //dataStreamSource.print();
        dataStreamSource.addSink(new ClickhouseSink(sql));
        env.execute("KakfaToCK");

    }
}
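
The MyDeserialization schema referenced above is also not included in the post. A minimal sketch, assuming it implements Flink's DeserializationSchema interface, that the Kafka value is a JSON object whose field names match the RouteInfoPoJO fields, and that fastjson is used for parsing, might look like this:

package com.su.data.serializer;

import com.alibaba.fastjson.JSONObject;
import com.su.data.pojo.RouteInfoPoJO;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.nio.charset.StandardCharsets;

/**
 * Hypothetical sketch: parses a JSON Kafka value into a RouteInfoPoJO.
 * The JSON field names are assumed to match the ClickHouse column names.
 */
public class MyDeserialization implements DeserializationSchema<RouteInfoPoJO> {

    @Override
    public RouteInfoPoJO deserialize(byte[] message) {
        JSONObject o = JSONObject.parseObject(new String(message, StandardCharsets.UTF_8));
        // RouteInfoPoJO only declares an all-args constructor, so build it field by field
        return new RouteInfoPoJO(
                o.getString("deal_date"), o.getString("close_date"), o.getString("card_no"),
                o.getString("deal_value"), o.getString("deal_type"), o.getString("company_name"),
                o.getString("car_no"), o.getString("station"), o.getString("conn_mark"),
                o.getString("deal_money"), o.getString("equ_no"));
    }

    @Override
    public boolean isEndOfStream(RouteInfoPoJO nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RouteInfoPoJO> getProducedType() {
        return TypeInformation.of(RouteInfoPoJO.class);
    }
}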

Closing notes

Finally, this demo was written while studying the SZT-bigdata (Shenzhen Tong transit) project; if you are interested, take a look:
https://gitee.com/geekyouth/SZT-bigdata
If anything here is wrong, corrections and suggestions are very welcome!
