ActiveMQ初解及springboot集成
常用的消息中间件有ActiveMQ、RabbitMQ、Kafka、RocketMQ。ActiveMQ是Apache下的开源项目,完全支持JMS 1.1和J2EE 1.4规范的JMS Provider实现。由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。ActiveMQ的queue和topic:JMS中定义了两种消息模型:点对点(point to point,queue)和发布/订阅(publish/subscribe,topic)。
常用的有ActiveMQ,RabbitMQ,kafka,RocketMQ。
ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。
JMS中定义了两种消息模型:
点对点(point to point, queue)
发布/订阅(publish/subscribe,topic)。
什么是点对点?可以把它看成银行的一对一服务:我们去银行办业务,柜台业务员一次只会服务一位客户,不可能同时对接多个客户。
这就是queue:同一条消息不会被重复消费。比如你去银行给爸妈汇100块钱,只有他们之中的一个人能取到这笔钱,不可能两个人各收到100块。
Queue支持存在多个消费者。
package com.zkb;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Point-to-point demo consumer: reads text messages from the queue
 * {@code test-queue} and prints each one with a running counter.
 *
 * <p>The broker URL defaults to {@link #BROKER_URL}; a different URL may be
 * passed as the first command-line argument. The loop ends when no message
 * arrives within {@link #RECEIVE_TIMEOUT_MS} milliseconds.
 */
public class Consumer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "test-queue";
    private static final long RECEIVE_TIMEOUT_MS = 20000;

    /**
     * Connects to the broker, drains the queue and prints every text message.
     *
     * @param args optional; {@code args[0]} overrides the default broker URL
     */
    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageConsumer consumer = session.createConsumer(destination);
            int i = 0;
            while (true) {
                Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
                if (message == null) {
                    // receive() timed out: treat the queue as drained and stop.
                    break;
                }
                // Messages that are not text messages are skipped without being counted.
                if (message instanceof TextMessage) {
                    String text = ((TextMessage) message).getText();
                    System.out.println("第 " + i++ + "条消息 : " + text);
                }
            }
        } catch (JMSException e) {
            // Report the failure instead of silently swallowing it.
            System.err.println("Consumer failed while talking to broker " + url);
            e.printStackTrace();
        } finally {
            // Closing the connection also releases the session and consumer created from it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.err.println("Could not close the connection: " + e.getMessage());
                }
            }
        }
    }
}
package com.zkb;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Second point-to-point demo consumer, identical in behavior to the first:
 * reads text messages from the queue {@code test-queue} and prints each one
 * with a running counter. Running it next to another consumer on the same
 * queue shows how the messages are split between them.
 *
 * <p>The broker URL defaults to {@link #BROKER_URL}; a different URL may be
 * passed as the first command-line argument. The loop ends when no message
 * arrives within {@link #RECEIVE_TIMEOUT_MS} milliseconds.
 */
public class Consumer2 {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "test-queue";
    private static final long RECEIVE_TIMEOUT_MS = 20000;

    /**
     * Connects to the broker, drains the queue and prints every text message.
     *
     * @param args optional; {@code args[0]} overrides the default broker URL
     */
    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageConsumer consumer = session.createConsumer(destination);
            int i = 0;
            while (true) {
                Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
                if (message == null) {
                    // receive() timed out: treat the queue as drained and stop.
                    break;
                }
                // Messages that are not text messages are skipped without being counted.
                if (message instanceof TextMessage) {
                    String text = ((TextMessage) message).getText();
                    System.out.println("第 " + i++ + "条消息 : " + text);
                }
            }
        } catch (JMSException e) {
            // Report the failure instead of silently swallowing it.
            System.err.println("Consumer2 failed while talking to broker " + url);
            e.printStackTrace();
        } finally {
            // Closing the connection also releases the session and consumer created from it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.err.println("Could not close the connection: " + e.getMessage());
                }
            }
        }
    }
}
package com.zkb;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Point-to-point demo producer: sends {@link #NUM_MESSAGES_TO_SEND} text
 * messages ("Message #0", "Message #1", ...) to the queue {@code test-queue},
 * pausing {@link #DELAY} milliseconds after each send.
 *
 * <p>The broker URL defaults to {@link #BROKER_URL}; a different URL may be
 * passed as the first command-line argument.
 */
public class Producer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final boolean NON_TRANSACTED = false;
    private static final int NUM_MESSAGES_TO_SEND = 100;
    private static final long DELAY = 100;

    /**
     * Connects to the broker and publishes the demo messages.
     *
     * @param args optional; {@code args[0]} overrides the default broker URL
     */
    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-queue");
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
                TextMessage message = session.createTextMessage("Message #" + i);
                System.out.println("message: #" + i);
                producer.send(message);
                Thread.sleep(DELAY);
            }
            producer.close();
            session.close();
        } catch (JMSException e) {
            System.out.println("Caught exception!");
            // Keep the original notice but also show what actually went wrong.
            e.printStackTrace();
        } catch (InterruptedException e) {
            System.out.println("Caught exception!");
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
        } finally {
            // Always release the connection, including after a failed send.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }
}
可以看到,queue的生产者总共生产了编号0~99共100条消息,
两个消费者各消费了50条。由此可见,Queue虽然支持多个消费者,但对同一条消息而言,只会有一个消费者可以消费,其它消费者不能再消费这条消息。
发布/订阅:Topic
package com.zkb.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Publish/subscribe demo producer: publishes ten text messages
 * ("test0" .. "test9") to the topic {@code topic-test} on the broker at
 * {@code tcp://127.0.0.1:61616} and echoes each one to standard output.
 */
public class AppProducer {
    private static final String url = "tcp://127.0.0.1:61616";
    private static final String topicName = "topic-test";

    /**
     * Connects to the broker, publishes the demo messages and closes the connection.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or closing fails
     */
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic(topicName);
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 10; i++) {
                TextMessage textMessage = session.createTextMessage("test" + i);
                producer.send(textMessage);
                System.out.println("发送消息"+textMessage.getText());
            }
        } finally {
            // Close in finally so a failed send does not leak the connection.
            connection.close();
        }
    }
}
package com.zkb.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class AppConsumer {
private static final String url = "tcp://127.0.0.1:61616";
private static final String topicName = "topic-test";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topicName);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
System.out.println("接收消息 = [" + ((TextMessage) message).getText() + "]");
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
package com.zkb.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class AppConsumer2 {
private static final String url = "tcp://127.0.0.1:61616";
private static final String topicName = "topic-test";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topicName);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
System.out.println("接收消息 = [" + ((TextMessage) message).getText() + "]");
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
可以看到我这边发了10条消息,
两个消费端各收到10条,也就是每条消息都被每个订阅者消费了一次。
就好像播放电视一样,只要节目播出了,所有正在收看的人都可以看到。
好了明白了区别就老规矩,咱们用到实际项目中去应用,我们这里就springboot和它简单集成一下
/**
 * Handles {@code POST /produce}: hands the bound {@code Mail} to
 * {@code producer.sendMail} together with the {@code queue} member
 * (declared outside this snippet) and returns {@code Result.success()}.
 *
 * @param mail the mail payload bound from the request
 * @return the value of {@code success()} on a fresh {@code Result}
 * @throws Exception whatever {@code producer.sendMail} throws
 */
@ResponseBody
@ApiOperation(value = "queue发送", notes = "queue发送")
@PostMapping("/produce")
public Result produce(Mail mail) throws Exception {
    final Result response = new Result();
    producer.sendMail(queue, mail);
    return response.success();
}
/**
 * Handles {@code POST /topic}: hands the bound {@code Mail} to
 * {@code producer.sendMail} together with the {@code topic} member
 * (declared outside this snippet) and returns {@code Result.success()}.
 *
 * @param mail the mail payload bound from the request
 * @return the value of {@code success()} on a fresh {@code Result}
 * @throws Exception whatever {@code producer.sendMail} throws
 */
@ResponseBody
@ApiOperation(value = "topic发送", notes = "topic发送")
@PostMapping("/topic")
public Result topic(Mail mail) throws Exception {
    final Result response = new Result();
    producer.sendMail(topic, mail);
    return response.success();
}
/**
 * JMS listener for the destination {@code myqueue} (container factory
 * {@code jmsListenerContainerQueue}): prints a header line followed by the
 * received mail's {@code toString()}.
 *
 * @param mail the message payload delivered to this listener
 */
@JmsListener(containerFactory = "jmsListenerContainerQueue", destination = "myqueue")
public void displayMail(Mail mail) {
    System.out.println("listen1从ActiveMQ队列myqueue中取出一条消息:");
    final String rendered = mail.toString();
    System.out.println(rendered);
}
/**
 * JMS listener for the destination {@code mytopic} (container factory
 * {@code jmsListenerContainerTopic}): prints a header line followed by the
 * received mail.
 *
 * @param msg the message payload delivered to this listener
 */
@JmsListener(containerFactory = "jmsListenerContainerTopic", destination = "mytopic")
public void displayTopic(Mail msg) {
    System.out.println("consumer1从ActiveMQ的Topic:mytopic中取出一条消息:");
    final String rendered = String.valueOf(msg);
    System.out.println(rendered);
}
这边只贴一下关键的代码吧,具体看demo
demo自取https://download.csdn.net/download/qq_14926283/13695423(没打算挣积分,但是csdn积分自己会上涨,真心需要可私聊,也可以自己下载)
更多推荐
所有评论(0)