flink-cdc 能够读取binlog日志,从而实现mysql数据到ES的秒级同步。

好用的同时又有很多烦恼,其中时间格式就是一个很头痛的问题。

直接进入正题

使用es7和mysql 5.7为例

1.时间类型参照

首先我们已知mysql 有date 和timestamp(或者datetime)两种时间格式。

对应到ES是标准的date格式。

mysql 的date 类型格式如:"1993-02-01", 对应的ES的标准格式为:"1993-01-31T16:00:00.000Z"。

mysql的timestamp或datetime类型格式如: "1993-02-01 08:45:27",对应到es的ES标准格式类型为:"1993-02-01T00:45:27.000Z"

flink-cdc 时间格式与Mysql和es时间格式类型对照

flink-cdcmysqlelasticsearch
datedatedate
timestamptimestamp(datetime)date
timestamp_ltztimestamp(datetime)date

那么处理时间格式上我们就可以用两种方法:

1.对于新建的ES索引在建立mapping的时候将date进行格式化处理

PUT db_test {
     "mappings": {
      "properties": {
        "update_time":  {
           type: "date",
           "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        }
       }
     }
} 

这样新的的索引就只需要对照flink-cdc和es的时间格式,进行处理操作就行。

以下是sql-client的展现

create table tb_test_source (
    id bigint,
    update_time timestamp,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    ...
);

create table tb_test_sink (
    id binint,
    update_time timestamp,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    ...
);

insert into tb_test_sink select * from tb_test_source;

2.当索引原本存在,且数据重要,数据量非常大的时候,要转为标准的ESdate格式。

首先我们知道timestamp_ltz(3)带时区的时间格式,这种时间格式转化出来就是ES的标准格式样式,但是会丢失毫秒级的精度。

如果对时间精度没有要求同步到es以后时间,es中update_time 的时间格式就是:"yyyy-MM-ddTHH:mm:ssZ",如果你原本的时间格式对精度没有要求,以下的这段sql已经可以满足你的要求了。但是注意两行标注非常重要的配置,如果没有这两行配置,那么默认的时间格式标准就会是sql标准。在同步到ES中是会报错的。

create table tb_test_source (
    id bigint,
    update_time timestamp,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    ...
);

create table tb_test_sink (
    id binint,
    update_time timestamp_ltz(3),
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    ...
    'format' = 'json', //非常重要的配置
	'json.timestamp-format.standard' = 'ISO-8601' //非常重要的配置
);

insert into tb_test_sink select id,cast(update_time as timestamp_ltz(3)) from tb_test_source;

非常不幸的是,我遇到的情况是需要保留时间精度为3的一堆原始数据,这样导致程序查询时序列化失败。

于是我就考虑是不是能把时间转化为"1993-01-31T16:00:00.000Z"类似的时间字符串,让es自动去识别时间字符串。

下面就用代码展示下我的解决方法,虽然暂时还不算完美:

create table tb_test_source (
    id bigint,
    update_time timestamp,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    ...
);

create table tb_test_sink (
    id binint,
    update_time string, //注意是字符串类型
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    ...
);

insert into tb_test_sink 
select 
id,
date_format(timestampadd(HOUR,-8,update_time),'yyyy-MM-dd''T''HH:mm:ss.SSS''Z''') //对时间进行格式化
from tb_test_source;

通过以上方法,终于将时间格式问题解决。

同时在ES中检索,发现同步入的数据是具有时间属性的,虽然有些波折,但是也得到了解决。

同时附上一个pyflink的代码,以便后期阐释后期关于链表出现的问题。

from datetime import datetime,timedelta
from pyflink.table import EnvironmentSettings as envs,TableEnvironment as tenv,StreamTableEnvironment as stenv,TableConfig,DataTypes
from pyflink.datastream import StreamExecutionEnvironment as senv
from pyflink.common.types import RowKind


settings = senv.get_execution_environment()
settings.set_parallelism(1)
settings.enable_checkpointing(300)
tableconfig = TableConfig()
tableconfig.set_null_check(False)

t_env =  stenv.create(settings,tableconfig)


t_env.get_config().get_configuration().set_string("pipeline.jar",
                                        ("file:///data/flink-1.13.5/lib/flink-sql-connector-mysql-cdc-2.1.1.jar;"
                                        +"file:///data/flink-1.13.5/lib/flink-sql-connector-elasticsearch6_2.11-1.13.5.jar"))

t_env.get_config().get_configuration().set_string("pipeline.classpaths",
                                        ("file:///data/flink-1.13.5/lib/flink-sql-connector-mysql-cdc-2.1.1.jar;"
                                        +"file:///data/flink-1.13.5/lib/flink-sql-connector-elasticsearch6_2.11-1.13.5.jar"))

t_env.execute_sql("""CREATE TABLE tb_test_source ( 
id bigint,    
create_time timestamp(3), 
modify_time timestamp(3), 
PRIMARY KEY (`id`) NOT ENFORCED 
) WITH (
'connector' = 'mysql-cdc',
......
)""")


t_env.execute_sql("""
CREATE TABLE tb_test_sink (
`id` bigint,
`create_time` STRING,
`modify_time` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
)WITH (
'connector' = 'elasticsearch-6',
....
)""")
sourceTable = t_env.from_path("tb_linkman_in")
sdata = sourceTable.select(
                            sourceTable.id,sourceTable.name,sourceTable.sex,
                            sourceTable.create_time,sourceTable.modify_time) #为什么这里不直接进行时间处理?
                        
tres = sdata.execute() #为什么这里不直接插入ES?要绕一次再写入数据,这个就涉及连表查询时,mysql多源表数据的时间差问题。 待下次在阐述其中缘由。
with tres.collect() as resluts:
    for reslut in resluts:
        if reslut.get_row_kind() == RowKind.UPDATE_BEFORE:
            continue           
        reslut.set_field_names(["id","create_time","modify_time"])
        resl = reslut.as_dict()
        createtime = resl["create_time"]
        modify_time = resl["modify_time"]
        if isinstance(createtime,datetime):
            createtime = createtime + timedelta(hours=-8)
            resl["create_time"] = createtime.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]+"Z"
        if isinstance(modify_time,datetime):
            modify_time = modify_time + timedelta(hours=-8)
            resl["modify_time"] = modify_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]+"Z"
        res = tuple(resl.values())
        rows = list()
        rows.append(res)
        s = t_env.from_elements(rows,DataTypes.ROW([
            DataTypes.FIELD('id',DataTypes.BIGINT()),
            DataTypes.FIELD('create_time',DataTypes.STRING()),
            DataTypes.FIELD('modify_time',DataTypes.STRING()),
            ]),False)
        s.execute_insert('tb_test_link')

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐