工具类

public class KafkaUtils {
    /**
     * 功能描述: <br>
     * 〈自定义build,生产kafkaSource〉
     * @Param: [env, topic, groupId, offsets]
     * @Return: org.apache.flink.streaming.api.datastream.DataStreamSource<java.lang.String>
     * @Author: sheng
     * @Date: 2022/5/31 11:15 上午
     */
    public static DataStreamSource<String> getNewKafkaSource(StreamExecutionEnvironment env, String topic, String groupId, OffsetsInitializer offsets) {
        // 1.15 之后需要新方法创建kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setProperty("partition.discovery.interval.ms", "60000")
                .setBootstrapServers(KafkaConfig.bootstrapServers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(offsets)
                .build();

        return env.fromSource(source, WatermarkStrategy.noWatermarks(), topic);
    }


    /**
     * 功能描述: <br>
     * 〈kafkaSink,按照指定字段分组,设置压缩〉
     * @Param: [topic, filed]
     * @Return: org.apache.flink.connector.kafka.sink.KafkaSink<java.lang.String>
     * @Author: sheng
     * @Date: 2022/6/1 11:01 上午
     */
    public static KafkaSink<String> kafkaSink(String topic, String filed) {
        Properties properties = new Properties();
        properties.setProperty("compression.type", "lz4");
        properties.setProperty("compression.codec", "lz4");
        return KafkaSink.<String>builder()
                .setBootstrapServers(KafkaConfig.bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .setPartitioner(new FlinkKafkaPartitioner<String>() {
                            @Override
                            public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
                                JSONObject jsonObject = JSONObject.parseObject(record);
                                Object o = jsonObject.get(filed);
                                return Math.abs(o.hashCode() % partitions.length);
                            }
                        } )
                        .build()
                )
                //setDeliverGuarantee 1.14官方文档有错误,1.15修改过来了
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setKafkaProducerConfig(properties)
                .build();

}

使用

// Starting-offset strategies accepted by getNewKafkaSource:
//   OffsetsInitializer.committedOffsets()                              - group's committed offsets, no reset strategy
//   OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)  - committed offsets, else fall back to earliest
//   OffsetsInitializer.timestamp(1592323200000L)                       - first record at/after this epoch-MILLISECOND time
//   OffsetsInitializer.earliest()
//   OffsetsInitializer.latest()
MapFunction<String, JSONObject> toJsonObject = new MapFunction<String, JSONObject>() {
    @Override
    public JSONObject map(String rawValue) throws Exception {
        // Turn each Kafka record value into a fastjson JSONObject.
        return JSON.parseObject(rawValue, JSONObject.class);
    }
};
KafkaUtils.getNewKafkaSource(env, "topic", "groupId", OffsetsInitializer.latest())
        .map(toJsonObject)
        .print("测试");
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐