流批一体--将Kafka流式数据摄取至Hudi
Hudi支持以下存储数据的视图读优化视图 : 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照。该视图仅将最新parquet文件暴露给查询,所以它有可能看不到最新的数据,并保证与非Hudi列式数据集相比,具有相同的列式查询性能增量视图 : 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。实时视图 : 在此视图上的查询将查看某个增量
Hudi支持以下存储数据的视图
- 读优化视图 : 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照。该视图仅将最新parquet文件暴露给查询,所以它有可能看不到最新的数据,并保证与非Hudi列式数据集相比,具有相同的列式查询性能
- 增量视图 : 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。
- 实时视图 : 在此视图上的查询将查看某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件和增量文件来提供近实时数据集。
一、将Kafka流式数据摄取至Hudi: DeltaStreamer
Hudi自带的DeltaStreamer工具写数据到Hudi,
开启--enable-hive-sync 即可同步数据到hive表。
HoodieDeltaStreamer
实用工具 (hudi-utilities-bundle中的一部分) 提供了从DFS或Kafka等不同来源进行摄取的方式,并具有以下功能。
- 从Kafka单次摄取新事件,从Sqoop、HiveIncrementalPuller输出或DFS文件夹中的多个文件
增量导入 - 支持json、avro或自定义记录类型的传入数据
- 管理检查点,回滚和恢复
- 利用DFS或Confluent schema注册表的Avro模式。
- 支持自定义转换操作
一、首先从 https://github.com/apache/incubator-hudi.git 将hudi clone到自己本地idea 使用clean install -DskipTests -DskipITs -Dcheckstyle.skip=true进行编译 注意: 1、目前hudi使用的是hadoop2.7.3版本,如果使用hadoop3.x版本,请修改pom重新编译
<hadoop.version>3.0.0</hadoop.version>
2.1 DeltaStreamer启动命令
spark-submit --master yarn \
--driver-memory 1G \
--num-executors 2 \
--executor-memory 1G \
--executor-cores 4 \
--deploy-mode cluster \
--conf spark.yarn.executor.memoryOverhead=512 \
--conf spark.yarn.driver.memoryOverhead=512 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /.../hudi-utilities-bundle_2.11-0.5.2-SNAPSHOT.jar` \
--props hdfs://../kafka.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--target-base-path hdfs://../business \
--op UPSERT \
--target-table business \ '这里其实并不是hive表的名称,实际表名是在kafka.properties中配置'
--enable-hive-sync \ '开启同步至hive'
--table-type MERGE_ON_READ \
--source-ordering-field create_time \
--source-limit 5000000
2.2 kafka.properties配置实例
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=create_time
hoodie.datasource.write.precombine.field=update_time
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://../t_business.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://../t3_trip.t_business.avsc
hoodie.deltastreamer.source.kafka.topic=t_business_topic
group.id=t_business_group
bootstrap.servers=localhost
auto.offset.reset=latest
hoodie.parquet.max.file.size=134217728
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.database=dwd
hoodie.datasource.hive_sync.table=test
hoodie.datasource.hive_sync.username=用户名
hoodie.datasource.hive_sync.password=密码
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://.....
hoodie.datasource.hive_sync.partition_fields=分区字段
3.1.1 使用Spark查询
spark-shell --master yarn \
--driver-memory 1G \
--num-executors 1 \
--executor-memory 1G \
--executor-cores 1 \
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \
--conf spark.sql.hive.convertMetastoreParquet=false '在进行快照视图查询的时候需要添加此配置'
如果使用MOR模式写入数据会在hive的dwd库下面生成两张表。分别是test_ro 和 test_rt test_rt表支持:快照视图和增量视图查询 test_ro表支持:读优化视图查询
#快照视图
spark.sql("select count(*) from dwd.test_rt").show()
#读优化视图
spark.sql("select count(*) from dwd.test_ro").show()
#增量视图
saprk sql不支持
3.1.2 使用Hive查询
beeline -u jdbc:hive2://incubator-t3-infra04:10000 -n t3cx -p t3cx \
--hiveconf hive.stats.autogather=false \
#读优化查询
select * from dwd.test_ro;
#快照查询
select * from dwd.test_rt;
#增量查询
set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
set hoodie.test.consume.mode=INCREMENTAL;
set hoodie.test.consume.max.commits=3;
set hoodie.test.consume.start.timestamp=20200427114546;
select count(*) from dwd.test_rt where `_hoodie_commit_time` > '20200427114546';
#注意:
#1、hudi中parquet做了shaded,我在测试中发现(CDH6.3.0)下必须加载hudi-hadoop-mr中的parquet-avro包才行,clouder用户需要必须要重新安装mr所需要的jar
#2、set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat
最好显示设置,否则有可能在某种情况下无法加载到hive.input.formate,即便在create-table的时候已经指定
如果使用COW模式写入数据,会在hive的dwd库下面生成一张表,test test表支持:快照视图和增量视图
#快照视图 spark.sql("select count(*) from dwd.test").show()
4. 总结
DeltaStreamer是Hudi提供的非常实用的工具,通过DeltaStreamer可以将Kafka、DFS上的数据导入Hudi,而本篇博文主要讲解了如何使用DeltaStreamer将数据从Kafka导入Hudi,并演示了如何使用Spark和Hive查询Hudi数据。
二、将Kafka流式数据摄取至Hudi:SparkStreaming
Hudi 提供了Hudi 表的概念,这些表支持CRUD操作。我们可以基于这个特点,将Mysql Binlog的数据重放至Hudi表,然后基于Hive对Hudi表进行查询分析。数据流向架构如下
binlog数据写入Hudi表
- binlog-consumer分支使用Spark streaming消费kafka中的Binlog数据,并写入Hudi表。Kafka中的binlog是通过阿里的Canal工具同步拉取的。程序入口是CanalKafkaImport2Hudi,它提供了一系列参数,配置程序的执行行为
/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit
--class com.niceshot.hudi.CanalKafkaImport2Hudi \
--name hudi__goods \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 40 \
--queue hudi \
--conf spark.executor.memoryOverhead=2048 \
--conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \
--conf spark.core.connection.ack.wait.timeout=300 \
--conf spark.locality.wait=100 \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.streaming.receiver.maxRate=500 \
--conf spark.streaming.kafka.maxRatePerPartition=200 \
--conf spark.ui.retainedJobs=10 \
--conf spark.ui.retainedStages=10 \
--conf spark.ui.retainedTasks=10 \
--conf spark.worker.ui.retainedExecutors=10 \
--conf spark.worker.ui.retainedDrivers=10 \
--conf spark.sql.ui.retainedExecutions=10 \
--conf spark.yarn.submit.waitAppCompletion=false \
--conf spark.yarn.maxAppAttempts=4 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=20 \
--conf spark.yarn.executor.failuresValidityInterval=1h \
--conf spark.task.maxFailures=8 \
/data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200
历史数据同步以及表元数据同步至hive
history_import_and_meta_sync
分支提供了将历史数据同步至hudi表,以及将hudi表数据结构同步至hive meta的操作
同步历史数据至hudi表
这里采用的思路是
- 将mysql全量数据通过注入sqoop等工具,导入到hive表。
- 然后采用分支代码中的工具HiveImport2HudiConfig,将数据导入Hudi表
HiveImport2HudiConfig提供了如下一些参数,用于配置程序执行行为
一个程序执行demo
nohup java -jar hudi-learn-1.0-SNAPSHOT.jar
--sync-hive-db-name hudi_temp
--sync-hive-table-name crm__wx_user_info
--base-save-path hdfs://192.168.2.2:8020/hudi_table/
--mapping-mysql-db-name crm
--mapping-mysql-table-name "order"
--primary-key "id"
--partition-key created_date
--hive-site-path /etc/lib/hive/conf/hive-site.xml
--tmp-data-path /data/tmp > order.log &
同步hudi表结构至hive meta
需要将hudi的数据结构和分区,以hive外表的形式同步至Hive meta,才能是Hive感知到hudi数据,并通过sql进行查询分析。Hudi本身在消费Binlog进行存储时,可以顺带将相关表元数据信息同步至hive。但考虑到每条写入Apache Hudi表的数据,都要读写Hive Meta ,对Hive的性能可能影响很大。所以我单独开发了HiveMetaSyncConfig工具,用于同步hudi表元数据至Hive。考虑到目前程序只支持按天分区,所以同步工具可以一天执行一次即可。参数配置如下
参数名 | 含义 | 是否必填 | 默认值 |
-hive-db-name | 指定hudi表同步至哪个hive数据库 | 是 | 无 |
-hive-table-name | 指定hudi表同步至哪个hive表 | 是 | 无 |
-hive-jdbc-url | 指定hive meta的jdbc链接地址,例如jdbc:hive2://192.168.16.181:10000 | 是 | 无 |
-hive-user-name | 指定hive meta的链接用户名 | 否 | 默认hive |
-hive-pwd | 指定hive meta的链接密码 | 否 | 默认hive |
-hudi-table-path | 指定hudi表所在hdfs的文件路径 | 是 | 无 |
-hive-site-path | 指定hive的hive-site.xml路径 | 是 | 无 |
一个程序执行demo
java -jar hudi-learn-1.0-SNAPSHOT.jar
--hive-db-name streaming
--hive-table-name crm__order
--hive-user-name hive
--hive-pwd hive
--hive-jdbc-url jdbc:hive2://192.168.16.181:10000
--hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order
--hive-site-path /lib/hive/conf/hive-site.xml
一些踩坑、hive相关配置
有些hive集群的hive.input.format配置,默认是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,这会导致挂载Hudi数据的Hive外表读取到所有Hudi的Parquet数据,从而导致最终的读取结果重复。
需要将hive的format改为org.apache.hadoop.hive.ql.io.HiveInputFormat
,为了避免在整个集群层面上更改对其余离线Hive Sql造成不必要的影响,建议只对当前hive session设置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
三、将Kafka流式数据摄取至Hudi:Flink sql
7.1 启动kafka生产者,生产数据
1.1 启动user生产者,生产数据
bin/kafka-console-producer.sh --broker-list node1:9092 --topic user
{"user_id":"a0001","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a0002","order_amount":12.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a0003","order_amount":13.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a0004","order_amount":14.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a0005","order_amount":15.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a0006","order_amount":16.0,"log_ts":"2020-11-26 12:12:13"}
1.2 启动user_hobby生产者,生产数据
bin/kafka-console-producer.sh --broker-list node1:9092 --topic user_hobby
{"user_id":"a0001","name":"yangge","hobby":"足球"}
{"user_id":"a0002","name":"baba","hobby":"电影"}
{"user_id":"a0003","name":"mama","hobby":"游戏"}
{"user_id":"a0004","name":"dudu","hobby":"动画片"}
{"user_id":"a0005","name":"gege","hobby":"手机"}
{"user_id":"a0006","name":"jiejie","hobby":"睡觉"}
7.2 在Flink SQL客户端创建kafka对应的映射表
2.1 在Flink SQL客户端创建user表
CREATE TABLE user_ODS(
user_id STRING,
order_amount BIGINT,
log_ts TIMESTAMP(3)
)WITH(
'connector' = 'kafka',
'topic' = 'user',
'properties.bootstrap.servers' = 'node1:9092',
'scan.startup.mode'='earliest-offset',
'properties.group.id' = 'testGroup',
'format' = 'json'
);
select * from user_ODS;
2.2 在flink SQL客户端创建user_hobby表
CREATE TABLE user_hobby_ODS(
user_id STRING,
name STRING,
hobby STRING
)WITH(
'connector' = 'kafka',
'topic' = 'user_hobby',
'properties.bootstrap.servers' = 'node1:9092',
'scan.startup.mode'='earliest-offset',
'properties.group.id' = 'testGroup',
'format' = 'json'
);
select * from user_hobby_ODS;
7.3 Flink sql使用 hudi connector 创建hudi表
3.1 使用 hudi connector 创建hudi表
CREATE TABLE hudi_user(
user_id STRING,
order_amount BIGINT,
log_ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/hudi_user',
'table.type' = 'MERGE_ON_READ',
'changelog.enabled' = 'true',
'write.precombine.field' = 'log_ts',
'hoodie.datasource.write.recordkey.field' = 'user_id',
'compaction.async.enabled' = 'false'
);
往Hudi表插入数据
insert into hudi_user select * from user_ODS;
select * from hudi_user ;
3.2 使用 hudi connector 创建hudi表
CREATE TABLE hudi_user_hobby(
user_id STRING,
name STRING,
hobby STRING
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/hudi_user_hobby',
'table.type' = 'MERGE_ON_READ',
'changelog.enabled' = 'true',
'hoodie.datasource.write.recordkey.field' = 'user_id',
'write.precombine.field' = 'user_id',
'compaction.async.enabled' = 'false'
);
insert into hudi_user_hobby select * from user_hobby_ODS;
select * from hudi_user_hobby;
7.4 使用 hudi connector 创建hudi DWD表
4.1 在Flink SQL 创建DWD输出表
CREATE TABLE user_hobby_DWD (
user_id STRING,
name STRING,
hobby STRING,
order_amount BIGINT,
log_ts TIMESTAMP(3)
)WITH(
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/user_hobby_DWD',
'table.type' = 'MERGE_ON_READ',
'changelog.enabled' = 'true',
'hoodie.datasource.write.recordkey.field' = 'user_id',
'write.precombine.field' = 'user_id',
'compaction.async.enabled' = 'false'
);
INSERT INTO user_hobby_DWD
SELECT A.user_id, B.name, B.hobby, A.order_amount, A.log_ts
FROM hudi_user A
JOIN hudi_user_hobby B ON A.user_id = B.user_id;
注意事项:字段的顺序和最终写入表的字段顺序必须一致,不一致会报错.
出现这样的结果,说明join完成.
更多推荐
所有评论(0)