kafka客户端api分为5大类,常用的是前3类

  • AdminClient API:管理topic、broker之类的信息
  • Producer API:发布消息到一个或多个topic
  • Consumer API:订阅1个或多个topic,处理接收到的消息
  • Stream API:流处理,将输入流经过一些处理转换为输出流
  • Connector API:从一些源系统|应用中拉取数据到kafka

 

kafka-clients

依赖
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.8.0</version>
</dependency>

 

AdminClient API

可以在zk上查看已存在的topic

  • brokers/topics:包含topic_id、partitions、replicas、version
  • conig/topics:包含version、config
     

win班kafka删除topic时会出现kafka闪退的问题,可以在根据出错提示,在zk上删除对应的topic,并在kafka的本地日志中删除对应topic的文件,重启kafka即可。

[2021-05-30 16:40:52,583] ERROR Error while renaming dir for topic3-0 in log dir D:\tmp\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: D:\tmp\kafka-logs\topic3-0 -> D:\tmp\kafka-logs\topic3-0.c1d7bfc008ab43dfaf4506df1a063e3d-delete
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:904)
at kafka.log.Log. a n o n f u n anonfun anonfunrenameDir 2 ( L o g . s c a l a : 1072 ) a t k a f k a . l o g . L o g . r e n a m e D i r ( L o g . s c a l a : 2453 ) a t k a f k a . l o g . L o g M a n a g e r . a s y n c D e l e t e ( L o g M a n a g e r . s c a l a : 991 ) a t k a f k a . l o g . L o g M a n a g e r . 2(Log.scala:1072) at kafka.log.Log.renameDir(Log.scala:2453) at kafka.log.LogManager.asyncDelete(LogManager.scala:991) at kafka.log.LogManager. 2(Log.scala:1072)atkafka.log.Log.renameDir(Log.scala:2453)atkafka.log.LogManager.asyncDelete(LogManager.scala:991)atkafka.log.LogManager.anonfun$asyncDelete 3 ( L o g M a n a g e r . s c a l a : 1026 ) a t s c a l a . O p t i o n . f o r e a c h ( O p t i o n . s c a l a : 437 ) a t k a f k a . l o g . L o g M a n a g e r . 3(LogManager.scala:1026) at scala.Option.foreach(Option.scala:437) at kafka.log.LogManager. 3(LogManager.scala:1026)atscala.Option.foreach(Option.scala:437)atkafka.log.LogManager.anonfun$asyncDelete 2 ( L o g M a n a g e r . s c a l a : 1024 ) a t k a f k a . l o g . L o g M a n a g e r . 2(LogManager.scala:1024) at kafka.log.LogManager. 2(LogManager.scala:1024)atkafka.log.LogManager.anonfun$asyncDelete 2 2 2adapted(LogManager.scala:1022)
at scala.collection.mutable.HashSet$Node.foreach(HashSet.scala:435)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:361)
at kafka.log.LogManager.asyncDelete(LogManager.scala:1022)
at kafka.server.ReplicaManager.stopPartitions(ReplicaManager.scala:489)
at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:427)
at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:284)
at kafka.server.KafkaApis.handle(KafkaApis.scala:172)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.nio.file.AccessDeniedException: D:\tmp\kafka-logs\topic3-0 -> D:\tmp\kafka-logs\topic3-0.c1d7bfc008ab43dfaf4506df1a063e3d-delete
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:901)
… 16 more

 

Producer

Producer是线程安全的,批量发送消息,并非逐条发送。
Producer客户端实现了负载均衡,会随机发送消息到某个par 分区上。可以自定义par规则,路由特定的消息到指定的pat上。

使用最多的是带回调函数的发送。

 

Consumer

在这里插入图片描述

在这里插入图片描述

 

Stream

stream用于处理kafka中的数据

 

Connect

用于kafka与hadoop、db等数据源之间的直接操作,类似于elk,日志从应用服务器经过一些处理到es服务器,都是配置,不用写代码。用得不多。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐