Background

In day-to-day development there are plenty of scenarios that call for real-time deduplication. It can be implemented in many ways, each with its own trade-offs; the key is to pick the approach that fits your business scenario. This post focuses on how to do real-time deduplication with RoaringBitmap. Let's start by modeling a simple business scenario.

Requirement: compute each account's cumulative spending in real time; downstream actions are triggered off this cumulative amount, so the result must be accurate.
Conditions:

  • The data source is binlog records in Kafka
  • Account ids and order ids are both bigint; amounts are int
  • The collector delivers binlog records to Kafka with at-least-once semantics
  • Results are written to a KV store

Workflow

From the information above we know what needs to be done and what to watch out for. The overall flow looks roughly like this:
(Figure: simplified processing flow)

  • Read data from Kafka and parse it.
  • Since we need per-account spending, key the stream by account id.
  • Because the collector may send the same record more than once, check whether each order has already been counted so the result stays accurate.
  • To process each new purchase we need to know how much was spent before, so the running total has to be kept in state.
  • Write the result to external storage.
With the rough flow sorted out, the next step is to decide how to implement it; the main question is how to tell whether an order id has already been processed. The number of accounts and orders keeps growing and is already huge, so holding order ids in a List or Set for the check would eat a lot of memory. The requirement demands accurate results, so a Bloom filter is out: it can only answer "definitely not present" or "possibly present", which leaves room for false positives. Checking against an external store would require transactional guarantees. So what saves space, gives exact answers, and needs no extra transaction handling? After working through the options, RoaringBitmap looks like a good fit: it meets all the conditions above and has also been extended to support 64-bit integers. With the approach settled, let's start writing!
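Before wiring it into Flink, here is a minimal, self-contained sketch of the Roaring64Bitmap calls the job below relies on (addLong and contains); the class and variable names are only for illustration:

import org.roaringbitmap.longlong.Roaring64Bitmap;

public class Roaring64BitmapQuickStart {
    public static void main(String[] args) {
        Roaring64Bitmap seenOrderIds = new Roaring64Bitmap();

        long orderId = 9_223_372_036_854_775_000L; // a 64-bit order id

        // first occurrence: not present yet, so record it
        if (!seenOrderIds.contains(orderId)) {
            seenOrderIds.addLong(orderId);
        }

        // a replayed binlog record with the same order id would now be skipped
        System.out.println(seenOrderIds.contains(orderId));    // true
        System.out.println(seenOrderIds.getLongCardinality()); // 1
    }
}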

Implementation

Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.roaringbitmap</groupId>
    <artifactId>RoaringBitmap</artifactId>
    <version>0.9.21</version>
</dependency>

Code

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fyb.bigdata.demo.bean.BinlogColumn;
import com.fyb.bigdata.demo.bean.BinlogTable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;

/**
 * @author fyb
 * @version 1.0
 * @createDate 2021-10-02-23:24
 * @description Calculate the cumulative consumption amount of each account in real time
 * @updateUser
 * @updateDate
 * @updateRemark
 */
public class RoaringBitmapTest {

    private static final Logger LOG = LoggerFactory.getLogger(RoaringBitmapTest.class);

    public static void main(String[] args) throws Exception {

        //Creates an execution environment that represents the context in which the program is currently executed
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);
        //set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        //configuration kafka information
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "111.111.111.111:9092");//comma separated list of Kafka brokers
        properties.put("group.id", "RoaringBitmapTest");//consumer group

        //creates a new Kafka streaming source consumer
        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer<>("simulate-binlog", new SimpleStringSchema(), properties));

        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(2))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        //create StateDescriptor
        ValueStateDescriptor<Roaring64Bitmap> bitmapDescriptor = new ValueStateDescriptor<>(
                "Roaring64Bitmap",
                TypeInformation.of(new TypeHint<Roaring64Bitmap>() {
                }));
        ValueStateDescriptor<Integer> priceDescriptor = new ValueStateDescriptor<>(
                "AccountPrice",
                Integer.class
        );
        //set state time-to-live
        bitmapDescriptor.enableTimeToLive(ttlConfig);
        priceDescriptor.enableTimeToLive(ttlConfig);

        //parsing binlog data, group by user id
        ObjectMapper mapper = new ObjectMapper();
        source.map(new MapFunction<String, Tuple2<Long, BinlogTable>>() {
            @Override
            public Tuple2<Long, BinlogTable> map(String value) throws Exception {
                BinlogTable binlog = mapper.readValue(value, BinlogTable.class);
                List<BinlogColumn> columns = binlog.getColumns();
                Long uid = Long.valueOf(columns.get(0).getValue());
                return new Tuple2<>(uid, binlog);
            }
        })
                .keyBy(0)
                .process(new ProcessFunction<Tuple2<Long, BinlogTable>, Object>() {

                    private transient ValueState<Roaring64Bitmap> bitmapState;
                    private transient ValueState<Integer> priceState;

                    @Override
                    public void open(Configuration parameters) {
                        bitmapState = getRuntimeContext().getState(bitmapDescriptor);
                        priceState = getRuntimeContext().getState(priceDescriptor);
                    }

                    @Override
                    public void processElement(Tuple2<Long, BinlogTable> value, Context ctx, Collector<Object> out) throws Exception {
                        
                        Roaring64Bitmap bitmap = bitmapState.value();
                        if (bitmap == null) {
                            bitmap = new Roaring64Bitmap();
                        }
                        Integer price = priceState.value();
                        if (price == null) {
                            price = 0;
                        }
                        BinlogTable binlog = value.f1;
                        List<BinlogColumn> columns = binlog.getColumns();
                        Long orderId = Long.valueOf(columns.get(1).getValue());
                        Integer orderPrice = Integer.valueOf(columns.get(2).getValue());
                        if (!bitmap.contains(orderId)) {
                            bitmap.addLong(orderId);
                            //accumulation price
                            price += orderPrice;
                            //update state
                            priceState.update(price);
                            bitmapState.update(bitmap);
                        }
                        out.collect(new Tuple2<>(value.f0, price));
                    }
                })
                .print();//Simplified logic, print instead

        //Triggers the program execution
        env.execute();
    }
}
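The BinlogTable and BinlogColumn beans imported above are not shown in the post. A minimal, hypothetical sketch that would satisfy the accessors used here (getColumns() and getValue()) might look like the following; the real field names depend on your binlog format:

// BinlogTable.java (hypothetical sketch, one file per class)
import java.util.List;

public class BinlogTable {
    private List<BinlogColumn> columns;   // assumed field matching getColumns()

    public List<BinlogColumn> getColumns() { return columns; }
    public void setColumns(List<BinlogColumn> columns) { this.columns = columns; }
}

// BinlogColumn.java (hypothetical sketch)
public class BinlogColumn {
    private String name;    // column name, assumed
    private String value;   // column value as text; parsed with Long.valueOf/Integer.valueOf above

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }
}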

To keep things simple, the binlog parsing here just picks columns by index; in real use, parse according to your own binlog model. Running a few simulated records through the job shows that both the deduplication and the sum come out correct. The point of this code is to show how to use RoaringBitmap inside Flink state. Because the example keys by user first and only then applies RoaringBitmap, it effectively builds one RoaringBitmap of order ids per user; if a user has only a single order, that is counterproductive. One could deduplicate the orders first and then group by user, but this example relies on keyed state, which is only available after a keyBy. Here are two ideas for handling that, each with its own trade-offs (a sketch follows each one); adjust them to your own scenario, and feel free to leave a comment if you know a better way.

Approach 1

Hash the order id and take it modulo a fixed bucket count, then keyBy the remainder. That way, after a restart the same order id is still routed to the same operator instance for the dedup check, avoiding the misjudgment that would happen if the same order id landed on a different operator after a restart.
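A minimal sketch of this idea (hypothetical class and constant names), assuming the parsed DataStream<Tuple2<Long, BinlogTable>> from the main example and the order id in the second column:

// Wire it up roughly like:
//     parsed.keyBy(t -> Long.valueOf(t.f1.getColumns().get(1).getValue()) % OrderDedupByBucket.BUCKETS)
//           .process(new OrderDedupByBucket())
//           ... then key by account id and sum as in the main example ...
import com.fyb.bigdata.demo.bean.BinlogTable;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.roaringbitmap.longlong.Roaring64Bitmap;

public class OrderDedupByBucket
        extends KeyedProcessFunction<Long, Tuple2<Long, BinlogTable>, Tuple2<Long, BinlogTable>> {

    public static final int BUCKETS = 128;   // hypothetical; size it to the operator parallelism

    private transient ValueState<Roaring64Bitmap> seenOrders;

    @Override
    public void open(Configuration parameters) {
        seenOrders = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "SeenOrderIds", TypeInformation.of(new TypeHint<Roaring64Bitmap>() {})));
    }

    @Override
    public void processElement(Tuple2<Long, BinlogTable> value, Context ctx,
                               Collector<Tuple2<Long, BinlogTable>> out) throws Exception {
        long orderId = Long.parseLong(value.f1.getColumns().get(1).getValue());
        Roaring64Bitmap bitmap = seenOrders.value();
        if (bitmap == null) {
            bitmap = new Roaring64Bitmap();
        }
        // only the first occurrence of an order id in this bucket is forwarded downstream
        if (!bitmap.contains(orderId)) {
            bitmap.addLong(orderId);
            seenOrders.update(bitmap);
            out.collect(value);
        }
    }
}

Because the key is orderId % BUCKETS rather than the raw order id, the key space stays bounded while the same order id always lands in the same bucket before and after a restart; downstream, the deduplicated stream can then be keyed by account id and summed as in the main example.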

Approach 2

Implement the redistribution on restart with operator state. This requires designing the snapshot and restore logic around your specific business scenario, so it is more involved.
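One possible shape of this (a sketch only, with illustrative class and state names): keep the bitmap as union-redistributed operator list state, so that after a restart or rescale every subtask restores the full set of seen order ids and can keep answering the dedup check correctly.

import com.fyb.bigdata.demo.bean.BinlogTable;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;
import org.roaringbitmap.longlong.Roaring64Bitmap;

public class OperatorStateOrderDedup
        extends RichFlatMapFunction<Tuple2<Long, BinlogTable>, Tuple2<Long, BinlogTable>>
        implements CheckpointedFunction {

    private transient ListState<Roaring64Bitmap> checkpointed;
    private transient Roaring64Bitmap seenOrders;

    @Override
    public void flatMap(Tuple2<Long, BinlogTable> value, Collector<Tuple2<Long, BinlogTable>> out) {
        long orderId = Long.parseLong(value.f1.getColumns().get(1).getValue());
        // forward only the first occurrence of each order id
        if (!seenOrders.contains(orderId)) {
            seenOrders.addLong(orderId);
            out.collect(value);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // store this subtask's bitmap in operator state on every checkpoint
        checkpointed.clear();
        checkpointed.add(seenOrders);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // union-list state: on restore each subtask receives every stored bitmap
        checkpointed = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
                "SeenOrderIds", TypeInformation.of(new TypeHint<Roaring64Bitmap>() {})));
        seenOrders = new Roaring64Bitmap();
        if (context.isRestored()) {
            for (Roaring64Bitmap restored : checkpointed.get()) {
                seenOrders.or(restored);   // merge all restored bitmaps into one
            }
        }
    }
}

Note that operator state has no TTL support, unlike the keyed state used in the main example, and the union redistribution means every subtask ends up holding the full bitmap, so old order ids would need to be pruned by some other mechanism.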
