在官方提供的flink-connect-redis中并没有实现Flink SQL的方式读写redis。要想实现Flink sql的方式读写redis需要自己实现对应的代码,好在GitHub已经有了对应的开源实现,我们只需对应的借鉴使用即可。

pom 依赖:

<dependency>
    <groupId>io.github.jeff-zou</groupId>
    <artifactId>flink-connector-redis</artifactId>
    <version>1.0.11</version>
</dependency>

通过导入jar,可以实现Flink sql 读写redis :

先创建Flink CDC 的一张表:

CREATE TABLE memberCDC(
  `member_id`                  bigint,
   mobile string,
  `tenant_id`           string   ,
  gender                int ,
   proctime as procTime(),
     PRIMARY KEY(member_id) NOT ENFORCED
     ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = '10.50.20.181',
     'port' = '330806',
     'username' = 'reGaYTkly',
     'password' = '40IGOJHFDlloBE6',
     'database-name' = 'member_db',
     'table-name' = 'm_member_tenant',
   'scan.startup.mode'='latest-offset',
   'debezium.skipped.operations'='d'
   )

Flink SQL 写入redis(以Hset数据类型为例):

CREATE TABLE memberRedis (
`member_id` bigint,
mobile string,
`tenant_id` string,
gender int
) WITH (
'connector' = 'redis',
'host' = '10.180.80.89',
'port' = '300079',
'password' = 'big88dajjggta34',
'redis-mode' = 'single',
'command'='hset'
)

inserter into memberRedis
select
member_id,
mobile,
tenant_id,
gender
from memberCDC

    这里 member_id 相当于Hset的  additionalKey mobile相当于key,而这可以通过SQL 语句动态的拼接生成需要的key格式。相当于可以支持动态生成你需要的key.

Flink SQL读取redis(以Hset数据类型为例):

读取redis中的数据,一般来用作为维度关联使用:

CREATE TABLE memberSourceRedis (
`member_id` bigint,
mobile string,
`tenant_id` string,
gender int
) WITH (
'connector' = 'redis',
'host' = '10.110.60.5',
'port' = '30479',
'password' = 'daffdag1234',
'redis-mode' = 'single',
'command'='hget',
'maxIdle'='2',
'minIdle'='1',
'lookup.cache.max-rows'='10',
'lookup.cache.ttl'='10',
'lookup.max-retries'='3'
)
insert into memberRedis
select
t2.member_id,
t2.mobile,
t2.tenant_id,
t1.gender
from memberCDC t1
left join memberSourceRedis for system_time as of t1.proctime as t2 on
t1.member_id = t2.member_id and t1.mobile = t2.mobile

    类似读取,这里 member_id 相当于Hset的  additionalKey mobile相当于key,这里可以通过SQL 语句动态的拼接生成key,传进来的key实际上就是memberCDC中关联字段的数据值。

连接redis参数列表:

总结:

    上述Flink SQL 读写redis 对于 hset 数据类型来说都支持一个字段的值写入和读取,多个字段写入会被默认过滤。

    如果要支持多个字段的写入和读取,需要修改依赖代码,在写入时候将多个字段封装为JSONString 写入Redis,在读取的时候将JSON数据通过 catalog将映射为对应多个字段的值。

    


    后续文章中也会介绍如何修改源码,编译jar包来实现支持多个字段的Flink SQL的读写redis。如果你有需要也可以通过下面的链接来实现对应的flink sql读写Redis的特殊业务需求。感觉有用也欢迎关注   迪答   公众号~~

    本文参考GitHub的开源项目:https://github.com/jeff-zou/flink-connector-redis

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐