Flink 1.14 新版 KafkaSource 和 KafkaSink
工具类public class KafkaUtils {/*** 功能描述: <br>* 〈自定义build,生产kafkaSource〉* @Param: [env, topic, groupId, offsets]* @Return: org.apache.flink.streaming.api.datastream.DataStreamSource<java.lang.St
·
工具类
public class KafkaUtils {
/**
 * Builds a Kafka source of string records and attaches it to the given environment.
 *
 * <p>The source connects to the brokers in {@code KafkaConfig.bootstrapServers},
 * subscribes to {@code topic}, deserializes only the record value with
 * {@code SimpleStringSchema}, and is registered without watermarks under a source
 * name equal to the topic.
 *
 * @param env     execution environment the source is added to
 * @param topic   Kafka topic to subscribe to; also used as the source name
 * @param groupId consumer group id handed to the source builder
 * @param offsets initializer that decides where reading starts
 * @return the stream returned by {@code env.fromSource}
 */
public static DataStreamSource<String> getNewKafkaSource(StreamExecutionEnvironment env, String topic, String groupId, OffsetsInitializer offsets) {
    // Original author's note: from Flink 1.15 on, the Kafka source has to be created
    // through this builder API.
    KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers(KafkaConfig.bootstrapServers)
            .setGroupId(groupId)
            .setTopics(topic)
            .setStartingOffsets(offsets)
            .setValueOnlyDeserializer(new SimpleStringSchema())
            // Presumably makes the source look for new partitions every 60 s - confirm
            // against the connector documentation.
            .setProperty("partition.discovery.interval.ms", "60000")
            .build();

    WatermarkStrategy<String> noWatermarks = WatermarkStrategy.noWatermarks();
    return env.fromSource(kafkaSource, noWatermarks, topic);
}
/**
 * Builds a Kafka sink for JSON string records that routes each record to a partition
 * chosen from the value of one JSON field, with lz4 compression enabled on the producer.
 *
 * <p>Each record is parsed as a JSON object and the hash code of the value stored under
 * {@code filed} selects the target partition, so records with equal field values go to
 * the same partition. When the field is absent (or the record parses to {@code null})
 * the hash of the whole record is used instead of failing with a
 * {@code NullPointerException}. Records that are not valid JSON still fail with the
 * parser's exception.
 *
 * @param topic Kafka topic the sink writes to
 * @param filed name of the JSON field whose value determines the partition
 * @return a sink configured with at-least-once delivery
 */
public static KafkaSink<String> kafkaSink(String topic, String filed) {
    Properties producerConfig = new Properties();
    // Only compression.type is set: it is the Java producer's compression option.
    // compression.codec is not among the Java producer's settings, so it would merely be
    // reported as an unknown config.
    producerConfig.setProperty("compression.type", "lz4");

    return KafkaSink.<String>builder()
            .setBootstrapServers(KafkaConfig.bootstrapServers)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(topic)
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .setPartitioner(new FlinkKafkaPartitioner<String>() {
                        @Override
                        public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
                            JSONObject json = JSONObject.parseObject(record);
                            Object fieldValue = json == null ? null : json.get(filed);

                            // Fall back to the whole record when the routing field is missing,
                            // so one incomplete message cannot crash the sink.
                            int hash;
                            if (fieldValue != null) {
                                hash = fieldValue.hashCode();
                            } else {
                                hash = record == null ? 0 : record.hashCode();
                            }

                            // abs(hash % n) always lies in [0, n); it is kept (rather than
                            // floorMod) so existing field-value-to-slot assignments do not move.
                            int slot = Math.abs(hash % partitions.length);

                            // Return the partition id stored at that slot, not the slot index:
                            // the two differ whenever the ids are not exactly 0..n-1 in order.
                            return partitions[slot];
                        }
                    })
                    .build()
            )
            // Original author's note: the Flink 1.14 documentation gets this method name
            // wrong; 1.15 corrected it. setDeliverGuarantee is the 1.14 spelling.
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .setKafkaProducerConfig(producerConfig)
            .build();
}
使用
// Starting-offset options that can be passed to getNewKafkaSource:
//   OffsetsInitializer.committedOffsets()
//   OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
//   OffsetsInitializer.timestamp(1592323200L)
//   OffsetsInitializer.earliest()
//   OffsetsInitializer.latest()
// NOTE(review): 1592323200L looks like epoch seconds; check in the Flink docs whether
// timestamp(...) expects milliseconds.
OffsetsInitializer startFromLatest = OffsetsInitializer.latest();

// Parses each incoming string record into a JSONObject.
MapFunction<String, JSONObject> toJsonObject = new MapFunction<String, JSONObject>() {
    @Override
    public JSONObject map(String value) throws Exception {
        return JSON.parseObject(value, JSONObject.class);
    }
};

// Read the topic, parse every record, and print the result with the given label.
KafkaUtils
        .getNewKafkaSource(env, "topic", "groupId", startFromLatest)
        .map(toJsonObject)
        .print("测试");
更多推荐
所有评论(0)