Kafka: manually committing offsets

The producer opens a transaction when sending data.
/**
* Description:
* 生产者提交数据的时候 开启事务
*/
package com.doit.kafka.day02
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
/**
 * Demonstrates sending records inside a Kafka transaction: either all three
 * records become visible to `read_committed` consumers, or none do.
 *
 * Requires `transactional.id` to be set; the broker uses it to fence
 * zombie producer instances across restarts.
 */
object _04_CommitProducer {
  def main(args: Array[String]): Unit = {
    // Producer configuration
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "centos01:9092,centos02:9092,centos03:9092")
    pro.setProperty("key.serializer", classOf[StringSerializer].getName)
    pro.setProperty("value.serializer", classOf[StringSerializer].getName)
    // Transactions require a stable transactional id
    pro.setProperty("transactional.id", "transactionId")
    // Create the Kafka producer client
    val producer = new KafkaProducer[String, String](pro)
    val topics = "producer_01"
    // Tracks whether a transaction was actually started, so we never call
    // abortTransaction() after initTransactions()/beginTransaction() failed
    // (doing so throws and masks the original error).
    var inTransaction = false
    try {
      // Initialize transactions (fences older producers with the same id)
      producer.initTransactions()
      // Begin the transaction
      producer.beginTransaction()
      inTransaction = true
      val record1 = new ProducerRecord[String, String](topics, "hello_" + System.currentTimeMillis())
      val record2 = new ProducerRecord[String, String](topics, "hello_" + System.currentTimeMillis())
      val record3 = new ProducerRecord[String, String](topics, "hello_" + System.currentTimeMillis())
      producer.send(record1)
      producer.send(record2)
      // val i = 100 / 0  // uncomment to simulate a failure mid-transaction
      producer.send(record3)
      // Commit: makes all three records visible atomically
      producer.commitTransaction()
    } catch {
      case e: Exception =>
        // BUG FIX: the original swallowed the exception silently; log it so
        // the failure cause is not lost.
        e.printStackTrace()
        // Roll back only if a transaction was actually begun
        if (inTransaction) {
          producer.abortTransaction()
        }
    } finally {
      producer.close()
    }
  }
}
The consumer commits offsets asynchronously: the call is non-blocking (so it is more efficient), and a callback can be supplied that runs when the commit completes.
package com.doit.kafka.day02
import java.time.Duration
import java.util.Properties
import java.util
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
/**
 * Consumes from `producer_01` with auto-commit disabled and commits offsets
 * manually via the asynchronous, non-blocking `commitAsync`, reporting the
 * result in a callback.
 *
 * Key fix vs. the original: Kafka's commit API expects the offset of the
 * NEXT record to be consumed (last consumed offset + 1). Committing the raw
 * last offset re-delivers the final record of each batch after a restart.
 */
object _05_Commit_Consumer {
  def main(args: Array[String]): Unit = {
    // Consumer configuration
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "centos01:9092,centos02:9092,centos03:9092")
    pro.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    pro.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    pro.setProperty("auto.offset.reset", "earliest") // start from the beginning when no committed offset exists
    // Consumer group id
    pro.setProperty("group.id", "g007")
    // Only read records from committed transactions
    pro.setProperty("isolation.level", "read_committed")
    // Disable auto-commit (default is true, committing every 5 seconds);
    // offsets are committed manually below.
    pro.setProperty("enable.auto.commit", "false")
    // Create the Kafka consumer client
    val consumer = new KafkaConsumer[String, String](pro)
    // Subscribe to the topic
    consumer.subscribe(util.Arrays.asList("producer_01"))
    while (true) {
      val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(5))
      if (!records.isEmpty) {
        import scala.collection.JavaConverters._
        for (elem <- records.asScala) {
          println("value: " + elem.value() + ",offset: " + elem.offset())
        }
        // One poll can return records from several topic-partitions (e.g. when
        // there are fewer consumers than partitions), so group by
        // (topic, partition) — partition alone is not a unique key — and keep
        // only the largest offset per group: committing the max is sufficient.
        val offsets: util.Map[TopicPartition, OffsetAndMetadata] = records.asScala
          .groupBy(record => new TopicPartition(record.topic(), record.partition()))
          .map { case (tp, recs) =>
            val lastConsumed = recs.map(_.offset()).max
            // BUG FIX: commit last consumed offset + 1 (the position of the
            // next record), per the Kafka consumer contract; the original
            // committed the raw offset, causing one-record re-delivery on
            // restart. Also use the single-arg constructor instead of
            // passing null metadata.
            (tp, new OffsetAndMetadata(lastConsumed + 1))
          }.asJava // convert to the Java map type commitAsync expects
        // Asynchronous commit: non-blocking, so throughput is higher; the
        // callback reports success or failure.
        consumer.commitAsync(offsets, new OffsetCommitCallback {
          override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
            if (exception != null) {
              // BUG FIX: the original ignored failed commits silently.
              exception.printStackTrace()
            } else {
              for (elem <- offsets.asScala) {
                val topicAndPar = elem._1
                val offsetAndMea = elem._2
                println("topic:" + topicAndPar.topic() + " ,"
                  + "partition:" + topicAndPar.partition() + " , " + "offset:" + offsetAndMea.offset() + " committed successfully")
              }
            }
          }
        })
      }
    }
  }
}
More recommendations
All comments (0)