ActiveMQ安装与使用示例
安装环境：CentOS 7，JDK8。1. 下载ActiveMQ：http://activemq.apache.org/activemq-5155-release.html 2. 将文件上传至虚拟机并解压：[root@dwl activemq]# tar -xf apache-activemq-5.15.5-bin.tar.gz 3. 启动activemq 4. 进入...
·
安装环境：CentOS 7，JDK8
1.下载ActiveMQ:http://activemq.apache.org/activemq-5155-release.html
2.将文件上传至虚拟机并解压
[root@dwl activemq]# tar -xf apache-activemq-5.15.5-bin.tar.gz
3.启动activemq（进入解压后的 apache-activemq-5.15.5 目录，执行 bin/activemq start）
4.进入管理页面 http://192.168.159.2:8161/admin ，用户名和密码都是admin
5.编写生产者消费者
依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Maven coordinates of this example project. -->
<groupId>com.dwl</groupId>
<artifactId>activemq</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- The only dependency declared here; its version (5.15.5) matches the broker archive unpacked in step 2. -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.5</version>
</dependency>
</dependencies>
</project>
生产者
package com.dwl;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demo JMS producer that publishes numbered text messages to an ActiveMQ queue.
 *
 * <p>A single instance may be used from several threads at once. The connection and the
 * message counter are shared, but every calling thread works with its own transacted
 * {@link Session} and {@link MessageProducer}, because a JMS session is a single-threaded
 * context and must not be used by several threads concurrently.
 *
 * <p>Project: activemq. Created 2018-08-26 15:01.
 *
 * @author daiwenlong
 */
public class Producer {
    // Default ActiveMQ user name.
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // Default ActiveMQ password.
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // Address of the broker to connect to.
    private static final String BROKEN_URL = "tcp://192.168.159.2:61616";
    // Counter shared by all sending threads; its value is embedded in each message text.
    AtomicInteger count = new AtomicInteger(0);
    // Connection factory.
    ConnectionFactory connectionFactory;
    // Connection shared by all threads.
    Connection connection;
    // Session opened by init(). Sending threads do not use it; each one opens its own.
    Session session;
    // Per-thread message producer.
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
    // Per-thread transacted session that owns the thread's producer.
    private final ThreadLocal<Session> threadSession = new ThreadLocal<>();

    /**
     * Opens and starts the connection to the broker and creates the initial transacted session.
     *
     * <p>A {@link JMSException} is printed to standard error and not rethrown; in that case a
     * later {@link #sendMessage(String)} call fails with an {@link IllegalStateException}
     * if no connection could be created.
     */
    public void init() {
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
            connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: sends become visible to the broker only after commit().
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends one text message per second to the given queue until the calling thread is
     * interrupted or a JMS error occurs.
     *
     * <p>The thread's producer is created on the first call and reused afterwards, so it stays
     * bound to the queue named on that first call.
     *
     * @param disname name of the destination queue
     * @throws IllegalStateException if {@link #init()} has not established a connection
     */
    public void sendMessage(String disname) {
        if (connection == null) {
            throw new IllegalStateException("init() must succeed before sendMessage() is called");
        }
        try {
            // Each thread uses its own session; sharing one across threads is not allowed by JMS.
            Session localSession = threadSession.get();
            if (localSession == null) {
                localSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                threadSession.set(localSession);
            }
            MessageProducer messageProducer = threadLocal.get();
            if (messageProducer == null) {
                Queue queue = localSession.createQueue(disname);
                messageProducer = localSession.createProducer(queue);
                threadLocal.set(messageProducer);
            }
            while (true) {
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                String text = Thread.currentThread().getName() + "正在发送消息!,count:" + num;
                TextMessage msg = localSession.createTextMessage(text);
                System.out.println(text);
                messageProducer.send(msg);
                // Commit after every send so each message is delivered immediately.
                localSession.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the flag so the caller can see the interruption and stop.
            Thread.currentThread().interrupt();
        }
    }
}
测试生产者
package com.dwl;
/**
 * Entry point that starts several threads which all publish through one shared
 * {@link Producer}.
 *
 * <p>Project: activemq. Created 2018-08-26 15:13.
 *
 * @author daiwenlong
 */
public class RunProducer {
    // Number of producing threads started by main().
    private static final int PRODUCER_THREADS = 5;

    /**
     * Initializes the producer, waits one second, then starts the producing threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.init();
        RunProducer runTest = new RunProducer();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Interrupted before any worker was started: keep the flag and stop.
            Thread.currentThread().interrupt();
            return;
        }
        // Produce from several threads at the same time.
        for (int i = 0; i < PRODUCER_THREADS; i++) {
            new Thread(runTest.new ProducerMq(producer)).start();
        }
    }

    /** Task that keeps sending to the "activemq" queue until its thread is interrupted. */
    class ProducerMq implements Runnable {
        Producer producer;

        public ProducerMq(Producer producer) {
            this.producer = producer;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Send messages; if sending returns, wait ten seconds before retrying.
                    producer.sendMessage("activemq");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    // Restore the flag; the loop condition then ends the task.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
查看生产情况
管理页面查看
消费者
package com.dwl;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demo JMS consumer that reads text messages from an ActiveMQ queue and prints them.
 *
 * <p>A single instance may be used from several threads at once. The connection and the
 * receive counter are shared, but every calling thread works with its own {@link Session}
 * and {@link MessageConsumer}, because a JMS session is a single-threaded context and must
 * not be used by several threads concurrently.
 *
 * <p>Project: activemq. Created 2018-08-26 15:10.
 *
 * @author daiwenlong
 */
public class Consumer {
    // Default ActiveMQ user name.
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // Default ActiveMQ password.
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // Address of the broker to connect to.
    private static final String BROKEN_URL = "tcp://192.168.159.2:61616";
    ConnectionFactory connectionFactory;
    // Connection shared by all threads.
    Connection connection;
    // Session opened by init(). Receiving threads do not use it; each one opens its own.
    Session session;
    // Per-thread message consumer.
    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    // Counter shared by all receiving threads; printed with each received message.
    AtomicInteger count = new AtomicInteger();
    // Per-thread session that owns the thread's consumer.
    private final ThreadLocal<Session> threadSession = new ThreadLocal<>();

    /**
     * Opens and starts the connection to the broker and creates the initial
     * non-transacted, auto-acknowledging session.
     *
     * <p>A {@link JMSException} is printed to standard error and not rethrown; in that case a
     * later {@link #getMessage(String)} call fails with an {@link IllegalStateException}
     * if no connection could be created.
     */
    public void init() {
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Receives messages from the given queue, pausing one second before each receive, and
     * prints every text message together with the running receive count.
     *
     * <p>Returns when {@code receive()} yields {@code null}, when the calling thread is
     * interrupted, or when a JMS error occurs. The thread's consumer is created on the first
     * call and reused afterwards, so it stays bound to the queue named on that first call.
     *
     * @param disname name of the queue to read from
     * @throws IllegalStateException if {@link #init()} has not established a connection
     */
    public void getMessage(String disname) {
        if (connection == null) {
            throw new IllegalStateException("init() must succeed before getMessage() is called");
        }
        try {
            // Each thread uses its own session; sharing one across threads is not allowed by JMS.
            Session localSession = threadSession.get();
            if (localSession == null) {
                localSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                threadSession.set(localSession);
            }
            MessageConsumer consumer = threadLocal.get();
            if (consumer == null) {
                Queue queue = localSession.createQueue(disname);
                consumer = localSession.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while (true) {
                Thread.sleep(1000);
                // Blocks until a message arrives; null means the consumer was closed meanwhile.
                Message received = consumer.receive();
                if (received == null) {
                    break;
                }
                if (received instanceof TextMessage) {
                    String text = ((TextMessage) received).getText();
                    System.out.println(Thread.currentThread().getName()+": 正在接收信息----------"+text+"--->"+count.getAndIncrement());
                } else {
                    // Another client may put non-text messages on the queue; skip them
                    // instead of failing with a ClassCastException.
                    System.err.println(Thread.currentThread().getName()
                            + ": skipping non-text message " + received.getJMSMessageID());
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the flag so the caller can see the interruption and stop.
            Thread.currentThread().interrupt();
        }
    }
}
测试消费者
package com.dwl;
/**
 * Entry point that starts several threads which all consume through one shared
 * {@link Consumer}.
 *
 * <p>Project: activemq. Created 2018-08-26 15:46.
 *
 * @author daiwenlong
 */
public class RunConsumer {
    // Number of consuming threads started by main().
    private static final int CONSUMER_THREADS = 4;

    /**
     * Initializes the consumer and starts the consuming threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        consumer.init();
        RunConsumer testConsumer = new RunConsumer();
        // Consume from several threads at the same time.
        for (int i = 0; i < CONSUMER_THREADS; i++) {
            new Thread(testConsumer.new ConsumerMq(consumer)).start();
        }
    }

    /** Task that keeps reading from the "activemq" queue until its thread is interrupted. */
    class ConsumerMq implements Runnable {
        Consumer consumer;

        public ConsumerMq(Consumer consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Receive messages; if receiving returns, wait ten seconds before retrying.
                    consumer.getMessage("activemq");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    // Restore the flag; the loop condition then ends the task.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
查看消费情况
管理页面
更多推荐
已为社区贡献2条内容
所有评论(0)