Redis上的操作:

打开两个窗口,打开redis客户端:
一个客户端:订阅(客户端订阅channel1频道):127.0.0.1:6379> subscribe channel1
另一个客户端:发布(客户端向channel1频道发送消息hello):127.0.0.1:6379> publish channel1 hello

Java代码实现

1: 先创建一个订阅消息处理类

/**
 * 订阅消息处理类
 *
 * @version 1.0
 * @date 2022/05/26 15:51:11
 */
public class Subscriber extends JedisPubSub {

    public Subscriber(){
    }
    //收到消息会调用
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("接收 redis 发布的消息, 频道为:" + channel + ", 消息内容是:" + message);
    }

    //订阅了频道会调用
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("订阅 redis 频道成功, 频道是:" + channel + ", 订阅频道数量是:" + subscribedChannels);
    }

    //取消订阅 会调用
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println("取消订阅 redis 频道,频道:" + channel + ", 订阅频道:" + subscribedChannels);
    }
}

2: 创建消息的发布者

/**
 * 消息的发布者
 *
 * @version 1.0
 * @date 2022/05/26 15:43:06
 */
public class SmsPublisher extends Thread {

    private final JedisPool jedisPool;

    public SmsPublisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        //连接池中取出一个连接
        Jedis jedis = jedisPool.getResource();
        while (true) {
            try {
                //向 xyzChannel 的频道上推送消息
                Long smsNumber = jedis.publish("xyzChannel", reader.readLine());
                System.out.println("消息的发布者:发布成功,当前向xyzChannel频道发送的消息数量为:" + smsNumber);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

3: 创建消息订阅者

/**
 * 消息订阅者
 *
 * @version 1.0
 * @date 2022/05/26 15:49:47
 */
public class SmsSubscriber extends Thread{

    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();
    //频道
    private final String channel = "xyzChannel";

    public SmsSubscriber(JedisPool jedisPool) {
        super("SmsSubscriber");
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        System.out.println("订阅Redis频道,频道为:" + channel + ", 等待消息发送者发送消息(请在控制台输入消息内容)...");
        Jedis jedis = null;
        try {
            //取出一个连接
            jedis = jedisPool.getResource();
            //通过subscribe的api去订阅频道,入参是订阅者和频道名
            jedis.subscribe(subscriber, channel);
        } catch (Exception e) {
            System.out.println("消息订阅者订阅消息出现异常: " + e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}

4: Redis消息的发布和订阅测试类

/**
 * Redis消息的发布和订阅测试类
 *
 * @version 1.0
 * @date 2022/05/26 16:01:56
 */
public class SmsTest {
    public static void main(String[] args) {
        // 连接本地redis服务端
        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379);
        //消息的发布者
        SmsPublisher publisher = new SmsPublisher(jedisPool);
        publisher.start();

        //消息的订阅者
        SmsSubscriber subscriber = new SmsSubscriber(jedisPool);
        subscriber.start();
    }
}

运行结果

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐