We started with Kafka, but it is a separate component to deploy and operate, which was inconvenient;
we then switched to Redis pub/sub, but that mode is too limited (no persistence, no consumer groups, and messages are lost when no subscriber is connected);
we finally settled on the Redis Stream message queue mode.

pom.xml

    <dependencies>
        <!-- Spring Data Redis (reactive starter; also provides the blocking
             StringRedisTemplate and Lettuce used below) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>
        <!-- Required by Lettuce for connection pooling -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <!-- General-purpose utilities -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
    </dependencies>

application.yml

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    password:
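
Since commons-pool2 is already on the classpath, Lettuce connection pooling can be enabled as well; it stays off unless the pool section is configured. A minimal sketch with illustrative values (not part of the original setup):

spring:
  redis:
    lettuce:
      pool:
        max-active: 8   # maximum number of connections in use at once
        max-idle: 8
        min-idle: 0
        max-wait: 1s    # how long to block when the pool is exhausted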

Producer

package com.guotie.dpc.data.parser.message;

import cn.hutool.core.util.StrUtil;
import com.guotie.dpc.kafka.constants.KafkaTopicConstant;
import com.guotie.dpc.kafka.message.ParserMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @author liaozesong
 */
@Slf4j
@Service
public class SendMessage {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    public void send(ParserMessage message) {
        try {
            // Map the POJO to a stream entry and append it with XADD
            ObjectRecord<String, ParserMessage> record = StreamRecords.objectBacked(message)
                    .withStreamKey(KafkaTopicConstant.DPC_C2_DATA_PARSER_SUCCESS);
            RecordId recordId = stringRedisTemplate.opsForStream().add(record);
            log.info("Message sent successfully, record id {}", recordId);
        } catch (Exception e) {
            // Chain the cause so the original stack trace is not lost
            throw new RuntimeException(StrUtil.format("Failed to send message [{}]", message), e);
        }
    }
}
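
ParserMessage itself is not shown in the post. StreamRecords.objectBacked on the producer side and targetType(ParserMessage.class) on the consumer side convert the object to and from a stream hash entry through a HashMapper, so a plain POJO with a no-args constructor and getters/setters is enough. A hypothetical minimal version (field names are illustrative only):

package com.guotie.dpc.kafka.message;

import lombok.Data;

/**
 * Hypothetical sketch; the real ParserMessage is not shown in the original post.
 */
@Data
public class ParserMessage {
    private String fileId;  // illustrative field
    private String status;  // illustrative field
}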

Consumer

package com.guotie.dpc.data.play.mq;

import com.guotie.dpc.kafka.constants.KafkaTopicConstant;
import com.guotie.dpc.kafka.message.ParserMessage;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Service;

/**
 * @author liaozesong
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OriginalConsumer implements StreamListener<String, ObjectRecord<String, ParserMessage>> {
    private final StringRedisTemplate stringRedisTemplate;
    
    @Override
    @SneakyThrows
    public void onMessage(ObjectRecord<String, ParserMessage> record) {
        ParserMessage message = record.getValue();
        try {
            log.info("Received message [{}]", message);
            // ... business logic goes here ...
        } catch (Exception e) {
            log.warn("Failed to process message: {}", message);
            log.warn(e.getMessage(), e);
        } finally {
            // Manual XACK (auto-ack is disabled in the container config below).
            // Acknowledging in finally means failed messages are logged but never redelivered.
            this.stringRedisTemplate.opsForStream().acknowledge(KafkaTopicConstant.DPC_C2_DATA_PARSER_SUCCESS, record);
        }
    }
}
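
Because auto-acknowledge is disabled, a consumer that dies between reading and acknowledging leaves the entry in the group's pending list (visible with XPENDING). A sketch of how that backlog could be inspected via Spring Data Redis (a hypothetical method that would fit in OriginalConsumer, which already has stringRedisTemplate and log; uses org.springframework.data.redis.connection.stream.PendingMessagesSummary):

    // Sketch: log entries that were delivered to the group but never acknowledged
    public void logPendingBacklog() {
        PendingMessagesSummary summary = stringRedisTemplate.opsForStream()
                .pending(KafkaTopicConstant.DPC_C2_DATA_PARSER_SUCCESS,
                        KafkaTopicConstant.DPC_C2_DATA_PARSER_SUCCESS);
        if (summary != null && summary.getTotalPendingMessages() > 0) {
            log.warn("{} pending messages in group {}",
                    summary.getTotalPendingMessages(), summary.getGroupName());
        }
    }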

Consumer listener configuration

package com.guotie.dpc.data.play.config;

import com.guotie.dpc.data.play.mq.OriginalConsumer;
import com.guotie.dpc.kafka.constants.KafkaTopicConstant;
import com.guotie.dpc.kafka.message.ParserMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnection;
import org.springframework.data.redis.connection.lettuce.LettuceConverters;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;
import java.time.Duration;

/**
 * Redis Stream message queue configuration
 *
 * @author liaozesong
 */
@Slf4j
@Configuration
public class RedisMqConfig {
    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    @Resource
    private OriginalConsumer originalConsumer;

    // The group name deliberately reuses the stream key
    private static final String DEFAULT_TOPIC = KafkaTopicConstant.DPC_C2_DATA_PARSER_SUCCESS;
    private static final String DEFAULT_GROUP = DEFAULT_TOPIC;

    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, ParserMessage>> listener(RedisConnectionFactory connectionFactory) {
        // Create the stream and the consumer group if they do not exist yet
        // (XGROUP CREATE <topic> <group> 0-0 MKSTREAM)
        LettuceConnection connection = (LettuceConnection) connectionFactory.getConnection();
        try {
            XReadArgs.StreamOffset<byte[]> streamOffset = XReadArgs.StreamOffset.from(LettuceConverters.toBytes(DEFAULT_TOPIC), "0-0");
            connection.getNativeConnection().xgroupCreate(streamOffset, LettuceConverters.toBytes(DEFAULT_GROUP), XGroupCreateArgs.Builder.mkstream());
        } catch (Exception ex) {
            // BUSYGROUP means the group already exists, which is expected on restart
            log.warn("Consumer group already exists: {}", ex.getMessage());
        } finally {
            connection.close();
        }

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, ParserMessage>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ZERO)       // block on XREADGROUP until an entry arrives
                .batchSize(1)                     // fetch one entry per poll
                .targetType(ParserMessage.class)  // map the stream hash back to ParserMessage
                .executor(threadPoolTaskExecutor)
                .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, ParserMessage>> container = StreamMessageListenerContainer
                .create(connectionFactory, options);

        // Register the consumer: read entries this group has not consumed yet
        container.register(
                StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(DEFAULT_TOPIC, ReadOffset.lastConsumed()))
                        .errorHandler((error) -> {
                            // Blocking reads surface periodic timeouts; only log real errors
                            if (!(error instanceof QueryTimeoutException)) {
                                log.error(error.getMessage(), error);
                            }
                        })
                        .cancelOnError(e -> false)
                        .consumer(Consumer.from(DEFAULT_GROUP, DEFAULT_GROUP))
                        // Disable auto-ack; OriginalConsumer acknowledges manually after processing
                        .autoAcknowledge(false)
                        .build()
                , originalConsumer);
        container.start();
        return container;
    }
}
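
One operational note: Redis Streams keep acknowledged entries until they are trimmed, so the stream grows without bound under a steady producer. A common mitigation, sketched below as an assumption rather than part of the original code, is a periodic XTRIM via Spring scheduling (uses org.springframework.scheduling.annotation.Scheduled and requires @EnableScheduling on a configuration class):

    // Sketch: cap the stream length so acknowledged entries do not pile up forever.
    // The 10_000 cap is illustrative; size it to the largest backlog consumers may need.
    @Scheduled(fixedDelay = 60_000)
    public void trimStream() {
        Long removed = stringRedisTemplate.opsForStream()
                .trim(KafkaTopicConstant.DPC_C2_DATA_PARSER_SUCCESS, 10_000);
        log.debug("Trimmed {} entries from the stream", removed);
    }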