一、单元测试Java多线程

使用junit测试多线程代码,但是等到程序结束,输出结果不完整,或者是完全没结果,因此,可能是其他线程还没结束,主线程就结束了。

原因: junit在运行时,在主线程结束后就关闭了进程,不会等待各个线程运行结束。

==解决方法:==①要是要求不高,可以通过thread.sleep(),让主线程暂时休眠(TimeUnit.MILLISECONDS.sleep(20000);),其他线程运行完在结束;

​ ②比较严谨的做法,可以用 CountDownLatch ,具体使用在代码里有注释(参考:https://blog.csdn.net/goxingman/article/details/105663113);

让主线程休眠,等待其他线程执行完成

public class Learn_Thead {
    /**
     * 使用主线程休眠的方式来解决:Junit单元测试无法测试多线程问题
     * 参考:https://blog.csdn.net/goxingman/article/details/105663113
     * */
    @Test
    public void runThread() throws InterruptedException {
        MyThread m1  = new MyThread("t1");
        MyThread m2 = new MyThread("t2");
        m1.start();
        m2.start();
        TimeUnit.MILLISECONDS.sleep(8000);//等待时间

    }
}
/**
 * 创建新的线程类
 */
class MyThread extends Thread{
    private String name;
    public MyThread(String name){
        this.name = name;
    }
    public void run(){
        for(int i = 0; i < 5; i++){
            System.out.println(name + " 运行, i = " + i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

二、redis实现消息队列

Redis的列表类型可以很容易实现消息队列,类似于MQ的队列模型,任何时候都可以消费消息,但是每一条消息只能允许消费一次。

命令描述用法
LPUSH(1)将一个或多个值value插入到列表key的表头
(2)如果有多个value值,那么各个value值按从左到右的顺序依次插入表头
(3)key不存在,一个空列表会被创建并执行LPUSH操作
(4)key存在但不是列表类型,返回错误
LPUSH key value [value …]
LPUSHX(1)将值value插入到列表key的表头,当且晋档key存在且为一个列表
(2)key不存在时,LPUSHX命令什么都不做
LPUSHX key value
LPOP(1)移除并返回列表key的头元素LPOP key
LRANGE(1)返回列表key中指定区间内的元素,区间以偏移量start和stop指定
(2)start和stop都以0位底
(3)可使用负数下标,-1表示列表最后一个元素,-2表示列表倒数第二个元素,以此类推
(4)start大于列表最大下标,返回空列表
(5)stop大于列表最大下标,stop=列表最大下标
LRANGE key start stop
LREM(1)根据count的值,移除列表中与value相等的元素
(2)count>0表示从头到尾搜索,移除与value相等的元素,数量为count
(3)count<0表示从从尾到头搜索,移除与value相等的元素,数量为count
(4)count=0表示移除表中所有与value相等的元素
LREM key count value
LSET(1)将列表key下标为index的元素值设为value
(2)index参数超出范围,或对一个空列表进行LSET时,返回错误
LSET key index value
LINDEX(1)返回列表key中,下标为index的元素LINDEX key index
LINSERT(1)将值value插入列表key中,位于pivot前面或者后面
(2)pivot不存在于列表key时,不执行任何操作(3)key不存在,不执行任何操作
LINSERT key BEFORE|AFTER pivot value
LLEN(1)返回列表key的长度(2)key不存在,返回0LLEN key
LTRIM(1)对一个列表进行修剪,让列表只返回指定区间内的元素,不存在指定区间内的都将被移除LTRIM key start stop
RPOP(1)移除并返回列表key的尾元素RPOP key
RPOPLPUSH在一个原子时间内,执行两个动作:(1)将列表source中最后一个元素弹出并返回给客户端
(2)将source弹出的元素插入到列表desination,作为destination列表的头元素
RPOPLPUSH source destination
RPUSH(1)将一个或多个值value插入到列表key的表尾RPUSH key value [value …]
RPUSHX(1)将value插入到列表key的表尾,当且仅当key存在并且是一个列表
(2)key不存在,RPUSHX什么都不做
RPUSHX key value
  •   通过命令 lpush+lpop
  • 队列  命令 lpush+rpop
  • 有限集合  命令 lpush+ltrim
  • 消息队列  命令 lpush+rpop
127.0.0.1:6379> lpush list a b c d
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "d"
2) "c"
3) "b"
4) "a"
127.0.0.1:6379> rpop list
"a"
127.0.0.1:6379> rpop list
"b"

三、java多线程模拟生产者消费者

RedisLearn.java

/**
 * 生产者类
 * 生产者每隔600ms生成一条消息
 * */
class MessageProducer extends Thread{
    public static final String MESSAGE_KEY = "message:queue";
    private volatile int count;

    public void putMessage(String mess){
        Jedis jedis = new Jedis("IP", 6379);
        jedis.auth("123456");
        Long size = jedis.lpush(MESSAGE_KEY, mess);
        System.out.println("Put " + Thread.currentThread().getName() + " put message " + count);
        count++;
    }
    @Override
    public synchronized  void run() {
        for(int i = 0 ; i < 5; i++){
            putMessage("message" + count);
            try {
                Thread.sleep(600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


/**
 * 消费者类
 * 消费者每隔1000ms消费一条消息
 */
class MessageConsumer extends Thread{
    public static final String MESSAGE_KEY = "message:queue";
    private volatile int count;

    public void consumerMessage(){
        Jedis jedis = new Jedis("IP", 6379);
        jedis.auth("123456");
        String message = jedis.rpop(MESSAGE_KEY);
        System.out.println("Pop " + Thread.currentThread().getName() + "comsumer message = " + message);
        count++;
    }
    @Override
    public synchronized  void run() {
        for(int i = 0; i < 9; i++){
            consumerMessage();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class RedisLearn {
    /**
     * 使用单元测试,测试多线程下模拟生产者消费者
     * 使用Redis List : lpush rpop 模拟消息队列
     * @throws InterruptedException
     */
    @Test
    public void runMessageProducter() throws InterruptedException {
        MessageProducer producer = new MessageProducer();
        MessageConsumer comsumer = new MessageConsumer();
        Thread t1 = new Thread(producer, "thread1");
        Thread t2 = new Thread(producer, "thread2");
        Thread t3 = new Thread(producer, "thread3");
        Thread t4 = new Thread(comsumer, "thread4");
        Thread t5 = new Thread(comsumer, "thread5");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
        TimeUnit.MILLISECONDS.sleep(20000);
    }
}

result

Put thread1 put message 0
Pop thread5comsumer message = message0
Put thread1 put message 1
Pop thread5comsumer message = message1
Put thread1 put message 2
Pop thread5comsumer message = message2
Put thread1 put message 3
Put thread1 put message 4
Pop thread5comsumer message = message3
Put thread2 put message 5
Put thread2 put message 6
Pop thread5comsumer message = message4
Put thread2 put message 7
Pop thread5comsumer message = message5
Put thread2 put message 8
Put thread2 put message 9
Pop thread5comsumer message = message6
Put thread3 put message 10
Pop thread5comsumer message = message7
Put thread3 put message 11
Put thread3 put message 12
Pop thread5comsumer message = message8
Put thread3 put message 13
Pop thread4comsumer message = message9
Put thread3 put message 14
Pop thread4comsumer message = message10
Pop thread4comsumer message = message11
Pop thread4comsumer message = message12
Pop thread4comsumer message = message13
Pop thread4comsumer message = message14
Pop thread4comsumer message = null
Pop thread4comsumer message = null
Pop thread4comsumer message = null

四、阻塞读

Redis Brpop 命令移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

/**
 * 消费者类
 */
class MessageConsumer extends Thread{
    public static final String MESSAGE_KEY = "message:queue";

    public void consumerMessage(){
        Jedis jedis = new Jedis("nuptpisa.top", 6379);
        jedis.auth("123456");
//        String message = jedis.rpop(MESSAGE_KEY);
//        System.out.println("Pop " + Thread.currentThread().getName() + "comsumer message = " + message);
        List<String> message = jedis.brpop(0, MESSAGE_KEY);
        System.out.println("Pop " + Thread.currentThread().getName() + "comsumer message = " + message);
    }
    @Override
    public synchronized  void run() {
        for(int i = 0; i < 8; i++){
            consumerMessage();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

result

Put thread1 put message 0
Pop thread4comsumer message = [message:queue, message0]
Put thread1 put message 1
Pop thread4comsumer message = [message:queue, message1]
Put thread1 put message 2
Put thread1 put message 3
Pop thread4comsumer message = [message:queue, message2]
Put thread1 put message 4
Pop thread4comsumer message = [message:queue, message3]
Put thread3 put message 5
Put thread3 put message 6
Pop thread4comsumer message = [message:queue, message4]
Put thread3 put message 7
Pop thread4comsumer message = [message:queue, message5]
Put thread3 put message 8
Put thread3 put message 9
Pop thread4comsumer message = [message:queue, message6]
Put thread2 put message 10
Pop thread4comsumer message = [message:queue, message7]
Put thread2 put message 11
Put thread2 put message 12
Pop thread5comsumer message = [message:queue, message8]
Put thread2 put message 13
Pop thread5comsumer message = [message:queue, message9]
Put thread2 put message 14
Pop thread5comsumer message = [message:queue, message10]
Pop thread5comsumer message = [message:queue, message11]
Pop thread5comsumer message = [message:queue, message12]
Pop thread5comsumer message = [message:queue, message13]
Pop thread5comsumer message = [message:queue, message14]

Process finished with exit code 0
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐