生产者提交数据的时候 开启事务

/**
 * Description:
 *   生产者提交数据的时候 开启事务
 */
package com.doit.kafka.day02

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object _04_CommitProducer {
  def main(args: Array[String]): Unit = {

    //设置参数
    val pro = new Properties()

    pro.setProperty("bootstrap.servers","centos01:9092,centos02:9092,centos03:9092")
    pro.setProperty("key.serializer",classOf[StringSerializer].getName)
    pro.setProperty("value.serializer",classOf[StringSerializer].getName)

    //开启事务需要设置这个参数
    pro.setProperty("transactional.id","transactionId")


    //创建kafka生产者的客户端
    val producer = new KafkaProducer[String, String](pro)

    val  topics = "producer_01"

    try {
      //初始化事务
      producer.initTransactions()
      //开启事务
      producer.beginTransaction()
      val record1 = new ProducerRecord[String, String](topics, "hello_" + System.currentTimeMillis())
      val record2 = new ProducerRecord[String, String](topics, "hello_" + System.currentTimeMillis())
      val record3 = new ProducerRecord[String, String](topics, "hello_" + System.currentTimeMillis())
      producer.send(record1)
      producer.send(record2)
     // val i = 100 / 0
      producer.send(record3)
      //提交事务
      producer.commitTransaction()
    } catch {
      case  e:Exception => {
        //回滚事务
         producer.abortTransaction()
      }
    } finally {
      producer.close()
    }
  }

}

消费者异步提交偏移量,方法不是阻塞的,效率比较高,且能够指定提交成功的回调方法

package com.doit.kafka.day02

import java.time.Duration
import java.util.Properties
import java.util

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object _05_Commit_Consumer {

  def main(args: Array[String]): Unit = {
    //设置参数
    val pro = new Properties()
    pro.setProperty("bootstrap.servers","centos01:9092,centos02:9092,centos03:9092")
    pro.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    pro.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    pro.setProperty("auto.offset.reset","earliest") //从头开始读取

    //设置组id
    pro.setProperty("group.id","g007")
    //设置只读取写入成功的
    pro.setProperty("isolation.level","read_committed")
    //不让消费者自动提交偏移量
    //enable.auto.commit 默认值就是true  5秒中更新一次
    pro.setProperty("enable.auto.commit","false")

    //创建kafka的消费者客户端
    val consumer = new KafkaConsumer[String, String](pro)


    //订阅topic
    consumer.subscribe(util.Arrays.asList("producer_01"))

    while (true){
      val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(5))
      if (!records.isEmpty){
        import scala.collection.JavaConverters._
        for (elem <- records.asScala) {
          println("value: " + elem.value() + ",offset: " + elem.offset())
        }
        //只需要提交一次拉取数据中偏移量最大值即可
        //自动提交偏移量,如果消费者的数量小于分区的数量,那么一次拉过来的数据
        //可能会有多个分区的数据,因此不能仅仅根据partition进行分组,还需要topic
        val grouped: Map[(String, Int), Iterable[ConsumerRecord[String, String]]] = records.asScala
          .groupBy(record => {
          (record.topic(), record.partition())
        })

        val maxedOffSet: Map[(String, Int), ConsumerRecord[String, String]] = grouped.mapValues(iter => {
          val list: List[ConsumerRecord[String, String]] = iter.toList
          list.maxBy(_.offset())
        })

        val offsets = maxedOffSet.map(tp => {
          val topicAndPartition = tp._1
          val topic: String = topicAndPartition._1
          val partition: Int = topicAndPartition._2
          val record: ConsumerRecord[String, String] = tp._2
          val offset = record.offset()

          (new TopicPartition(topic, partition), new OffsetAndMetadata(offset, null))
        }).asJava//需要转换为java的类型


        //异步提交偏移量,方法不是阻塞的,因此效率比较高,并且能够指定提交成功的回调方法
        consumer.commitAsync(offsets,new OffsetCommitCallback {

          override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
            for (elem <- offsets.asScala) {
              val topicAndPar = elem._1
              val offsetAndMea = elem._2
              println("topic:" + topicAndPar.topic() + " ,"
                +  "partition:"  + topicAndPar.partition()+" , " +  "offset:" + offsetAndMea.offset() + "提交成功")
            }
          }
        })
      }
    }



  }

}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐