SpringBoot整合kafka之kafka分区实战
本文来说下SpringBoot整合kafka之kafka分区实战文章目录概述概述
·
本文来说下SpringBoot整合kafka之kafka分区实战
准备工作
当然我们也可以不手动创建topic,在执行代码kafkaTemplate.send(“topic1”, normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化topic,如下
初始化配置信息
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaInitialConfiguration {
/***
* 创建TopicName为topic.quick.initial的Topic并设置分区数为8以及副本数为1
* 通过bean创建(bean的名字为initialTopic)
* @return
*/
@Bean
public NewTopic initialTopic() {
return new NewTopic("topic.quick.initial",8, (short) 1 );
}
/**
* 此种@Bean的方式,如果topic的名字相同,那么会覆盖以前的那个
* //修改后|分区数量会变成11个 注意分区数量只能增加不能减少
* @return
*/
@Bean
public NewTopic initialTopic2() {
return new NewTopic("topic.quick.initial",11, (short) 1 );
}
}
程序代码
生产者
@Slf4j
@RestController
@RequestMapping("/api/kafka")
@Api(tags = "kafka测试开发")
public class KafkaController {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Resource(name = "initialTopic")
private NewTopic newTopic;
@GetMapping("/callbackOne")
@ApiOperation(value = "带回调的生产者")
public void sendMessage2(@RequestParam("message") @ApiParam(value="消息",required = true) String callbackMessage) {
log.info("========================================>>>");
log.info(newTopic.name());
// 带回调的生产者
kafkaTemplate.send(newTopic.name(), callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
log.info("发送消息成功:" + topic + "-" + partition + "-" + offset + "-" + callbackMessage);
}, failure -> {
log.info("发送消息失败:" + failure.getMessage());
});
}
}
消费者
@Component
@Slf4j
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"topic.quick.initial"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
log.info("==============================================>");
StringBuffer sb = new StringBuffer();
// 主题
sb.append(record.topic() + "-");
// 分区
sb.append(record.partition() + "-");
// 需要消费的值
sb.append(record.value() + "-");
// 位移
sb.append(record.offset());
log.info("消费者进行消费:"+ sb);
}
}
程序测试
使用swagger来进行程序测试
本文小结
本文简单进行了SpringBoot整合kafka之kafka分区实战。
更多推荐
已为社区贡献28条内容
所有评论(0)