Kafka cluster (Docker environment) and Spring Boot integration
Kafka theory
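I. Kafka cluster setup
1. Environment preparation
- Linux environment (VM)
- Docker environment
- ZooKeeper environment
Kafka depends on ZooKeeper, so a ZooKeeper cluster must be running before the Kafka cluster is set up. Preparing three servers or virtual machines is cumbersome, so Docker is used here to keep things simple.
See "zookeeper集群(docker)搭建" (ZooKeeper cluster setup with Docker).
The cluster consists of three ZooKeeper containers, as shown in the figure.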
2. Kafka cluster setup
1. Cluster planning
2. Building the cluster
- Pull the image
docker pull wurstmeister/kafka
- Create the container
docker run -d \
  --name=kafka1 \
  --restart=always \
  -p 9092:9092 \
  --network=my-net \
  -e KAFKA_ADVERTISED_HOST_NAME=192.168.48.131 \
  -e HOST_IP=192.168.48.131:9092 \
  -e KAFKA_ADVERTISED_PORT=9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2182,zookeeper3:2183 \
  -e KAFKA_BROKER_ID=0 \
  wurstmeister/kafka:latest
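Parameters:
- --network: the user-defined Docker network the container joins
- KAFKA_ADVERTISED_HOST_NAME: address of the host machine
- KAFKA_ADVERTISED_PORT: port on the host machine
- KAFKA_ZOOKEEPER_CONNECT: addresses of the ZooKeeper cluster. The container names resolve on my-net, so each port must be the one that ZooKeeper container listens on inside the network, not a host-mapped port.
- KAFKA_BROKER_ID: the broker.id, which must be unique within the cluster
- HOST_IP: the exposed host address
Create three containers this way.
Note: change the container name and port number for each one, and give each a different KAFKA_BROKER_ID.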
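To make explicit what changes from one broker to the next, here is a minimal Java sketch that prints the three docker run commands. The class name BrokerCommands, the container names kafka2 and kafka3, and the host ports 9093 and 9094 are assumptions for illustration. The sketch also assumes each container keeps listening on 9092 internally, so only the host side of the -p mapping changes; adjust it if your containers are configured differently.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: builds one "docker run" command per broker. Only the container
// name, the host-side port and the broker id differ between brokers.
final class BrokerCommands {
    // Values copied from the command in the article.
    static final String HOST = "192.168.48.131";
    static final String ZOOKEEPER = "zookeeper1:2181,zookeeper2:2182,zookeeper3:2183";

    static String dockerRun(int brokerId) {
        String name = "kafka" + (brokerId + 1); // assumption: kafka1..kafka3
        int hostPort = 9092 + brokerId;         // assumption: 9092, 9093, 9094
        return String.join(" \\\n  ",
                "docker run -d",
                "--name=" + name,
                "--restart=always",
                "-p " + hostPort + ":9092",     // assumption: container port stays 9092
                "--network=my-net",
                "-e KAFKA_ADVERTISED_HOST_NAME=" + HOST,
                "-e HOST_IP=" + HOST + ":" + hostPort,
                "-e KAFKA_ADVERTISED_PORT=" + hostPort,
                "-e KAFKA_ZOOKEEPER_CONNECT=" + ZOOKEEPER,
                "-e KAFKA_BROKER_ID=" + brokerId,
                "wurstmeister/kafka:latest");
    }

    static List<String> allCommands(int brokerCount) {
        List<String> commands = new ArrayList<>();
        for (int id = 0; id < brokerCount; id++) {
            commands.add(dockerRun(id));
        }
        return commands;
    }

    public static void main(String[] args) {
        // Print the commands so they can be reviewed before being run by hand.
        for (String command : allCommands(3)) {
            System.out.println(command);
            System.out.println();
        }
    }
}
```

The program only prints text; nothing is executed against Docker, so the output can be checked before pasting it into a shell.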
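3. Kafka cluster monitoring
Use KafkaOffsetMonitor-assembly-0.4.6.jar to monitor the Kafka cluster.
1. Create a kafka-offset-console directory under /opt/module/.
2. Put the uploaded jar into that directory. The file name must match the one passed to -cp in start.sh, which in the script below is KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar.
3. Create a mobile-logs directory under /opt/module/kafka-offset-console.
4. Create the startup script start.sh under /opt/module/kafka-offset-console: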
java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers 192.168.48.131:9092,192.168.48.131:9093,192.168.48.131:9094 \
--kafkaSecurityProtocol PLAINTEXT \
--zk 192.168.48.131:2181,192.168.48.131:2182,192.168.48.131:2183 \
--port 8086 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka &
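5. Start the monitor
./start.sh
6. Open it from the host to test (the script above serves on port 8086)
At this point the cluster setup is complete.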
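Before moving on to Spring Boot, it can help to confirm that the three broker ports answer from the machine that will run the application. The following is a rough sketch using only the JDK; the class name BrokerPortCheck and the one-second timeout are assumptions. An open TCP port only shows that something is listening there, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Sketch: parses a "host:port,host:port" list and tries a TCP connect to each entry.
final class BrokerPortCheck {

    // Splits the list without doing any DNS lookup.
    static List<InetSocketAddress> parse(String servers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : servers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // Returns true if a TCP connection could be opened within the timeout.
    static boolean isOpen(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(
                    new InetSocketAddress(address.getHostString(), address.getPort()),
                    timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Broker list taken from the monitoring script in the article.
        String servers = "192.168.48.131:9092,192.168.48.131:9093,192.168.48.131:9094";
        for (InetSocketAddress address : parse(servers)) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " open=" + isOpen(address, 1000));
        }
    }
}
```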
II. Spring Boot integration
1. Add the dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. Configuration file
server.port=8080
#============== kafka ===================
spring.kafka.bootstrap-servers=192.168.48.131:9092,192.168.48.131:9093,192.168.48.131:9094
#=============== producer =======================
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
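# Custom key, not a built-in Spring Boot property; it is read with @Value in the configuration class
spring.kafka.producer.interceptor.class=com.example.demo.Interceptor.TimeInterceptor,com.example.demo.Interceptor.CounterInterceptor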
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
spring.kafka.consumer.group-id=user-log-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
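For readers who know the plain Kafka client better than Spring Boot, the sketch below lists the client configuration keys that the spring.kafka.* entries above correspond to. It uses only java.util.Properties so it runs without any Kafka dependency; the class name KafkaClientProps is hypothetical. The values are copied from the file above: batch.size 16384 is 16 KiB and buffer.memory 33554432 is 32 MiB.

```java
import java.util.Properties;

// Sketch: the Kafka client keys behind the spring.kafka.* settings in the article.
final class KafkaClientProps {
    static final String SERVERS =
            "192.168.48.131:9092,192.168.48.131:9093,192.168.48.131:9094";

    static Properties producer() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", SERVERS);
        p.setProperty("retries", "0");               // no automatic resend on failure
        p.setProperty("batch.size", "16384");        // bytes per partition batch
        p.setProperty("buffer.memory", "33554432");  // total producer buffer in bytes
        p.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    static Properties consumer() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", SERVERS);
        p.setProperty("group.id", "user-log-group");
        p.setProperty("auto.offset.reset", "earliest");   // start from the oldest offset when none is stored
        p.setProperty("enable.auto.commit", "true");
        p.setProperty("auto.commit.interval.ms", "100");
        p.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    public static void main(String[] args) {
        producer().forEach((k, v) -> System.out.println("producer " + k + "=" + v));
        consumer().forEach((k, v) -> System.out.println("consumer " + k + "=" + v));
    }
}
```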
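3. Configuration class
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfigration {
    @Autowired
    private KafkaProperties properties;

    // Split the comma-separated interceptor class names from the configuration file
    @Value("#{'${spring.kafka.producer.interceptor.class}'.split(',')}")
    private List<String> interceptors;

    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        // Start from the spring.kafka.producer.* settings, then register the interceptors
        Map<String, Object> map = this.properties.buildProducerProperties();
        map.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(map);
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }
}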
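The @Value expression above relies on String.split(','), which does not trim whitespace, so a space after a comma in the property would become part of the class name handed to the producer. The sketch below shows a more forgiving way to turn the property value into a list; the class name InterceptorList is hypothetical and the code is independent of Spring.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: splits a comma-separated class list, trimming entries and skipping blanks.
final class InterceptorList {

    static List<String> parse(String raw) {
        List<String> names = new ArrayList<>();
        for (String part : raw.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Same two class names as in the configuration file, with a stray space added.
        String raw = "com.example.demo.Interceptor.TimeInterceptor, "
                + "com.example.demo.Interceptor.CounterInterceptor";
        parse(raw).forEach(System.out::println);
    }
}
```

The order of the list matters: the producer calls interceptors in the order they are configured, so TimeInterceptor runs before CounterInterceptor here.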
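4. Controller layer
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

// Receive a message from the front end and send it through the producer API.
// JSON is assumed to be com.alibaba.fastjson.JSON, which is not among the dependencies listed above.
@GetMapping("/sendMassage/{massage}")
public String sendMassage(@PathVariable("massage") String massage) {
    kafkaTemplate.send("first", JSON.toJSONString(massage));
    return "Message sent";
}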
5. Consuming messages
@KafkaListener(topics = {"first"})
public void receMassage(ConsumerRecord<?, ?> consumerRecord) {
    // Check whether the value is null
    Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
    if (kafkaMessage.isPresent()) {
        // Get the value held by the Optional
        Object message = kafkaMessage.get();
        System.err.println("Consumed message: " + message);
    }
}
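6. Interceptors
Messages are processed in the interceptors before they are sent.
1. Timestamp interceptor
package com.example.demo.Interceptor;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;

@Component
public class TimeInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> map) {
    }

    /*
     * Create a new record with the timestamp written at the very front of the message body
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        String prefix = new SimpleDateFormat("yyyy/MM/dd HH-mm-ss").format(new Date());
        return new ProducerRecord<>(producerRecord.topic(), producerRecord.partition(),
                producerRecord.timestamp(), producerRecord.key(),
                prefix + "," + producerRecord.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
    }

    @Override
    public void close() {
    }
}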
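The formatting step of the timestamp interceptor can be tried in isolation. The sketch below reproduces only the "timestamp, comma, original value" logic with java.time, whose DateTimeFormatter is immutable and safe to share between threads. The class name TimestampPrefix and both method names are made up for this example, and it deliberately does not depend on the Kafka client.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Sketch: builds the "yyyy/MM/dd HH-mm-ss,<value>" body used by the timestamp interceptor.
final class TimestampPrefix {
    // Same pattern as in the interceptor.
    private static final DateTimeFormatter PATTERN =
            DateTimeFormatter.ofPattern("yyyy/MM/dd HH-mm-ss");

    // Prefixes the value with the given instant rendered in the given zone.
    static String prefix(String value, Instant sentAt, ZoneId zone) {
        return PATTERN.withZone(zone).format(sentAt) + "," + value;
    }

    // Convenience variant with a fixed UTC zone, handy for repeatable checks.
    static String prefixUtc(String value, long epochMillis) {
        return prefix(value, Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // Uses the local zone, like SimpleDateFormat does by default.
        System.out.println(prefix("hello", Instant.now(), ZoneId.systemDefault()));
        System.out.println(prefixUtc("hello", 0L)); // prints 1970/01/01 00-00-00,hello
    }
}
```

Because the timestamp becomes part of the message body, a consumer has to split on the first comma to recover the original value.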
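2. Counting interceptor
package com.example.demo.Interceptor;

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;

@Component
public class CounterInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // Pass the record through unchanged
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // Count successful and failed sends
        if (e == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // Print the totals when the producer is closed
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}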
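Kafka generally invokes onAcknowledgement on the producer's background I/O thread, while close() may run on a different thread, so plain int fields are not guaranteed to show the latest totals. One possible variation, sketched here with a hypothetical SendCounter class that is independent of Kafka, keeps the counts in AtomicInteger fields instead.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: thread-safe success/failure counters for acknowledgement callbacks.
final class SendCounter {
    private final AtomicInteger success = new AtomicInteger();
    private final AtomicInteger failure = new AtomicInteger();

    // Mirrors the interceptor's rule: a null exception means the send succeeded.
    void record(Exception error) {
        if (error == null) {
            success.incrementAndGet();
        } else {
            failure.incrementAndGet();
        }
    }

    int successCount() {
        return success.get();
    }

    int failureCount() {
        return failure.get();
    }

    public static void main(String[] args) {
        SendCounter counter = new SendCounter();
        counter.record(null);
        counter.record(new RuntimeException("simulated send failure"));
        counter.record(null);
        System.out.println("Successful sent: " + counter.successCount()); // 2
        System.out.println("Failed sent: " + counter.failureCount());     // 1
    }
}
```

An interceptor could hold one such counter and call record(e) from onAcknowledgement, then print both totals in close().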
7. Logging configuration
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <contextName>logback</contextName>
    <!-- Output to the console -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>ERROR</level>
        </filter>-->
        <encoder>
            <pattern>%d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <appender name="logFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <Prudent>true</Prudent>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Only one %d may drive the rollover; the directory token is marked aux -->
            <FileNamePattern>
                poslog/%d{yyyy-MM-dd,aux}/%d{yyyy-MM-dd}.log
            </FileNamePattern>
        </rollingPolicy>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <Pattern>
                %d{yyyy-MM-dd HH:mm:ss} -%msg%n
            </Pattern>
        </layout>
    </appender>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <Pattern>
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
            </Pattern>
        </layout>
    </appender>
    <logger name="org.apache.kafka" level="warn">
        <appender-ref ref="STDOUT" />
    </logger>
    <!-- The level attribute takes a single value; INFO already includes ERROR -->
    <root level="INFO">
        <appender-ref ref="console" />
        <appender-ref ref="logFile" />
    </root>
</configuration>
8. Testing