Integrating Hudi 0.11.0 + Flink 1.14.4 + Hive + Flink CDC + Kafka

1. Environment Preparation

1.1 Software Versions

Flink 1.14.4

Scala 2.11

CDH 6.1.0

Hadoop 3.0.0

Hive 2.1.1

Hudi 0.11.0

Flink CDC 2.2.0

MySQL 5.7

1.2 Flink Setup

  1. Download Flink 1.14.4 into $HUDI_HOME
wget https://archive.apache.org/dist/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.11.tgz
  2. Extract the archive
tar zxvf flink-1.14.4-bin-scala_2.11.tgz
  3. Download the flink-sql-connector jars
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.4/flink-sql-connector-kafka_2.11-1.14.4.jar

1.3 Hadoop Setup

  1. Set the Hadoop environment variable
export HADOOP_CONF_DIR=/etc/hadoop/conf

1.4 Hudi Setup

  1. Download Hudi 0.11.0 into $HUDI_HOME
wget --no-check-certificate https://dlcdn.apache.org/hudi/0.11.0/hudi-0.11.0.src.tgz
  2. Extract the archive
tar zxvf hudi-0.11.0.src.tgz
  3. Then enter the packaging/hudi-flink-bundle directory and run:
mvn clean install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2
  4. Copy packaging/hudi-flink-bundle/target/hudi-flink1.14-bundle_2.11-0.11.0.jar to $HUDI_HOME/flink-1.14.4/lib/

1.5 Hive Setup

  1. Create an auxlib folder under the Hive root directory
  2. Enter the packaging/hudi-hadoop-mr-bundle directory and run:
    mvn clean install -DskipTests
  3. Enter the packaging/hudi-hive-sync-bundle directory and run:
    mvn clean install -DskipTests
  4. Copy the two jars built above into the auxlib directory
hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.11.0.jar
hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.11.0.jar

1.6 Notes

Change the Hive version in the pom.xml of hudi-flink-bundle to match the version used by the cluster:

<properties>
  <hive.version>2.1.1-cdh6.1.0</hive.version>
  <flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
</properties>

<!-- If the build fails to resolve the CDH Hive artifacts, add the Cloudera repository -->
<repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

2. Kafka + Flink + Hudi + Hive

2.1 Start the Flink SQL Client

bin/yarn-session.sh -nm kafka2hudi -d -qu root.analysis -jm 2048 -tm 4096
bin/sql-client.sh embedded
SET execution.checkpointing.interval = 60000;
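Hudi commits data on Flink checkpoints, so the checkpoint interval set above directly controls how often writes become visible. As a purely optional illustration, the SQL client's result mode can also be switched so that ad-hoc queries print their results inline:

SET sql-client.execution.result-mode = tableau;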

2.2 Create a Kafka source and a Hudi sink, and start the streaming SQL job:

CREATE TABLE user_report_topic(
    uid string,
    userIp string,
    countryName string,
    countryCode string,
    regionName string,
    cityName string,
    ispName string,
    cVersion string,
    deviceId string,
    deviceType string,
    appType string,
    flagLevel Array<string>,
    visitType int,
    visitTime TIMESTAMP(3),
    WATERMARK FOR visitTime AS visitTime - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_report_topic',
      'properties.group.id' = 'user_report_topic_group2',
      'scan.startup.mode' = 'earliest-offset',
      'properties.bootstrap.servers' = 'xx.xx.xx.25:9092,xx.xx.xx.26:9092,xx.xx.xx.27:9092',
      'format' = 'json'
    );

create table user_report_hudi(
  uid string,
  userIp string,
  countryName string,
  countryCode string,
  regionName string,
  cityName string,
  ispName string,
  cVersion string,
  deviceId string,
  deviceType string,
  appType string,
  PRIMARY KEY(uid) NOT ENFORCED
)
with (
  'connector' = 'hudi',
  'path' = 'hdfs:///hudi/data/user_report_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.bucket_assign.tasks' = '1',
  'write.tasks' = '1',
  'hive_sync.enable' = 'true',                               -- enable automatic Hive sync
  'hive_sync.mode' = 'hms',                                  -- Hive sync mode, defaults to jdbc
  'hive_sync.metastore.uris' = 'thrift://xx.xx.xx.27:9083',  -- Hive metastore URI
  'hive_sync.jdbc_url' = 'jdbc:hive2://xx.xx.xx.27:10000',   -- required, HiveServer2 address
  'hive_sync.table' = 'user_report_hudi',                    -- table name to create in Hive
  'hive_sync.db' = 'test',                                   -- Hive database to sync into
  'hive_sync.username' = 'admin',                            -- HMS username
  'hive_sync.password' = 'admin',                            -- HMS password
  'hive_sync.support_timestamp' = 'true'                     -- map Hudi timestamps to the Hive timestamp type
);

insert into user_report_hudi select 
  uid ,
  userIp ,
  countryName ,
  countryCode ,
  regionName ,
  cityName ,
  ispName ,
  cVersion ,
  deviceId ,
  deviceType ,
  appType
from user_report_topic;

The job status can be monitored through the Flink Web UI.

2.3 Hive Queries

  1. MOR sync creates two Hive tables, COW creates only one
--The MOR _rt table also reads the uncompacted log data that the _ro table does not (the compaction policy can be tuned by commit count or by time; the default is compaction.delta_commits=5)
test.user_report_hudi_ro --reads parquet files only
test.user_report_hudi_rt --reads parquet files plus log files
--COW
test.user_report_hudi
  2. Querying the _rt table fails
    • Error log:
Caused by: java.lang.IllegalArgumentException: HoodieRealtimeRecordReader can only work on RealtimeSplit and not with hdfs://nameservice1/hudi/data/user_report_hudi/7445853b-1f0d-4d34-9638-74ffe7e99664_0-4-0_20220808134127564.parquet:0+452025
	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
	at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:310)
	at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.<init>(CombineHiveRecordReader.java:68)
	... 16 more
  • Fix (a minimal session sketch follows this list):
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
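For reference, a minimal Hive session sketch that applies the fix and then queries the _rt table (the set statement must run in the same session as the query; the LIMIT is only illustrative):

set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
select * from test.user_report_hudi_rt limit 10;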

3. Flink CDC + Kafka + Flink + Hudi + Hive

3.1 MySQL

  1. Prepare test data
USE test;
CREATE TABLE test.products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE test.products AUTO_INCREMENT = 101;

INSERT INTO test.products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

CREATE TABLE test.orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;

INSERT INTO test.orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

3.2 Start the Flink SQL Client

bin/yarn-session.sh -nm mysql2hudi -d -qu root.analysis -jm 2048 -tm 4096
bin/sql-client.sh embedded
SET execution.checkpointing.interval = 60000;

3.3 Flink CDC

Using the Flink SQL CLI, create tables that correspond to the MySQL tables products and orders; they will be used to capture and synchronize changes from these underlying database tables.

CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'xx.xx.xx.73',
    'port' = '3306',
    'username' = 'root',
    'password' = '123',
    'database-name' = 'test',
    'table-name' = 'products'
  );

CREATE TABLE orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mysql-cdc',
    'hostname' = 'xx.xx.xx.73',
    'port' = '3306',
    'username' = 'root',
    'password' = '123',
    'database-name' = 'test',
   'table-name' = 'orders'
 );

3.4 Kafka

  1. Create the Kafka tables
 CREATE TABLE products_cdc2kafka(
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_cdc2kafka_topic',
      'properties.group.id' = 'products_cdc2kafka_group',
      'scan.startup.mode' = 'earliest-offset',
      'properties.bootstrap.servers' = 'xx.xx.xx.25:9092,xx.xx.xx.26:9092,xx.xx.xx.27:9092',
      'format' = 'debezium-json'
    );

 CREATE TABLE orders_cdc2kafka(
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'orders_cdc2kafka_topic',
      'properties.group.id' = 'orders_cdc2kafka_group',
      'scan.startup.mode' = 'earliest-offset',
      'properties.bootstrap.servers' = 'xx.xx.xx.25:9092,xx.xx.xx.26:9092,xx.xx.xx.27:9092',
      'format' = 'debezium-json'
    );
  2. Write the CDC data into Kafka (an optional sanity check follows below)
insert into products_cdc2kafka
select * from products;

insert into orders_cdc2kafka
select * from orders;
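Optionally, the Debezium-format changelog that lands in Kafka can be previewed from the same SQL client before wiring it into Hudi. Note that this runs as a continuous streaming query and has to be cancelled manually once inspected:

select * from orders_cdc2kafka;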

3.5 Hudi

  1. Create the Hudi tables (products_2hudi as COW, orders_2hudi as MOR)
create table products_2hudi (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///hudi/data/products_2hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.bucket_assign.tasks' = '1',
  'write.tasks' = '1',
  'hive_sync.enable' = 'true',                               -- enable automatic Hive sync
  'hive_sync.mode' = 'hms',                                  -- Hive sync mode, defaults to jdbc
  'hive_sync.metastore.uris' = 'thrift://xx.xx.xx.27:9083',  -- Hive metastore URI
  'hive_sync.jdbc_url' = 'jdbc:hive2://xx.xx.xx.27:10000',   -- required, HiveServer2 address
  'hive_sync.table' = 'products_2hudi',                      -- table name to create in Hive
  'hive_sync.db' = 'test',                                   -- Hive database to sync into
  'hive_sync.username' = 'admin',                            -- HMS username
  'hive_sync.password' = 'admin',                            -- HMS password
  'hive_sync.support_timestamp' = 'true'                     -- map Hudi timestamps to the Hive timestamp type
);

create table orders_2hudi (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///hudi/data/orders_2hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.bucket_assign.tasks' = '1',
  'write.tasks' = '1',
  'hive_sync.enable' = 'true',                               -- enable automatic Hive sync
  'hive_sync.mode' = 'hms',                                  -- Hive sync mode, defaults to jdbc
  'hive_sync.metastore.uris' = 'thrift://xx.xx.xx.27:9083',  -- Hive metastore URI
  'hive_sync.jdbc_url' = 'jdbc:hive2://xx.xx.xx.27:10000',   -- required, HiveServer2 address
  'hive_sync.table' = 'orders_2hudi',                        -- table name to create in Hive
  'hive_sync.db' = 'test',                                   -- Hive database to sync into
  'hive_sync.username' = 'admin',                            -- HMS username
  'hive_sync.password' = 'admin',                            -- HMS password
  'hive_sync.support_timestamp' = 'true'                     -- map Hudi timestamps to the Hive timestamp type
);
  2. Write the Kafka data into Hudi
insert into products_2hudi
select * from products_cdc2kafka;

insert into orders_2hudi
select * from orders_cdc2kafka;
  3. Compare MOR and COW tables built from the same source data
create table orders_2hudi_cow2 (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///hudi/data/orders_2hudi_cow2',
  'table.type' = 'COPY_ON_WRITE',
  'write.bucket_assign.tasks' = '1',
  'write.tasks' = '1',
  'hive_sync.enable' = 'true',                               -- enable automatic Hive sync
  'hive_sync.mode' = 'hms',                                  -- Hive sync mode, defaults to jdbc
  'hive_sync.metastore.uris' = 'thrift://xx.xx.xx.27:9083',  -- Hive metastore URI
  'hive_sync.jdbc_url' = 'jdbc:hive2://xx.xx.xx.27:10000',   -- required, HiveServer2 address
  'hive_sync.table' = 'orders_2hudi_cow2',                   -- table name to create in Hive
  'hive_sync.db' = 'test',                                   -- Hive database to sync into
  'hive_sync.username' = 'admin',                            -- HMS username
  'hive_sync.password' = 'admin',                            -- HMS password
  'hive_sync.support_timestamp' = 'true'                     -- map Hudi timestamps to the Hive timestamp type
);

insert into orders_2hudi_cow2
select * from orders_cdc2kafka;
  • Comparison results (see the compaction-tuning sketch below)
    During the initial load, Hive queries against both MOR tables returned no data, even though the directory hdfs:///hudi/data/orders_2hudi already contained log files with data in them.
    The COW table had already produced parquet files, and Hive queries returned data.
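The lag comes from MOR compaction: log files only become visible to the _ro view once they are compacted into parquet, which by default happens every 5 delta commits. Below is a minimal sketch of a more aggressively compacted MOR variant; the table name and path are hypothetical and the compaction values are illustrative, not a recommendation:

create table orders_2hudi_fast_compact (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///hudi/data/orders_2hudi_fast_compact',  -- hypothetical path
  'table.type' = 'MERGE_ON_READ',
  'write.bucket_assign.tasks' = '1',
  'write.tasks' = '1',
  'compaction.async.enabled' = 'true',            -- async compaction (the default)
  'compaction.trigger.strategy' = 'num_commits',  -- trigger by delta-commit count (the default)
  'compaction.delta_commits' = '1'                -- compact after every delta commit instead of every 5
);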

3.6 Hive Queries

select * from  test.products_2hudi;
select * from  test.orders_2hudi_rt;
select * from  test.orders_2hudi_ro;

3.7 Data Change Tests

  1. Insert
--Insert data
INSERT INTO test.orders VALUES (default, '2022-04-30 10:08:22', 'Raj', 50.50, 101, false);
--Check the files on HDFS: hadoop fs -ls /hudi/data/orders_2hudi*
--COW: new data, one new parquet file
--MOR: nothing new visible, one new log file

INSERT INTO test.orders VALUES (default, '2022-04-30 10:11:09', 'Terry', 15.00, 102, false);
--COW: new data, one new parquet file
--MOR: nothing new visible

INSERT INTO test.orders VALUES (default, '2022-04-30 12:00:30', 'Jackson', 25.25, 103, false);
--COW: new data, one new parquet file
--MOR: nothing new visible

--5 commits at this point
INSERT INTO test.orders VALUES (default, '2022-04-30 12:01:30', 'xiaoming', 25.25, 104, false);
--COW: new data, one new parquet file
--MOR: both rt and ro now return data, one new parquet file

INSERT INTO test.orders VALUES (default, '2022-04-30 12:04:30', 'Walet', 25.25, 104, false);
--COW: new data, one new parquet file
--MOR: rt has the new row, ro does not; one new log file

INSERT INTO test.orders VALUES (default, '2022-04-30 12:05:30', 'jassy', 25.25, 104, false);
--COW: new data, one new parquet file
--MOR: rt has the new row, ro does not

INSERT INTO test.orders VALUES (default, '2022-04-30 12:06:30', 'xiahua', 25.25, 104, false);
--COW: new data, one new parquet file
--MOR: rt has the new row, ro does not

INSERT INTO test.orders VALUES (default, '2022-04-30 12:07:30', 'Tommmmm', 25.25, 104, false);
--COW: new data, one new parquet file
--MOR: rt has the new row, ro does not

-- By default, compaction runs once every 5 commits
INSERT INTO test.orders VALUES (default, '2022-04-30 12:07:30', 'kkk', 25.25, 104, false);
--COW: new data, one new parquet file
--MOR: rt and ro both have the new row (ro lags COW and rt by about one checkpoint interval), one new parquet file
  2. Update
--Each update is spaced more than one checkpoint interval apart

update test.orders set price=1000 where order_id =10030;

update test.orders set customer_name='tomy' where order_id =10029;

update test.orders set customer_name='xh' where order_id =10028;

update test.orders set customer_name='jasssssssy' where order_id =10027;

update test.orders set customer_name='waletttt' where order_id =10026;

--Update results (see the spot-check queries at the end of this section)
--COW: the rows were updated correctly
--MOR: the updated rows disappeared from both the rt and ro views
--Asked Yuzhao in the DingTalk community group: a related change is planned for release in 0.12.0, https://github.com/apache/hudi/pull/6286
  3. Delete
--Each delete is spaced more than one checkpoint interval apart
delete from test.orders where order_id=10005;

delete from test.orders where order_id=10006;

delete from test.orders where order_id=10007;

delete from test.orders where order_id=10008;

delete from test.orders where order_id=10009;

delete from test.orders where order_id=10010;

--Delete results
--COW: the rows were deleted correctly
--MOR: the rows were deleted correctly
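A few spot-check queries to verify the three behaviors above from Hive; the row ids are taken from the statements in this section, and the expected outcomes simply restate the observations reported here:

-- Insert check: compare row counts across the three views
select count(*) from test.orders_2hudi_ro;
select count(*) from test.orders_2hudi_rt;
select count(*) from test.orders_2hudi_cow2;
-- Update check: per the observation above, order_id 10030 goes missing from the MOR views but is updated in the COW table
select * from test.orders_2hudi_rt where order_id = 10030;
select * from test.orders_2hudi_cow2 where order_id = 10030;
-- Delete check: ids 10005-10010 should return no rows in any view
select count(*) from test.orders_2hudi_rt where order_id between 10005 and 10010;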

The End.
