springboot 整合kafka,解决本地不能连上服务器kafka的问题
springboot整合kafka,原理不多说了,道理大家都懂,直接上代码了。整合前,kafka需要启动,我现在使用的是虚拟机里面的kafka1.maven配置<!--kafka支持--><dependency><groupId>org.springframework.kafka</groupId><ar...
·
springboot整合kafka,原理不多说了,道理大家都懂,直接上代码了。
整合前,kafka需要启动,我现在使用的是虚拟机里面的kafka
1.maven配置
<!--kafka支持-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.kafka配置文件
#kafka相关配置
spring.kafka.bootstrap-servers=192.168.43.128:9092
#设置一个默认组
spring.kafka.consumer.group-id=0
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288
kafka的端口号为什么是9093,等下说
3.kafka生产者
/**
* kafka消息产生者
*/
@RestController
public class KafkaSentController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 发送消息到kafka
* 主题Topic
*/
@RequestMapping("/kafka")
public void sendChannelMess(String message){
kafkaTemplate.send("Topic",message);
}
}
4.kafka消费者consumern
@Component
public class KafkaConsumer {
/**
* 监听Topic主题,有消息就读取
* @param message
*/
@KafkaListener(topics = {"Topic"})
public void receiveMessage(String message){
//收到通道的消息之后执行秒杀操作
System.out.println(message);
}
}
5启动springboot项目
就会发现一直连不上kafka,这个是kafka只允许在一个服务器上面使用,如果是其他服务器调用kafka服务器的话,就调不通
这个需要对kafka配置文件做处理 进入kafka config文件,编辑server.proper
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=INSIDE://127.0.0.1:9092,OUTSIDE://127.0.0.1:9093
inter.broker.listener.name = INSIDE
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=INSIDE://127.0.0.1:9092,OUTSIDE://127.0.0.1:9093
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT,PLAINTEXT:PLAINTEXT
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
这个时间将springboot的配置文件 kafka端口号改成9093,再启动项目就不报错了
server.port=23451
#kafka相关配置
spring.kafka.bootstrap-servers=192.168.43.128:9093
#设置一个默认组
spring.kafka.consumer.group-id=0
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288
6.访问,验证结果
下面打印结果跟传参一样,springboot集成kafka成功
源码地址:https://github.com/chengda1992/mxlgslcd.git
更多推荐
已为社区贡献1条内容
所有评论(0)