flink sql 读取 kafka 数据写入 Hbase中
版本说明:flink 1.12 、Hbase 1.41、查询 flink 1.12 的官网,找寻flink sql 连接 Hbase 的信息。Apache Flink 1.12 Documentation: HBase SQL Connector根据 Hbase 版本需要下载依赖官网中的连接信息-- register the HBase table 'mytable' in Flink SQLCR
·
版本说明:
flink 1.12 、Hbase 1.4
1、查询 flink 1.12 的官网,找寻flink sql 连接 Hbase 的信息。
Apache Flink 1.12 Documentation: HBase SQL Connector
根据 Hbase 版本下载对应的依赖
官网中的连接信息
-- register the HBase table 'mytable' in Flink SQL
CREATE TABLE hTable (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'mytable',
'zookeeper.quorum' = 'localhost:2181'
);
-- use ROW(...) construction function construct column families and write data into the HBase table.
-- assuming the schema of "T" is [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO hTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
-- scan data from the HBase table
SELECT rowkey, family1, family3.q4, family3.q6 FROM hTable;
-- temporal join the HBase table as a dimension table
SELECT * FROM myTopic
LEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = hTable.rowkey;
更多推荐
已为社区贡献2条内容
所有评论(0)