安装环境CentOS 7 ,JDK8

1.下载ActiveMQ:http://activemq.apache.org/activemq-5155-release.html

 2.将文件上传至虚拟机并解压

 

[root@dwl activemq]# tar -xf apache-activemq-5.15.5-bin.tar.gz 

3.启动activemq

 4.进入管理页面http://192.168.159.2:8161/admin 用户和密码都是admin

 5.编写生产者消费者

依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dwl</groupId>
    <artifactId>activemq</artifactId>
    <version>1.0-SNAPSHOT</version>


    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.5</version>
        </dependency>
    </dependencies>

</project>

生产者

package com.dwl;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @program: activemq
 * @description: 生产者
 * @author: daiwenlong
 * @create: 2018-08-26 15:01
 **/
public class Producer {

    //ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //的链接地址
    private static final String BROKEN_URL = "tcp://192.168.159.2:61616";
    //原子计数器
    AtomicInteger count = new AtomicInteger(0);
    //链接工厂
    ConnectionFactory connectionFactory;
    //链接对象
    Connection connection;
    //事务管理
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal();

    public void init(){
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //从工厂中创建一个链接
            connection  = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //session级别
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String disname){
        try {
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者,
            MessageProducer messageProducer = null;
            //每个线程使用自己的消息生产者
            if(threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else{
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
            while(true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "正在发送消息!,count:"+num);
                System.out.println(Thread.currentThread().getName()+
                        "正在发送消息!,count:"+num);
                //发送消息
                messageProducer.send(msg);
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}



 测试生产者

package com.dwl;

/**
 * @program: activemq
 * @description: 测试生产者
 * @author: daiwenlong
 * @create: 2018-08-26 15:13
 **/
public class RunProducer {

    public static void main(String[] args){
        Producer producer = new Producer();
        producer.init();
        RunProducer runTest = new RunProducer();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //使用多个线程同时生产
        //Thread 1
        new Thread(runTest.new ProducerMq(producer)).start();
        //Thread 2
        new Thread(runTest.new ProducerMq(producer)).start();
        //Thread 3
        new Thread(runTest.new ProducerMq(producer)).start();
        //Thread 4
        new Thread(runTest.new ProducerMq(producer)).start();
        //Thread 5
        new Thread(runTest.new ProducerMq(producer)).start();
    }

    class ProducerMq implements Runnable{
        Producer producer;
        public ProducerMq(Producer producer){
            this.producer = producer;
        }
        public void run() {
            while(true){
                try {
                    //发送消息
                    producer.sendMessage("activemq");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


查看生产情况

管理 页面查看

 消费者

package com.dwl;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @program: activemq
 * @description: 消费者
 * @author: daiwenlong
 * @create: 2018-08-26 15:10
 **/
public class Consumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = "tcp://192.168.159.2:61616";

    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    System.out.println(Thread.currentThread().getName()+": 正在接收信息----------"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


 测试消费者

package com.dwl;

/**
 * @program: activemq
 * @description: 测试消费者
 * @author: daiwenlong
 * @create: 2018-08-26 15:46
 **/
public class RunConsumer {

    public static void main(String[] args){
        Consumer consumer = new Consumer();
        consumer.init();
        RunConsumer testConsumer = new RunConsumer();
        //启动多个线程同时消费
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
    }

   class ConsumerMq implements Runnable{
       Consumer consumer;
        public ConsumerMq( Consumer consumer){
            this.consumer = consumer;
        }

        public void run() {
            while(true){
                try {
                    consumer.getMessage("activemq");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


查看消费情况

管理页面

 

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐