Doris系列

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天和大家分享一下Doris系列之导入Kafka数据操作
#博学谷IT学习技术支持#



前言

在这里插入图片描述

接着上次的Doris系列继续和大家分享,上次讲了Doris 建表操作,和从Broker Load导入hdfs数据操作,今天和大家分享从Routine Load导入kafka数据操作。
在这里插入图片描述
如上图,Client 向 FE 提交一个例行导入作业。
FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。
在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。
FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。
整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。


一、Kafka集群使用步骤

在这里插入图片描述
Kafka也是Doris一个非常重要的数据来源。

1.启动kafka集群环境

这里根据自己的路径启动kafka集群环境

cd /export/servers/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

2.创建kafka的topic主题

这里创建一个topic名字是test的kafka消息队列,设置1个partitions ,并且只备份1份数据。

bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 1 \
--partitions 1 \
--topic test

如果Topic已经存在,则可以删除

bin/kafka-topics.sh  --delete --zookeeper node01:2181  --topic test 

3.往kafka中插入一批测试数据

这里简单做个小案例,插入2条数据。

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
{"id":1,"name":"zhangsan","age":20}
{"id":2,"name":"lisi","age":30}

二、Doris使用步骤

1.创建对应表

这里根据自己kafka生成的数据创建对应字段和格式的表格

create table student_kafka2
(
id int,
name varchar(50),
age int
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10;

2.创建导入作业

  • student_kafka2为第一步创建的表格名称
  • desired_concurrent_number是并行度相关的参数
  • strict_mode是否采用严格模式
  • format为导入的格式,这里是json
CREATE ROUTINE LOAD test_db.kafka_job_new on student_kafka2
PROPERTIES
(
    "desired_concurrent_number"="1",
	"strict_mode"="false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list"= "node01:9092,node02:9092,node03:9092",
    "kafka_topic" = "test",
    "property.group.id" = "test_group_1",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.enable.auto.commit" = "false"
);
select * from student_kafka2;

在这里插入图片描述

三、Doris常用的参数

设置删除时是否允许不分区直接删除

  • SET delete_without_partition = true;

设置最大内存限制

  • SET exec_mem_limit = 8589934592;
  • SHOW VARIABLES LIKE “%mem_limit%”;

设置最长查询时间限制

  • SET query_timeout = 600;
  • SHOW VARIABLES LIKE “%query_timeout%”;

添加新的含预聚合的列

  • ALTER TABLE table1 ADD COLUMN uv BIGINT SUM DEFAULT ‘0’ after pv;

Broadcast/Shuffle Join 操作,默认为Broadcast

  • select sum(table1.pv) from table1 join [broadcast] table2 where
    table1.siteid = 12;
  • select sum(table1.pv) from table1 join [shuffle] table2 where
    table1.siteid = 12;

总结

今天主要和大家分享了Doris系列之导入Kafka数据操作,如果大家实际工作中需要用到Kafka结合Doris操作,可以参考一下使用步骤。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐