用一个接地气的案例来介绍如何实时计算 UV 数据。大家都知道,在 ToC 的互联网公司,UV 是一个很重要的指标,对于老板、商务、运营的及时决策会产生很大的影响,笔者在电商公司,目前主要的工作就是计算 UV、销售等各类实时数据,体验就特别深刻, 因此就用一个简单demo 演示如何用 Flink SQL 消费 Kafka 中的 PV 数据,实时计算出 UV 指标后写入 Hbase。

Kafka 源数据解析输入标题

PV 数据来源于埋点数据经 FileBeat 上报清洗后,以 ProtoBuffer 格式写入下游 Kafka,消费时第一步要先反序列化 PB 格式的数据为 Flink 能识别的 Row 类型,因此也就需要自定义实现 DeserializationSchema 接口,具体如下代码, 这里只抽取计算用到的 PV 的 mid、事件时间 time_local,并从其解析得到 log_date 字段:

public class PageViewDeserializationSchema implements DeserializationSchema<Row> {


    public static final Logger LOG = LoggerFactory.getLogger(PageViewDeserializationSchema.class);
    protected SimpleDateFormat dayFormatter;


    private final RowTypeInfo rowTypeInfo;


    public PageViewDeserializationSchema(RowTypeInfo rowTypeInfo){
        dayFormatter = new SimpleDateFormat("yyyyMMdd", Locale.UK);
        this.rowTypeInfo = rowTypeInfo;
    }
    @Override
    public Row deserialize(byte[] message) throws IOException {
        Row row = new Row(rowTypeInfo.getArity());
        MobilePage mobilePage = null;
        try {
            mobilePage = MobilePage.parseFrom(message);
            String mid = mobilePage.getMid();
            row.setField(0, mid);
            Long timeLocal = mobilePage.getTimeLocal();
            String logDate = dayFormatter.format(timeLocal);
            row.setField(1, logDate);
            row.setField(2, timeLocal);
        }catch (Exception e){
            String mobilePageError = (mobilePage != null) ? mobilePage.toString() : "";
            LOG.error("error parse bytes payload is {}, pageview error is {}", message.toString(), mobilePageError, e);
        }
        return null;
    }

 

编写 Flink Job 主程序输入标题

将 PV 数据解析为 Flink 的 Row 类型后,接下来就很简单了,编写主函数,写 SQL 就能统计 UV 指标了,代码如下:

public class RealtimeUV {


    public static void main(String[] args) throws Exception {
        //step1 从properties配置文件中解析出需要的Kakfa、Hbase配置信息、checkpoint参数信息
        Map<String, String> config = PropertiesUtil.loadConfFromFile(args[0]);
        String topic = config.get("source.kafka.topic");
        String groupId = config.get("source.group.id");
        String sourceBootStrapServers = config.get("source.bootstrap.servers");
        String hbaseTable = config.get("hbase.table.name");
        String hbaseZkQuorum = config.get("hbase.zk.quorum");
        String hbaseZkParent = config.get("hbase.zk.parent");
        int checkPointPeriod = Integer.parseInt(config.get("checkpoint.period"));
        int checkPointTimeout = Integer.parseInt(config.get("checkpoint.timeout"));


        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        //step2 设置Checkpoint相关参数,用于Failover容错
        sEnv.getConfig().registerTypeWithKryoSerializer(MobilePage.class,
                ProtobufSerializer.class);
        sEnv.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        sEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        sEnv.enableCheckpointing(checkPointPeriod, CheckpointingMode.EXACTLY_ONCE);
        sEnv.getCheckpointConfig().setCheckpointTimeout(checkPointTimeout);
        sEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


        //step3 使用Blink planner、创建TableEnvironment,并且设置状态过期时间,避免Job OOM
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, environmentSettings);
        tEnv.getConfig().setIdleStateRetentionTime(Time.days(1), Time.days(2));


        Properties sourceProperties = new Properties();
        sourceProperties.setProperty("bootstrap.servers", sourceBootStrapServers);
        sourceProperties.setProperty("auto.commit.interval.ms", "3000");
        sourceProperties.setProperty("group.id", groupId);


        //step4 初始化KafkaTableSource的Schema信息,笔者这里使用register TableSource的方式将源表注册到Flink中,而没有用register DataStream方式,也是因为想熟悉一下如何注册KafkaTableSource到Flink中
        TableSchema schema = TableSchemaUtil.getAppPageViewTableSchema();
        Optional<String> proctimeAttribute = Optional.empty();
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.emptyList();
        Map<String, String> fieldMapping = new HashMap<>();
        List<String> columnNames = new ArrayList<>();
        RowTypeInfo rowTypeInfo = new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
        columnNames.addAll(Arrays.asList(schema.getFieldNames()));
        columnNames.forEach(name -> fieldMapping.put(name, name));
        PageViewDeserializationSchema deserializationSchema = new PageViewDeserializationSchema(
                rowTypeInfo);
        Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
        Kafka011TableSource kafkaTableSource = new Kafka011TableSource(
                schema,
                proctimeAttribute,
                rowtimeAttributeDescriptors,
                Optional.of(fieldMapping),
                topic,
                sourceProperties,
                deserializationSchema,
                StartupMode.EARLIEST,
                specificOffsets);
        tEnv.registerTableSource("pageview", kafkaTableSource);


        //step5 初始化Hbase TableSchema、写入参数,并将其注册到Flink中
        HBaseTableSchema hBaseTableSchema = new HBaseTableSchema();
        hBaseTableSchema.setRowKey("log_date", String.class);
        hBaseTableSchema.addColumn("f", "UV", Long.class);
        HBaseOptions hBaseOptions = HBaseOptions.builder()
                .setTableName(hbaseTable)
                .setZkQuorum(hbaseZkQuorum)
                .setZkNodeParent(hbaseZkParent)
                .build();
        HBaseWriteOptions hBaseWriteOptions = HBaseWriteOptions.builder()
                .setBufferFlushMaxRows(1000)
                .setBufferFlushIntervalMillis(1000)
                .build();
        HBaseUpsertTableSink hBaseSink = new HBaseUpsertTableSink(hBaseTableSchema, hBaseOptions, hBaseWriteOptions);
        tEnv.registerTableSink("uv_index", hBaseSink);


        //step6 实时计算当天UV指标sql, 这里使用最简单的group by agg,没有使用minibatch或窗口,在大数据量优化时最好使用后两种方式
        String uvQuery = "insert into uv_index "
                + "select log_date,\n"
                + "ROW(count(distinct mid) as UV)\n"
                + "from pageview\n"
                + "group by log_date";
        tEnv.sqlUpdate(uvQuery);
        //step7 执行Job
        sEnv.execute("UV Job");
    }
}

以上就是一个简单的使用 Flink SQL 统计 UV 的 case, 代码非常简单,只需要理清楚如何解析 Kafka 中数据,如何初始化 Table Schema,以及如何将表注册到 Flink中,即可使用 Flink SQL 完成各种复杂的实时数据统计类的业务需求,学习成本比API 的方式低很多。说明一下,笔者这个 demo 是基于目前业务场景而开发的,在生产环境中可以真实运行起来,可能不能拆箱即用,你需要结合自己的业务场景自定义相应的 kafka 数据解析类。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐