Java中使用Kafka
目录kafka-clients依赖AdminClient APIProducerConsumerStreamConnect kafka客户端api分为5大类,常用的是前3类AdminClient API:管理topic、broker之类的信息Producer API:发布消息到一个或多个topicConsumer API:订阅1个或多个topic,处理接收到的消息Stream API:流
kafka客户端api分为5大类,常用的是前3类
- AdminClient API:管理topic、broker之类的信息
- Producer API:发布消息到一个或多个topic
- Consumer API:订阅1个或多个topic,处理接收到的消息
- Stream API:流处理,将输入流经过一些处理转换为输出流
- Connector API:从一些源系统|应用中拉取数据到kafka
kafka-clients
依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
AdminClient API
可以在zk上查看已存在的topic
- brokers/topics:包含topic_id、partitions、replicas、version
- conig/topics:包含version、config
win班kafka删除topic时会出现kafka闪退的问题,可以在根据出错提示,在zk上删除对应的topic,并在kafka的本地日志中删除对应topic的文件,重启kafka即可。
[2021-05-30 16:40:52,583] ERROR Error while renaming dir for topic3-0 in log dir D:\tmp\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: D:\tmp\kafka-logs\topic3-0 -> D:\tmp\kafka-logs\topic3-0.c1d7bfc008ab43dfaf4506df1a063e3d-delete
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:904)
at kafka.log.Log. a n o n f u n anonfun anonfunrenameDir 2 ( L o g . s c a l a : 1072 ) a t k a f k a . l o g . L o g . r e n a m e D i r ( L o g . s c a l a : 2453 ) a t k a f k a . l o g . L o g M a n a g e r . a s y n c D e l e t e ( L o g M a n a g e r . s c a l a : 991 ) a t k a f k a . l o g . L o g M a n a g e r . 2(Log.scala:1072) at kafka.log.Log.renameDir(Log.scala:2453) at kafka.log.LogManager.asyncDelete(LogManager.scala:991) at kafka.log.LogManager. 2(Log.scala:1072)atkafka.log.Log.renameDir(Log.scala:2453)atkafka.log.LogManager.asyncDelete(LogManager.scala:991)atkafka.log.LogManager.anonfun$asyncDelete 3 ( L o g M a n a g e r . s c a l a : 1026 ) a t s c a l a . O p t i o n . f o r e a c h ( O p t i o n . s c a l a : 437 ) a t k a f k a . l o g . L o g M a n a g e r . 3(LogManager.scala:1026) at scala.Option.foreach(Option.scala:437) at kafka.log.LogManager. 3(LogManager.scala:1026)atscala.Option.foreach(Option.scala:437)atkafka.log.LogManager.anonfun$asyncDelete 2 ( L o g M a n a g e r . s c a l a : 1024 ) a t k a f k a . l o g . L o g M a n a g e r . 2(LogManager.scala:1024) at kafka.log.LogManager. 2(LogManager.scala:1024)atkafka.log.LogManager.anonfun$asyncDelete 2 2 2adapted(LogManager.scala:1022)
at scala.collection.mutable.HashSet$Node.foreach(HashSet.scala:435)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:361)
at kafka.log.LogManager.asyncDelete(LogManager.scala:1022)
at kafka.server.ReplicaManager.stopPartitions(ReplicaManager.scala:489)
at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:427)
at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:284)
at kafka.server.KafkaApis.handle(KafkaApis.scala:172)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.nio.file.AccessDeniedException: D:\tmp\kafka-logs\topic3-0 -> D:\tmp\kafka-logs\topic3-0.c1d7bfc008ab43dfaf4506df1a063e3d-delete
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:901)
… 16 more
Producer
Producer是线程安全的,批量发送消息,并非逐条发送。
Producer客户端实现了负载均衡,会随机发送消息到某个par 分区上。可以自定义par规则,路由特定的消息到指定的pat上。
使用最多的是带回调函数的发送。
Consumer
Stream
stream用于处理kafka中的数据
Connect
用于kafka与hadoop、db等数据源之间的直接操作,类似于elk,日志从应用服务器经过一些处理到es服务器,都是配置,不用写代码。用得不多。
更多推荐
所有评论(0)