Purpose

Use Flink SQL (1.13.6) in a pure-SQL fashion, together with Flink CDC (2.1.1), to capture MySQL binlog data and continuously sync it into a Hive table as a stream. The resulting small-file problem can be handled by periodically running a Flink SQL batch job that compacts the table.

Steps

  1. Enable the MySQL binlog (omitted; a quick check is sketched below)
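
A minimal sketch of what this step typically involves (the my.cnf values are assumptions; adapt them to your environment):

-- On the MySQL side, verify that the binlog is enabled and uses ROW format
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
-- A typical my.cnf fragment if it is not yet enabled (assumed values):
--   [mysqld]
--   server-id     = 1
--   log-bin       = mysql-bin
--   binlog_format = ROW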

  2. Flink CDC captures the MySQL binlog and writes it into a Kafka table

-- Read the MySQL source table
DROP TABLE IF EXISTS mysql_cdc;
CREATE TABLE mysql_cdc (
   `do_date`     STRING
  ,`do_min`      STRING
  ,`pv`          INT
  ,PRIMARY KEY(do_date) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'xxx.xxx.xxx.xxx',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxxxxxxx',
  'database-name' = 'test',
  'table-name' = 'result_total_pvuv_min_mysql',
  'scan.startup.mode'='initial',
  'debezium.snapshot.mode' = 'initial'
);
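
For context, the mysql-cdc table above assumes a MySQL source table shaped roughly like this (a sketch; the actual DDL of test.result_total_pvuv_min_mysql may differ):

-- Assumed shape of the MySQL source table (illustrative only)
CREATE TABLE test.result_total_pvuv_min_mysql (
  do_date VARCHAR(32) NOT NULL,
  do_min  VARCHAR(32),
  pv      INT,
  PRIMARY KEY (do_date)
);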

-- Sink table: write the change stream into Kafka
DROP TABLE IF EXISTS kafka_mysql_cdc;
CREATE TABLE kafka_mysql_cdc (
   `do_date`     STRING
  ,`do_min`      STRING
  ,`pv`          INT
) WITH (
 'connector' = 'kafka',
 'topic' = 'test',
 'properties.group.id' = 'flinkTest',
 'scan.startup.mode' = 'earliest-offset',
 'properties.bootstrap.servers' = 'collector101:9092,collector102:9092',
 'format' = 'debezium-json'
);

INSERT INTO kafka_mysql_cdc SELECT * FROM mysql_cdc;

  3. Parse the Kafka data into MAP columns with a Flink SQL table
-- An alternative parsing table: interpret the Debezium-json payload as plain JSON
DROP TABLE IF EXISTS kafka_mysql_parser;
CREATE TABLE kafka_mysql_parser (
  `before` MAP<STRING, STRING>
  ,`after`  MAP<STRING, STRING>
  ,`op`     STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'test',
 'properties.group.id' = 'flinkTest',
 'scan.startup.mode' = 'earliest-offset',
 'properties.bootstrap.servers' = 'collector101:9092,collector102:9092',
 'format' = 'json'
);
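
To see what the raw change events look like, the MAP-typed table can be queried directly. A Debezium-json record carries before, after and op fields, roughly like the illustrative payload in the comments below (values are made up, metadata fields trimmed):

-- Inspect the raw change events; an UPDATE event looks roughly like
--   {"before":{"do_date":"2022-01-01","do_min":"10:00","pv":5},
--    "after" :{"do_date":"2022-01-01","do_min":"10:00","pv":6},
--    "op":"u"}
SELECT `op`, `before`, `after` FROM kafka_mysql_parser;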

  4. Write the parsed data into Hive
-- Create the Hive table
SET table.sql-dialect=hive;
DROP TABLE IF EXISTS kafka_hive_cdc;
CREATE TABLE kafka_hive_cdc (
  do_date     STRING,
  do_min      STRING,
  pv          STRING,
  op          STRING,
  process_time STRING
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt',
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0S',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'auto-compaction'='true',
  'compaction.file-size'='128MB'
);
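
Two session settings matter before running the streaming INSERT below: the SQL dialect is usually switched back to default (PROCTIME() is a Flink built-in, not a Hive function), and checkpointing should be enabled, since the streaming Hive/filesystem sink only rolls and commits part files when a checkpoint completes. A sketch; the 60-second interval is an arbitrary example:

-- Switch back to the default dialect for the streaming INSERT
SET table.sql-dialect=default;
-- Checkpointing drives file commits for the streaming Hive sink (interval is an example value)
SET execution.checkpointing.interval=60s;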

-- Pull the field values out of the Kafka map columns
INSERT INTO kafka_hive_cdc
SELECT
 case when `before` is not null then `before`['do_date'] else `after`['do_date'] end as do_date
,case when `before` is not null then `before`['do_min'] else `after`['do_min'] end as do_min
,case when `before` is not null then `before`['pv'] else `after`['pv'] end as pv
,`op`
,CAST(PROCTIME() AS STRING)
,DATE_FORMAT(PROCTIME(), 'yyyy-MM-dd')
FROM kafka_mysql_parser;

  5. Compact small files. Note that checkpointing must be disabled for this step, otherwise it fails with IllegalArgumentException: Checkpoint is not supported for batch jobs.
-- Compact small files
SET execution.runtime-mode=batch;
INSERT OVERWRITE kafka_hive_cdc SELECT * FROM kafka_hive_cdc;
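
Rewriting the whole table becomes expensive as history accumulates; the overwrite can also be limited to a single partition (a sketch; the dt value is a made-up example):

-- Compact a single day's partition instead of the whole table (example date)
INSERT OVERWRITE kafka_hive_cdc PARTITION (dt='2022-01-01')
SELECT do_date, do_min, pv, op, process_time
FROM kafka_hive_cdc WHERE dt='2022-01-01';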

  6. Read the final data from Hive
-- Read the latest version of each record from the Hive table
SELECT
 `do_date`
,`do_min`
,`pv`
FROM (
SELECT
*
,ROW_NUMBER() OVER(PARTITION BY `do_date` ORDER BY `process_time` DESC, `op`) AS `rn`
FROM kafka_hive_cdc) a
WHERE `rn` = 1 AND `op` = 'c';
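
If this latest-version read is needed in several places, the query can be wrapped in a view (a sketch; the view name hive_cdc_latest is hypothetical, and the statement assumes the default SQL dialect is active):

-- Hypothetical convenience view over the deduplication query above
CREATE VIEW hive_cdc_latest AS
SELECT `do_date`, `do_min`, `pv`
FROM (
  SELECT *
  ,ROW_NUMBER() OVER(PARTITION BY `do_date` ORDER BY `process_time` DESC, `op`) AS `rn`
  FROM kafka_hive_cdc
) a
WHERE `rn` = 1 AND `op` = 'c';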