Spring Cloud stream 使用Kafka踩坑记录
今天在使用spring cloud整合消息队列时,遇到个问题。本机虚拟机上部署了一套kafka集群 zookeeper集群也部署在同台虚拟机上在虚拟机内使用kafka-console-producer.sh、kafka-console-consumer.sh 创建广播方、消费方能正常的收发消息。但是在本机物理机上使用sping cloud 时就始终无法发送消息到kafka上去po...
今天在使用spring cloud整合消息队列时,遇到个问题。本机虚拟机上部署了一套kafka集群 zookeeper集群也部署在同台虚拟机上
在虚拟机内使用kafka-console-producer.sh、kafka-console-consumer.sh 创建广播方、消费方能正常的收发消息。
但是在本机物理机上使用sping cloud 时就始终无法发送消息到kafka上去
pom文件
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
yml配置文件
server:
port: 8050
spring:
application:
name: cloud-kafka-provider
cloud:
stream:
default:
binder: kafka
producer:
useNativeEncoding: true
default-binder: kafka
kafka:
binder:
brokers: 192.168.126.129:9090,192.168.126.129:9091,192.168.126.129:9092
zk-nodes: 192.168.126.129:2181,192.168.126.129:2182,192.168.126.129:2183
消息发送代码:
@EnableBinding
public class DefaultProviderService {
@Autowired
private BinderAwareChannelResolver resolver;
public void sendMsg(String msg, String topic) {
resolver.resolveDestination(topic).send(new GenericMessage<>(msg.getBytes()));
}
}
错误信息
Exception thrown when sending a message with key='null' and payload=***** to topic
之前一直未能发现问题的所作,后来在查看其他网友博客的时候看到他降低日志级别解决了他的问题。我也将项目日志级别降低到debug模式
发现原来一直在报错 一直提示无法连接到kafka
java kafka Connection refused: no further information
通过这个日志信息 最终找到问题所在点。 原因是我的kafka集群 在server.properties配置文件中的配置有点问题。因为我配置了每个节点kafka的port信息 并没有为节点配置host,最终修改配置如下
port:9090
host.name=192.168.126.129
重启服务,消息正常发送接收。
更多推荐
所有评论(0)