本文来说下SpringBoot整合kafka之kafka分区实战


准备工作

当然我们也可以不手动创建topic,在执行代码kafkaTemplate.send(“topic1”, normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化topic,如下

初始化配置信息

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaInitialConfiguration {

    /***
     * 创建TopicName为topic.quick.initial的Topic并设置分区数为8以及副本数为1
     * 通过bean创建(bean的名字为initialTopic)
     * @return
     */
    @Bean
    public NewTopic initialTopic() {

        return new NewTopic("topic.quick.initial",8, (short) 1 );
    }


    /**
     * 此种@Bean的方式,如果topic的名字相同,那么会覆盖以前的那个
     * //修改后|分区数量会变成11个 注意分区数量只能增加不能减少
     * @return
     */
    @Bean
    public NewTopic initialTopic2() {

        return new NewTopic("topic.quick.initial",11, (short) 1 );
    }

}

程序代码

生产者

@Slf4j
@RestController
@RequestMapping("/api/kafka")
@Api(tags = "kafka测试开发")
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Resource(name = "initialTopic")
    private NewTopic newTopic;

    @GetMapping("/callbackOne")
    @ApiOperation(value = "带回调的生产者")
    public void sendMessage2(@RequestParam("message") @ApiParam(value="消息",required = true) String callbackMessage) {

        log.info("========================================>>>");
        log.info(newTopic.name());

        // 带回调的生产者
        kafkaTemplate.send(newTopic.name(), callbackMessage).addCallback(success -> {

            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            log.info("发送消息成功:" + topic + "-" + partition + "-" + offset + "-" + callbackMessage);
        }, failure -> {
            log.info("发送消息失败:" + failure.getMessage());
        });
    }
    
}

消费者

@Component
@Slf4j
public class KafkaConsumer {

    // 消费监听
    @KafkaListener(topics = {"topic.quick.initial"})
    public void onMessage1(ConsumerRecord<?, ?> record){

        // 消费的哪个topic、partition的消息,打印出消息内容
        log.info("==============================================>");
        StringBuffer sb = new StringBuffer();
        // 主题
        sb.append(record.topic() + "-");
        // 分区
        sb.append(record.partition() + "-");
        // 需要消费的值
        sb.append(record.value() + "-");
        // 位移
        sb.append(record.offset());

        log.info("消费者进行消费:"+ sb);
    }
}

程序测试

使用swagger来进行程序测试

在这里插入图片描述


本文小结

本文简单进行了SpringBoot整合kafka之kafka分区实战。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐