Flink版本: 1.12.0

异常: java.time.format.DateTimeParseException

kafka数据源声明案例:

CREATE TABLE user_actions_source (
--  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
--  `partition` BIGINT METADATA VIRTUAL,
--  `offset` BIGINT METADATA VIRTUAL,
   `user_id` BIGINT,
   `behavior` STRING,
   `user_action_time` TIMESTAMP(3),
    -- 将 user_action_time 声明为事件时间属性,并使用5秒的延迟水印策略
    WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'connect_stream_zyh1',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup4',
  'scan.startup.mode' = 'latest-offset',
  -- 'scan.startup.mode' = 'group-offsets',
  'format' = 'json'
  );

业务上大部分的user_action_time为时间戳类型,但是在声明source表的时候,如果直接指定对应的时间戳字段为TIMESTAMP(3), 会出现数据类型转换异常,如下:

Caused by: java.time.format.DateTimeParseException: Text '1587527019680' could not be parsed at index 0

看了中文社区的一篇帖子: json中date类型解析失败

var calendar: Calendar = Calendar.getInstance()
calendar.add(Calendar.MINUTE,1)   //(每次累加 1min)
calendar.getTimeInMillis
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
val date = new Date(calendar.getTimeInMillis)
dateFormat.format(date)

按照帖子中发送指定格式的数据,依然出现上面的异常问题。

解决方案

方式1:

查了一些资料,解决办法如下:

CREATE TABLE user_actions_source (
--  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
--  `partition` BIGINT METADATA VIRTUAL,
--  `offset` BIGINT METADATA VIRTUAL,
   `user_id` BIGINT,
   `behavior` STRING,
   user_action_time BIGINT,  -- 创建时间
   ts AS TO_TIMESTAMP(FROM_UNIXTIME(user_action_time/1000, 'yyyy-MM-dd HH:mm:ss')),  -- 使用user_action_time字段值作为时间戳ts
    -- 将 ts 声明为事件时间属性,并使用5秒的延迟水印策略
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'connect_stream_zyh1',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup4',
  'scan.startup.mode' = 'latest-offset',
  -- 'scan.startup.mode' = 'group-offsets',
  'format' = 'json'
  );

注意: ts转换的时间戳需要是10位秒时间戳,如果是13位毫秒时间戳,转换的结果为null。

方式2

kafka中的时间格式需要满足下面的时间格式:

def getCreateTime2: String = {
    calendar.add(Calendar.MINUTE,1)   //(每次累加 30s)
    calendar.getTimeInMillis

    val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    val date = new Date(calendar.getTimeInMillis)
    dateFormat.format(date)
  }

参考:
https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐