1 面向消息中间件 和 ActiveMQ 简介

1.1 什么是面向消息中间件

RMI、SOA和微服务等架构,为JavaEE系统的分布式提供了可能,软件理论上可以不被 物理硬件限制而无限扩展。但这些的远程调用是同步操作的,不可避免存在一些局限:

(1)同步阻塞:客户对象发出调用后,必须等待服务对象完成处理并返回结果才能继续 执行;

(2)紧密耦合:客户进程和服务对象进行都必须正常运行,服务对象的崩溃会导致客户 对象的异常;

(3)点对点:客户对象一次只能发送给一个目标对象。 面向消息的中间件(Message Oriented Middleware,MOM)使用异步手段有效的解决 以上问题:“消息发送者”将消息发送给“消息服务器”,“消息服务器”将消息存放在若干“队 列”中,在合适的时候再将消息转发给接收者。

(1)这种模式下,发送和接收是异步的,发送者无需等待;

(2)二者松耦合:发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定 运行:

(3)一对多:对于一个消息可以有多个接收者。

1.2 Java消息服务——JMS

JavaEE中定义的“Java消息服务”(JMS)定义了Java中访问消息中间件的接口。JMS只 是一套接口,并没有给予实现,各大厂商和开源组织都对JMS实现不同产品,这些产品 包括:Apache的ActiveMQ、阿里的RocketMQ、IBM的MQSeries、Microsoft的MSMQ和 Spring Source的RabbitMQ等等,它们基本都遵循JMS规范。 这里介绍的ActiveMQ是最早的JMS开源产品,在Java世界使用比较广泛,在中等规模的 应用中是完全胜任的。当然,如果要真正面面对大型互联网应,要解决超高并发和吞吐 量问题,现在更推荐使用RabbitMQ、Kafuka或者RocketMQ等新一代的分布式产品,但 它们的基本原理和用法是相通的。

1.3 JMS规范、术语和常见接口

(1)Provider(MessageProvider)/ Producer:生产者(消息的发送方)

(2)Consumer(MessageConsumer):消费者(消息的接收方)

(3)PTP:Point To Point,即点对点的消息模型(一对一发布)

(4)Pub/Sub:Publish/Subscribe,即发布/订阅的消息模型(一对多发布)

(5)Queue:队列

(6)Topic:主题

(7)ConnectionFactory:连接工厂。JMS用它创建连接

(8)Connection:JMS Consumer 到 JMS Provider的连接

连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂以后,就可以创 建一个与jms提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接 收队列和主题到目标。

(9)Destination:消息的目的地

目标是一个包装了消息目标标识符的【被管对象】,消息目标是指消息发布和接收的地 点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过jndi发现它们。 和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的queue,以及发布者/ 订阅者模型的Topic

(10)Session:会话,内部维护一个发送或者接收消息的线程

表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连 续的,也就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如 果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消 息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消 息生产者来发送消息,创建消息消费者来接收消息。

(11)Message:消息

是在消费者和生产者之间传送的对象,也就是说从一个应用程序送到另一个应用程序。 一个消息有三个主要部分: 消息头(必须):包含用于识别和消息寻找路由的操作设置。 一组消息属性(可选):包括额外的属性,支持其他提供者和用户的兼容。可以创建定制 的字段和过滤器(消息选择器) 一个消息体(可选):允许用户创建五种类型的消息(文本消息TextMessage 、映射 消息MapMessage 、字节消息ByteMessage、流消息StreamMessage、对象消息 ObjectMessage)

2 ActiveMQ的安装

ActiveMQ是一个用Java编写的程序,可以在官网中下载zip压缩包,只要配置好JDK,解 压即用。

这里是官方的下载地址:http://activemq.apache.org/components/classic/download

(1)运行:解压后,进入bin目录,执行对应版本的 activemq.bat

 (2)管理页面:ActiveMQ的默认端口是8161,通过http://localhost:8161/admin/ 可以进 入管理页面

 管理员默认账号为:admin,admin。账号是通过 conf/jetty-realm.properties 文件来设置 的。

 (3)把ActiveMQ注册成Window服务

以管理员身份打开cmd:

 进入“~\apache-activemq-5.15.9\bin\win64”目录,执行“InstallService.bat”:

 (4)为ActiveMQ添加使用者账号 ActiveMQ默认使用是不需要账号和密码的,在实际使用中当然不合适,我们可以修改 ~\conf\activemq.xml 文件,添加简单的验证账号。

修改配置文件,在元素中添加验证插件:(如需直接获得代码可往底部链接)

 然后再把上述配置中的username和password,配置在 ~\conf\credentials.properties 文件 中。

 密码设置问题可以参考:https://blog.csdn.net/dandan2zhuzhu/article/details/78461872

3 Java中使用ActiveMQ

3.1 消息生产者程序

(1)创建maven的jar项目并导入activemq依赖

 (2)实现消息生产者示例,并执行。‘ 需要注意ActiveMQ中服务 url 的区别:管理路径为 http://localhost:8161 ;生产路径为 tcp://localhost:61616

 

 (3)登录 http://localhost:8161/admin ,进入Queues页面,可查看消息队列中保存的消 息

2.2 消息消费者程序

(1)创建maven的jar项目,导入上述的activemq依赖

(2)创建消息消费者示例,并执行

 

 (3)消息消费完结后,再次查看消息队列已经清空(都被消费掉了)。

 4 消息发送详解

4.1 Session事务

创建 Session 时,把 transacted 参数设置为 true,可以使用为会话事务管理所发送的消 息。

Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

这时该Session发送的消息不会马上保存到服务器上,如果执行 “sesssion.commit()” 则所 有消息会以原子性的方式提交到服务器,如果执行“session.rollback()” 则发送的消息会被 回滚。下面代码显示“消息生产者”使用事务发送消息。

 4.2 Session与签收模式

创建 Session 时还可以选择消息消费者的“签收模式”——acknowledgeMode。

 消息消费者在获取到(Push或Pull)消息后,需要向消息中间(Activemq服务器)件发 送一个签收信息“Ack”,以表示消息已收到,如果消费者没有签收,消息中间件是不会把 消息删除的,它还会在服务器等待获取。

创建 Session 时的签收模式参数(acknowledgeMode),用于指定消费者的签收方式。

“签收模式”是一个整型常量,可选:AUTO_ACKNOWLEDGE(自动签收)、 CLIENT_ACKNOWLEDGE(手工签收),如果把签收模式设置为 CLIENT_ACKNOWLEDGE,消费者必须调用消息对象的acknowledge(),消息中间件才 会认为该消息已经被消费,可以清除了。

修改消费者代码如下:

这时,虽然消费者已经读取了activemq中的消息了,但activemq中的消息还保留在服务 器等待获取。

4.3 MessageProducer的发送模式、优先级和过期时间 

MessageProducer 由 session创建,用于向指定的消息队列(Destination)发送消息, 消息发送通过send()方法实现。send()方法有几个重载,其中参数最完整的如下:

前两个参数代表指定的消息队列和消息体,而deliveryMode、piority和timeToLive 是可选 参数,用于控制消息的属性。

(1)deliveryMode ——发送模式

ActiveMq支持两种消息的发送模式:PERSISTENT(持久化)和 NON_PERSISTENT(非持久化);若不指定传送模式,默认为持久消息;如果可以容 忍消息丢失,使用非持久化模式可以改善性能和减小存储开销。

(2)priority——优先级

消息优先级有从0~9十个级别,0-4是普通消息,5-9是加急消息,如果不指定优先级,则 默认为4,JMS不要求严格按照这10个优先级发送消息,但必须保证加急消息要优先于普 通消息到达。

(3)timeToLive——消息过期时间

默认消息永不过期,但是可以设置过期时间,单位是毫秒。 以下示例使用“持久化”、“优先级”和“超时”来发送消息:

 需要注意的是,消费者读取带有“优先级”的队列的时候,默认并不严格根据优先级大小来 消费,需要严格根据优先级来消费的话,需要在配置中指定消息队列开启优先级规则。 下面修改了 activemq.xml 配置文件,开启了“text-queue-1”队列的优先级规则。

 这时,消费者才会根据优先级来读取消息。值得注意的是:在实际的高并发请求下,消 息的优先级是很难严格的保证了。

5 使用 Spring Boot 简化JMS开发 5.1 发送字符串消息

(1)创建 spring boot 项目,引入 spring-boot-starter-activemq 

(2)在 application.yml 中配置 activemq 连接

(3)在Spring配置类中创建Destination(消息目的地)——Queue(队列)

 (4)使用“JmsMessagingTemplate”实现消息生产者

spring提供了JmsMessagingTemplate来简化JMS的调用,直接可以向指定队列发送消 息。

为了方便测试,这里使用了REST控制器直接调用消息队列。

 

(4)使用“@JmsListener”实现消息消费者

spring 提供了“@JmsListener”注解,用于指定接收特定队列消息的消费者方法。

5.2 发送对象消息

使用JmsMessagingTemplate还可从生产者向消费者以发送对象,对象实际上会被序列化 到消息队列中。

作为消息发送的对象需要: 

(1)设置为可序列化

(2)修改application.yml,配置需要传输的类为信任对象

 (3)定义消息队列

(4)消息生产者

 (5)消息消费者 

 

6 使用ActiveMQ实现抢购时的并发效率优化 

抢购超发时可以使用过Redis来判断超发问题,使用Redis取代SQL数据库可 以有效提高并发操作的效率。

但实际使用中,我们最终还是要把重要的业务数据保存到SQL数据库中,因此Redis避免 超发后程序依然要读写缓慢的SQL数据库,因此无法真正提高并发的响应效率(请求依 然要等待SQL数据写入后才能返回)。

为了解决并发效率,这里可以使用JMS把购买请求和SQL写入分离,购买请求处理只需 把要保存到SQL的购买信息推送到消息队列中,然后由另一端的购买信息消费者程序负 责写入SQL,购买请求就可以快速返回并响应用户,而消费者程序可以慢慢的再把数据 保存到SQL数据库中。

下面示例,演示这一改进:

(1)修改 pom 导入mq依赖:

(2)修改application.yml 配置ActiveMQ

 

(3)修改业务对象“PurchaseServiceImpl”,配置 Queue,在购买请求处理的业务对象 中使用JMS

 (4)创建消息消费者业务对象,把购买记录保存到SQL

资源下载:

ActiveMQ使用入门.pdf-Java文档类资源-CSDN下载

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐