基于RabbitMQ原理的自定义消息队列实现
本项目实现了一个消息队列系统,支持跨域/跨主机消息转发与通信,管理虚拟机、交换机、队列、绑定和消息,实现Direct、Fanout、Topic三种交换机转发方式。系统具有数据持久化、异常恢复、垃圾回收优化存储等功能,基于TCP自定义应用层协议进行网络通信,统一管理硬盘和内存操作,设计虚拟主机功能,实时消费消息,确保线程安全,提供手动和自动客户端应答模式,并通过单元测试保证系统稳定性。
文章目录
1. 什么是消息队列
与消息队列类似的是阻塞队列(Blocking Queue),它是一个生产者消费者模型 (是在一个进程内部进行的)。
而所谓的消息队列(Message Queue),就是把阻塞队列这样的数据结构,单独提取成了一个程序,进行独立部署 -> 生产者消费模型 (进程和进程之间 / 服务和服务之间);一般是部署在分布式系统上的,就是说整个服务器程序,不是一个单一的程序了,而是由一组服务器构成的集群。
生产者消费者模型作用:
- 解耦合
- 本来有个分布式系统,A 服务器 调用 B 服务器(A 给 B 发请求,B 给 A 返回响应),此时 A 和 B 的耦合是比较大的。
- 引入消息队列后,A 把请求发送到消息队列,B 再从消息队列获取到请求(像写代码一样,追求的是高内聚,低耦合)。
- 削峰填谷
- 比如 A 是入口服务器,A 调用 B 完成一些具体业务,如果是 A 和 B 直接通信,如果突然 A 收到一组用户的请求的峰值,此时 B 也会随着受到峰值(每个物理上的服务器,硬件资源都是有上限的,包括但不限于 CPU,内存,硬盘,网络带宽)。
- 引入消息队列后,A 把请求发送到消息队列,B 再从消息队列获取到请求;此时虽然 A 收到很多请求,队列也收到了很多请求,但是 B 仍旧可以按照原来的节奏处理请求,不至于说一下就收到太多的并发量。
- 举个例子:高铁火车站,进站口。 乘客好 比A ,进站口好比 B,是有限的,就需要一个队列来排队,这样不管人多少,就不会影响到乘客进站以后的坐车。
市面上一些知名的消息队列
- RabbitMQ
- Kafka
- RocketMQ
- ActiveMQ
2. 需求分析
2.1. 核心概念1
- 生产者(Producer)
- 消费者(Consumer)
- 中间人(Broker)
- 发布(Push) :生产者向中间人这里投递消息的过程
- 订阅(Subscribe):哪些消费者要从中间人取数据,这个注册的过程,称为 “订阅”
- 消费 (Consume) :消费者从中间人这里取数据的动作
一个生产者,一个消费者
N 个生产者,N 个消费者
2.2. 核心概念2
Broker server 内部也涉及一些关键概念(是为了如何进出队列)
- 虚拟主机(Virtual Host),类似于 MySQL 中的 database,算是一个 “逻辑” 上的数据集合。
- 一个 Broker server 上可以组织多种不同类别数据,可以使用 Virtual Host 做出逻辑上的区分。
- 实际开发中,一个 Broker server 也可能同时用来管理多个 业务线上的数据,就可以使用 Virtual Host 做出逻辑上的区分。
- 交换机(Exchange)
- 生产者把消息投递给 Broker Server,实际上是把消息先交给了 (公司某一层楼)Broker Server 上的交换机,再由交换机把消息交给对应的队列。 (交换机类似于公司的“前台小姐姐”)。
- 队列(Queue)
- 真正用来存储处理消息的实体,后续消费者也是从对应的队列中取数据。
- 一个大的消息队列中,可以有很多具体的小队列。
- 绑定(Binding)
- 把交换机和队列之间,建立关系。
- 可以把 交换机 和 队列 视为,数据库中 多对多的关系。可以想象,在 MQ 中,也是有一个这样的中间表,所谓的 “绑定’其实就是中间表中的一项
- 消息(Message)
- 具体来说,是 服务器 A 发给 B 的请求(通过MQ转发), 服务器 B 给 服务器 A 返回的响应(通过MQ转发)。
- 一个消息,可以视为一个字符串(二进制数据),具体由程序员自定义。
RabbitMQ 就是按照这些概念来组织的(AMQP协议),而项目的实现也是参照 RabbitMq 的。
2.3. 核心API
消息队列服务器(Broker Server),要提供的核心 API 有下面几个
- 创建队列(queueDeclare)
- 此处不用 Create 这样的术语,原因是 Create 仅仅是创建;而 Declare 起到的效果是,不存在则创建,存在就啥也不做
-
销毁队列(queueDelete)
-
创建交换机(exchangeDeclare)
-
销毁交换机(exchageDelete)
-
创建绑定(queueBind)
-
解除绑定(queueUnbind)
-
发布消息(basicPublish)
-
订阅消息(basicConsume)
-
确认消息(basicAck)
- 这个 API 起到的效果,是可以让消费者显式的告诉 broker server,这个消息我处理完毕了,提高整个系统的可靠性,保证消息处理没有遗漏
RabbitMQ 提供了 肯定(已读已回) 和 否定的 (已读未回)确认,此处我们项目就只有 肯定确认。
在这里的项目中,并没有搞一个 API 叫 “消费消息”,即让消费者通过这个 API 从服务器上取走消息。
对于 MQ 和 消费者之间的工作模式有两种,一是 Push(推),就是 Broker 把收到的数据,主动的发送给订阅的消费者,RabbitMQ 只支持 推 的方式;二是 Pull (拉),就是消费者主动调用 Broker 的 API 取数据,Kafka 就能支持 拉。
2.4. 交换机类型
交换机在转发消息的时候,是有一套转发规则的
提供了几种不同的 交换机类型 (ExchangType)来描述这里不同的转发规则
RabbitMQ 主要实现了四种交换机类型(也是由 AMQP 协议定义的)
- Direct 直接交换机
- Fanout 扇出交换机
- Topic 主题交换机
- Header 消息头交换机
项目中实现了前三种
- Direct 直接交换机
a. 生产者发送消息时,会指定一个“目标队列”的名字(此时的 routingKey 就是 队列的名字)
b. 交换机收到后,就看看绑定的队列里面,有没有匹配的队列
c. 如果有,就转发过去(把消息塞进对应的队列中)
d. 如果没有,消息直接丢弃
- Fanout 扇出交换机
a. 会把消息放到交换机绑定的每个队列
b. 只要和这个交换机绑定任何队列都会转发消息
- Topic 主题交换机,有两个关键概念
a. bindingKey:把队列和交换机绑定的时候,指定一个单词(像是一个暗号一样)
b. routingKey:生产者发送消息的时候,也指定一个单词
如果当前 bindingKey 和 routingKey 对上了,就可以把消息转发到对应的队列
- 上述三种交换机类型,就像 QQ 群发红包
- 专属红包,直接交换机
- 发个10块钱红包,大家都能领 10 块钱红包,扇出交换机
- 我发个口令红包,只有输入对应口令才能领到红包,主题交换机
package com.example.mq.mqserver.core;
public enum ExchangeType {
DIRECT(0),
FANOUT(1),
TOPIC(2);
private final int type;
private ExchangeType(int type) {
this.type = type;
}
public int getType() {
return type;
}
}
2.5. 持久化
上述 虚拟机、交换机、队列、绑定、消息,需要存储起来。此时内存和硬盘各存储一份,内存为主,硬盘为辅。
- 交换机、队列、绑定:这几个部分可能会进行频繁的增删改查,使用数据库管理存储
- 消息:存储在文件中,一是因为消息是不需要进行复杂的增删改查,二是数据库性能也有限
在内存中存储的原因:
对于 MQ 来说,能够高效的转发处理数据,是非常关键的指标! 因此对于使用内存来组织数据,得到的效率,就比放硬盘要高很多。
在硬盘中存储原因:
为了防止内存中数据随着进程重启 / 主机重启而丢失。
2.6. 网络通信
其他的服务器(生产者 / 消费者)通过网络,是要和 Broker Server 进行交互的。
此处设定,使用 TCP + 自定义的应用层协议 实现 生产者 / 消费者 和 BrokerServer 之间的交互工作
应用层协议主要工作:就是让客户端可以通过网络,调用 brokerserver 提供的编程接口
因此,客户端这边也要提供上述 API,只有服务器是真正干实事的;客户端只是发送 / 接受响应
客户端的本地方法调用,实际上就好像调用了一个远端服务器的方法一样 (远程过程调用 RPC,可以视为是编写客户端服务器程序,通信过程的一种设计思想)。
客户端除了提供上述 9 个和服务器方法对应的方法之外,还需要提供 4 个 额外的方法,支撑其他工作
- 创建 Connection
- 关闭 Connection
- 此处用的 TCP 连接,一个 Connection 对象,就代表一个 TCP连接
- 创建 Channel
- 一个 Connection 里面包含多个 Channel,每个 Channel 上传输的数据都是互不相干的
- TCP 中,建立 / 断开一个连接,成本挺高的,因此很多时候不希望频繁建立断开 TCP 连接
- 所以定义一个 Channel ,不用的时候,销毁 Channel,此处 Channel 是逻辑概念,比 TCP 轻量很多
- 关闭 Channel
🍂消息应答模式
- 自动应答,消费者把这个消息取走了,就算应答了
- 手动应答,basicAck 方法属于手动应答(消费者需要主动调用这个 API 来进行应答)
2.7. 总结
需要做哪些工作?
- 需要实现 生产者,消费者,brokerserver 三个部分
- 针对生产者和消费者来说,主要编写的是 客户端和服务器的通信部分,给客户端提供一组 api,让客户端的业务代码来调用,从而通过网络通信的方式远程调用 brokerserver 上的方法
- 比如创建交换机,客户端这边只需要提供相关参数即可,然后通过 socket 将 request 传入到网卡中,然后服务器从 网卡中读取 request 解析。然后计算请求得到 response,再通过 socket 写回去网卡。
- 实现 brokerserver 【重点】
- 持久化
- 上述的这些关键数据,在硬盘中怎么存储,用啥格式存储,存储在数据库还是文件?
- 后续服务器重启了,如何读取这些数据,把内存中内容恢复过来?
上述要做的这些工作,最终的目标,就是实现一个“分布式系统下”这样的生产者消费者模型,但是在
学习环境的 broker server 并不支持分布式部署(集群功能),只是一个单机的 broker server 能够给
多个生产者消费者提供服务,但专业的 mq,RabbitMQ、kafka 这些都是支持集群的(提高可用性、处理更高的并发、数据相互备份)。
🍂模块划分
3. 创建核心类
3.1. Exchange
package com.example.mq.mqserver.core;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.HashMap;
import java.util.Map;
/**
* 表示一个交换机
*/
public class Exchange {
// 交换机的身份标识. (唯一)
private String name;
// 交换机类型, DIRECT, FANOUT, TOPIC
private ExchangeType type = ExchangeType.DIRECT;
// 表示该交换机是否要持久化存储, 为 true 表示需要持久化, 为 false 表示不必持久化.
private boolean durable = false;
// 如果当前交换机, 没人使用了(生产者), 就会自动被删除.
// 暂时未实现自动删除功能.
private boolean autoDelete = false;
// 表示的是创建交换机时指定的一些额外的参数选项. 暂时未实现对应的功能
// 为了把这个 arguments 存到数据库中, 就需要把 Map 转成 json 格式的字符串.
private Map<String, Object> arguments = new HashMap<>();
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public ExchangeType getType() {
return type;
}
public void setType(ExchangeType type) {
this.type = type;
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
}
// 这组自定义 get set 用于和数据库交互使用(Map <=>字符串).
public String getArguments() {
// 把当前的 arguments 参数, 从 Map 转成 String (JSON)
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(arguments);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// 如果代码真异常了, 返回一个空的 json 字符串
return "{}";
}
// 是从数据库读数据之后, 构造 Exchange 对象, 使用该方法
public void setArguments(String argumentsJson) {
// 把参数中的 argumentsJson 按照 JSON 格式解析, 转成 Map 对象
ObjectMapper objectMapper = new ObjectMapper();
try {
this.arguments = objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
// 下面这组针对 arguments 提供 getter setter , 用来去更方便的获取/设置这里的键值对.
// 在 java 代码内部使用 (测试)
public Object getArguments(String key) {
return arguments.get(key);
}
public void setArguments(String key, Object value) {
arguments.put(key, value);
}
public void setArguments(Map<String, Object> arguments) {
this.arguments = arguments;
}
}
3.2. MSGQueue
package com.example.mq.mqserver.core;
import com.example.mq.common.ConsumerEnv;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 表示一个存储消息的队列
*/
public class MSGQueue {
// 表示队列的身份标识.
private String name;
// 表示队列是否持久化, true 表示持久化保存, false 表示不持久化.
private boolean durable = false;
// 这个属性为 true, 表示这个队列只能被一个消费者使用. 如果为 false 则是大家都能使用
// 这个 独占 功能, 暂时未实现.
private boolean exclusive = false;
// 为 true 表示没有人(消费者)使用之后, 就自动删除. false 则是不会自动删除.
// 这个 自动删除 功能, , 暂时未实现.
private boolean autoDelete = false;
// 表示扩展参数. 暂时未实现
private Map<String, Object> arguments = new HashMap<>();
// 订阅当前队列的消费者
private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
// 记录当前取到了第几个消费者. 方便实现轮询策略.
private AtomicInteger consumerSeq = new AtomicInteger(0);
// 添加一个新的订阅者
public void addConsumerEnv(ConsumerEnv consumerEnv) {
consumerEnvList.add(consumerEnv);
}
// 订阅者的删除暂时先不考虑.
// 选出一个订阅者, 用来处理当前的消息. (按照轮询的方式)
public ConsumerEnv chooseConsumer() {
if (consumerEnvList.size() == 0) {
// 该队列没有人订阅的
return null;
}
// 计算当前要取的元素的下标.
int index = consumerSeq.get() % consumerEnvList.size();
consumerSeq.getAndIncrement();
return consumerEnvList.get(index);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public boolean isExclusive() {
return exclusive;
}
public void setExclusive(boolean exclusive) {
this.exclusive = exclusive;
}
public boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
}
public String getArguments() {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(arguments);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return "{}";
}
public void setArguments(String argumentsJson) {
ObjectMapper objectMapper = new ObjectMapper();
try {
this.arguments = objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
public Object getArguments(String key) {
return arguments.get(key);
}
public void setArguments(String key, Object value) {
arguments.put(key, value);
}
public void setArguments(Map<String, Object> arguments) {
this.arguments = arguments;
}
}
3.3. Binding
package com.example.mq.mqserver.core;
/**
* 表示队列和交换机之间的存储关系
*/
public class Binding {
private String exchangeName;
private String queueName;
// 主题交换机, 和发的消息 "对暗号"
private String bindingKey;
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getBindingKey() {
return bindingKey;
}
public void setBindingKey(String bindingKey) {
this.bindingKey = bindingKey;
}
}
3.4. Message
package com.example.mq.mqserver.core;
import java.io.Serializable;
import java.util.Arrays;
import java.util.UUID;
/**
* 表示一个要传递的消息
* Message 对象, 需要能够在网络上传输, 能写入到文件中.
* 需要针对 Message 进行序列化和反序列化.
* 这里使用 标准库 自带的 序列化/反序列化 操作, 不能使用json(文本格式, 放的是文本类型的数据)
* 而这里的 Message 存储的是二进制的数据
*/
public class Message implements Serializable {
// 属性部分
private BasicProperties basicProperties = new BasicProperties();
// 正文部分, 支持二进制数据
private byte[] body;
// 上面两个属性是 Messages 最核心的部分
// 下面的两个属性是辅助用的属性
// Message 后续会存储到文件中(如果持久化的话).
// 一个文件中会存储很多的消息. 如何找到某个消息
// 使用下列的两个偏移量来表示消息在文件中的具体位置呢. [offsetBeg, offsetEnd)
// 这俩属性存在的目的, 主要就是为了让内存中的 Message 对象, 能够快速找到对应的硬盘上的 Message 的位置.
// 不需要进行序列化传输, 在服务器端存储到文件后 Message 的位置已经固定了
private transient long offsetBeg = 0; // 消息数据的开头距离文件开头的位置偏移(字节)
private transient long offsetEnd = 0; // 消息数据的结尾距离文件开头的位置偏移(字节)
// 使用这个属性表示该消息在文件中是否是有效消息. (逻辑删除的效果)
// 0x1 表示有效. 0x0 表示无效.
private byte isValid = 0x1;
// 创建一个工厂方法, 封装一下创建 Message 对象的过程.
// 创建的 Message 对象, 会自动生成唯一的 MessageId
// 万一 routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主.
public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {
Message message = new Message();
if (basicProperties != null) {
message.setBasicProperties(basicProperties);
}
// 此处生成的 MessageId 以 M- 作为前缀.
message.setMessageId("M-" + UUID.randomUUID());
message.setRoutingKey(routingKey);
message.body = body;
// 这里的创建只是把 body 和 basicProperties (核心内容)先设置出来.
// offsetBeg, offsetEnd, isValid, 是消息持久化的时候才会用到. 在把消息写入文件之前才会进行设定.
// 这里只是在内存中创建一个 Message 对象.
return message;
}
// 获得消息的 id
public String getMessageId() {
return basicProperties.getMessageId();
}
// 设置消息 id
public void setMessageId(String messageId) {
basicProperties.setMessageId(messageId);
}
// 获得 "暗号"
public String getRoutingKey() {
return basicProperties.getRoutingKey();
}
// 设置 "暗号"
public void setRoutingKey(String routingKey) {
basicProperties.setRoutingKey(routingKey);
}
// 获得持久化状态
public int getDeliverMode() {
return basicProperties.getDeliverMode();
}
// 设置是否持久化
public void setDeliverMode(int mode) {
basicProperties.setDeliverMode(mode);
}
public BasicProperties getBasicProperties() {
return basicProperties;
}
public void setBasicProperties(BasicProperties basicProperties) {
this.basicProperties = basicProperties;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
public long getOffsetBeg() {
return offsetBeg;
}
public void setOffsetBeg(long offsetBeg) {
this.offsetBeg = offsetBeg;
}
public long getOffsetEnd() {
return offsetEnd;
}
public void setOffsetEnd(long offsetEnd) {
this.offsetEnd = offsetEnd;
}
public byte getIsValid() {
return isValid;
}
public void setIsValid(byte isValid) {
this.isValid = isValid;
}
@Override
public String toString() {
return "Message{" +
"basicProperties=" + basicProperties +
", body=" + Arrays.toString(body) +
", offsetBeg=" + offsetBeg +
", offsetEnd=" + offsetEnd +
", isValid=" + isValid +
'}';
}
}
package com.example.mq.mqserver.core;
import java.io.Serializable;
public class BasicProperties implements Serializable {
// 消息的唯一身份标识. 为了保证 id 的唯一性, 使用 UUID 来作为 message id
private String messageId;
// 该属性是一个消息上带有的内容, 和 bindingKey 做匹配.
// 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名.
// 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用).
// 如果当前的交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey 做匹配. "对暗号" 符合要求的才能转发给对应队列.
private String routingKey;
// 这个属性表示消息是否要持久化. 1 表示不持久化, 2 表示持久化.
private int deliverMode = 1;
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public int getDeliverMode() {
return deliverMode;
}
public void setDeliverMode(int deliverMode) {
this.deliverMode = deliverMode;
}
@Override
public String toString() {
return "BasicProperties{" +
"messageId='" + messageId + '\'' +
", routingKey='" + routingKey + '\'' +
", deliverMode=" + deliverMode +
'}';
}
}
3.5. 数据库操作
3.5.1. 建表操作
此处考虑使用更轻量的数据库 SQLite,因为一个完整的 SQLite 数据库,只有一个单独的可执行文件(不到 1 M),它是一个本地的数据库,相当于是直接操作本地的硬盘文件。
- 直接在 pom.xml 文件中引入
<!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.41.0.1</version>
</dependency>
- 然后在 application.yml 配置文件中进行配置
spring:
datasource:
url: jdbc:sqlite:./data/meta.db
username:
password:
driver-class-name: org.sqlite.JDBC
上述依赖和配置都弄完后,当程序启动时,会自动建立数据库。所以我们只需要建表就行。
此处我们根据之前的需求分析,建立三张表,此处我们通过 代码形式来建造三张表
- 配置 application.yml
mybatis:
mapper-locations: classpath:mapper/**Mapper.xml
- 创建一个对应的 interface
- 创建 mapper目录和文件 MetaMapper.xml
3.5.2. 交换机操作
- 在接口先写方法
void insertExchange(Exchange exchange);
List<Exchange> selectAllExchanges();
void deleteExchange(String exchangeName);
- 在 xml 中写
<insert id="insertExchange" parameterType="com.example.mq.mqserver.core.Exchange">
insert into exchange values(#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});
</insert>
<select id="selectAllExchanges" resultType="com.example.mq.mqserver.core.Exchange">
select * from exchange;
</select>
<delete id="deleteExchange" parameterType="java.lang.String">
delete from exchange where name = #{exchangeName};
</delete>
3.5.3. 队列操作
- 在接口先写方法
void insertQueue(MSGQueue queue);
List<MSGQueue> selectAllQueues();
void deleteQueue(String queueName);
- 在 xml 中写
<insert id="insertQueue" parameterType="com.example.mq.mqserver.core.MSGQueue">
insert into queue values(#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});
</insert>
<select id="selectAllQueues" resultType="com.example.mq.mqserver.core.MSGQueue">
select * from queue;
</select>
<delete id="deleteQueue" parameterType="java.lang.String">
delete from queue where name = #{queueName};
</delete>
3.5.4. 绑定操作
- 在接口先写方法
void insertBinding(Binding binding);
List<Binding> selectAllBindings();
void deleteBinding(Binding binding);
- 在 xml 中写
<insert id="insertBinding" parameterType="com.example.mq.mqserver.core.Binding">
insert into binding values(#{exchangeName}, #{queueName}, #{bindingKey});
</insert>
<select id="selectAllBindings" resultType="com.example.mq.mqserver.core.Binding">
select * from binding;
</select>
<delete id="deleteBinding" parameterType="com.example.mq.mqserver.core.Binding">
delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};
</delete>
4. 一个统一的类进行数据库操作
在服务器(BrokerServer)启动的时候,能够做出以下逻辑判定:
- 如果数据库存在,表也都有了,不做任何操作
- 如果数据库不存在,则创建库,创建表,构造默认数据
构造一个类 DataBaseManager
package com.example.mq.mqserver.datacenter;
import com.example.mq.MqApplication;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.Exchange;
import com.example.mq.mqserver.core.ExchangeType;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.mapper.MetaMapper;
import java.io.File;
import java.util.List;
/**
* 整合数据库操作
*/
public class DataBaseManager {
private MetaMapper metaMapper;
// 针对数据库进行初始化
public void init() {
// 手动的获取到 MetaMapper
metaMapper = MqApplication.context.getBean(MetaMapper.class);
if (!checkDBExists()) {
// 数据库不存在, 就进行建建库表操作
// 先创建一个 data 目录
File dataDir = new File("./data");
dataDir.mkdirs();
// 创建数据表
createTable();
// 插入默认数据
createDefaultData();
System.out.println("[DataBaseManager] 数据库初始化完成!");
} else {
// 数据库已经存在了, 啥都不必做即可
System.out.println("[DataBaseManager] 数据库已经存在!");
}
}
private boolean checkDBExists() {
File file = new File("./data/meta.db");
if (file.exists()) {
return true;
}
return false;
}
// 建表操作 (不需要手动创建数据库, 即 meta.db 文件)
// 在首次执行数据库操作时, MyBatis 会自动创建数据库(meta.db)
private void createTable() {
metaMapper.createExchangeTable();
metaMapper.createQueueTable();
metaMapper.createBindingTable();
System.out.println("[DataBaseManager] 创建表完成!");
}
// 给数据库表中, 添加默认的数据.
// 先添加一个默认的交换机.
// RabbitMQ 中的设定: 默认带有一个 匿名 的交换机, 类型是 DIRECT(直接交换机).
private void createDefaultData() {
// 构造一个默认的交换机.
Exchange exchange = new Exchange();
exchange.setName(""); // 匿名
exchange.setType(ExchangeType.DIRECT);
exchange.setDurable(true);
exchange.setAutoDelete(false);
metaMapper.insertExchange(exchange);
System.out.println("[DataBaseManager] 创建初始数据完成!");
}
// 删除数据库文件
public void deleteDB() {
File file = new File("./data/meta.db");
boolean ret = file.delete();
if (ret) {
System.out.println("[DataBaseManager] 删除数据库文件成功!");
} else {
System.out.println("[DataBaseManager] 删除数据库文件失败!");
}
// 删了文件之后在山目录
File dataDir = new File("./data");
// 使用 delete 删除目录的时候, 需要保证目录是空的.
ret = dataDir.delete();
if (ret) {
System.out.println("[DataBaseManager] 删除数据库目录成功!");
} else {
System.out.println("[DataBaseManager] 删除数据库目录失败!");
}
}
public void insertExchange(Exchange exchange) {
metaMapper.insertExchange(exchange);
}
public List<Exchange> selectAllExchanges() {
return metaMapper.selectAllExchanges();
}
public void deleteExchange(String exchangeName) {
metaMapper.deleteExchange(exchangeName);
}
public void insertQueue(MSGQueue queue) {
metaMapper.insertQueue(queue);
}
public List<MSGQueue> selectAllQueues() {
return metaMapper.selectAllQueues();
}
public void deleteQueue(String queueName) {
metaMapper.deleteQueue(queueName);
}
public void insertBinding(Binding binding) {
metaMapper.insertBinding(binding);
}
public List<Binding> selectAllBindings() {
return metaMapper.selectAllBindings();
}
public void deleteBinding(Binding binding) {
metaMapper.deleteBinding(binding);
}
}
5. 消息管理
MessageFileManager 类负责管理将消息存储到文件(硬盘)中的相关操作
package com.example.mq.mqserver.datacenter;
/**
* 整合管理将消息存储到文件(硬盘)中的相关操作
*/
public class MessageFileManager {
}
自定义一个和项目强相关的异常
package com.example.mq.common;
/*
* 自定义一个异常类. 如果是 mq 的业务逻辑中, 出现的异常, 就抛出这个异常对象, 同时在构造方法中指定出现异常的原因信息
*/
public class MqException extends Exception {
public MqException(String reason) {
super(reason);
}
}
5.1. 消息持久化
Message,为什么及如何在硬盘上存储?
- 消息操作并不涉及到复杂的增删改查
- 消息数量可能会非常多,数据库的访问效率并不高
🍂所以要把消息直接存储在文件中,以下设定消息具体如何在文件中存储
消息是依托于队列的,因此存储的时候,就要把 消息 按照 队列 维度展开
此时已经有了一个 data
目录(meta.db
就在这个目录中)
在 data
中创建一些子目录,每个队列对应一个子目录,子目录名就是队列名
每个队列的子目录下,再分配两个文件,来存储消息
queue_data.txt
:这个文件保存消息的内容,里面,存储的是二进制的数据,我们约定转发到这个队列的所有消息都是以二进制的方式进行存储,每个消息由以下几部分构成
首先规定前 4
个字节代表的该消息的长度,后面紧跟着的是消息本体。
对于 BrokerServer 来说,消息是需要新增和删除的。
生产者生产一个消息,就是新增一个消息
消费者消费一个消息,就是删除一个消息
对于内存中的消息新增删除就比较容易了:使用一些集合类就行
对于文件中新增:
- 采用追加方式,直接在当前文件末尾追加新增就行
对于文件中删除:
- 如果采用真正的删除,效率就会非常低。将文件视为顺序表结构,删除就会涉及到一系列的元素搬运。
- 所以我们采用逻辑删除的方式。根据消息中的一个变量 isValid 判断该消息是否有效,1 为有效消息;0 为无效消息。
🍂那么如何找到每个消息对应在文件中的位置呢?
我们之前在 Message 中设置了两个变量,一个是 offsetBeg,一个是 offsetEnd。
我们存储消息的时候,是同时在内存中存一份和硬盘中存一份。而内存中存到那一份消息,记录了当前的消息的 offsetBeg 和 offsetEnd。通过先找到内存中的消息,再根据该消息的两个变量值,就能找到硬盘中的消息数据了。
5.2. 垃圾回收
随着时间的推移,文件中存放的消息可能会越来越多。并且可能很多消息都是无用的,所以就要针对当前消息数据文件进行垃圾回收。
此处我们采用的复制算法,原理也是比较容易理解的 (复制算法:比较适用的前提是,当前的空间,有效数据不多,大多数都是无效的数据)
直接遍历原有的消息数据文件,把所有的有效数据数据重新拷贝一份到新的文件中,新文件名字和原来文件名字相同,再把旧的文件直接删除掉。
那么垃圾回收的算法有了,何时触发垃圾回收?
此处就要用到我们每个队列目录中,所对应的另一个文件 queue_stat.txt
了,使用这个文件来保存消息的统计信息
- 只存一行数据,用
\t
分割, 左边是 queue_data.txt 中消息的总数目,右边是 queue_data.txt中有效的消息数目。 形如 2000\t1500, 代表该队列总共有 2000 条消息,其中有效消息为 1500 条。 - 所以此处我们就约定,当消息总数超过 2000 条(为了避免 GC 太频繁,比如一共 4 个消息,其中 2 个消息无效了),并且有效消息数目低于总消息数的 50 %,就触发一次垃圾回收 GC
如果当一个文件消息数目非常的多,而且都是有效信息,此时会导致整个消息的数据文件非常庞大,后续针对这个文件操作就会非常耗时。假设当前文件已经达到 10 个 G 了,那么此时如果触发一次 GC,整个耗时就会非常高。
对于 RabbitMQ 来说,解决方案是把一个大的文件,拆成若干个小的文件
文件拆分:当某个文件长度达到一定的阈值的时候,就会拆分成两个文件(拆着拆着就成了很多文件)
文件合并:每个单独的文件都会进行GC,如果GC之后,发现文件变小了,就会和相邻的其他文件合并
这样做,可以保证在消息特别多的时候,也能保证性能上的及时响应
实现思路:
- 用一个专门的数据结构,来存储当前队列中有多少个数据文件,每个文件大小是多少,消息的数目是多少,无效消息是多少
- 设计策略:什么时候触发文件拆分,什么时候触发文件合并
5.3. 统计文件的读写
需要定义一个内部类,表用来示该队列的统计消息,此处优先考虑使用 static 静态内部类
// 定义一个内部类, 来表示该队列的统计信息
static public class Stat {
public int totalCount; // 总消息数量
public int validCount; // 有效消息数量
}
- 设定消息文件所在的目录和文件名
// 设定消息文件所在的目录和文件名
// 该方法用来获取到指定队列对应的消息文件所在路径
private String getQueueDir(String queueName) {
return "./data/" + queueName;
}
// 该方法用来获取该队列的消息数据文件路径
private String getQueueDataPath(String queueName) {
return getQueueDir(queueName) + "/queue_data.txt";
}
// 该方法用来获取该队列的消息统计文件路径
private String getQueueStatPath(String queueName) {
return getQueueDir(queueName) + "/queue_stat.txt";
}
- 统计文件的读
// 该方法用来读取队列的消息统计文件中的内容
private Stat readStat(String queueName) {
Stat stat = new Stat();
try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {
Scanner scanner = new Scanner(inputStream);
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
return stat;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
- 统计文件的写
// 该方法用来写入队列的消息统计文件中的内容
private void writeStat(String queueName, Stat stat) {
// 使用 PrintWrite 来写文件.
// OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.
try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write(stat.totalCount + "\t" + stat.validCount);
printWriter.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
5.4. 创建消息目录和文件
- 先创建队列对应的目录(以队列名字为名的目录)
- 创建队列里面的消息数据文件
- 创建队列里面的消息统计数据文件
- 给消息统计文件设置初始值
// 创建队列对应的文件和目录
public void createQueueFiles(String queueName) throws IOException {
// 1. 先创建队列对应的消息目录
File baseDir = new File(getQueueDir(queueName));
if (!baseDir.exists()) {
// 不存在, 就创建这个目录
boolean ok = baseDir.mkdirs();
if (!ok) {
throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath());
}
}
// 2. 创建队列数据文件
File queueDataFile = new File(getQueueDataPath(queueName));
if (!queueDataFile.exists()) {
boolean ok = queueDataFile.createNewFile();
if (!ok) {
throw new IOException("创建文件失败! queueDataFile=" + queueDataFile.getAbsolutePath());
}
}
// 3. 创建消息统计文件
File queueStatFile = new File(getQueueStatPath(queueName));
if (!queueStatFile.exists()) {
boolean ok = queueStatFile.createNewFile();
if (!ok) {
throw new IOException("创建文件失败! queueStatFile=" + queueStatFile.getAbsolutePath());
}
}
// 4. 给消息统计文件, 设定初始值. 0\t0
Stat stat = new Stat();
stat.totalCount = 0;
stat.validCount = 0;
writeStat(queueName, stat);
}
5.5. 删除消息目录和文件
- 先删除消息的统计文件和消息数据文件
- 再删除队列目录
// 删除队列的目录和文件.
// 队列是可以被删除的. 当队列删除之后, 对应的消息文件也要随之删除.
public void destroyQueueFiles(String queueName) throws IOException {
// 先删除里面的文件, 再删除目录(先为空再再删除).
File queueDataFile = new File(getQueueDataPath(queueName));
boolean ok1 = queueDataFile.delete();
File queueStatFile = new File(getQueueStatPath(queueName));
boolean ok2 = queueStatFile.delete();
File baseDir = new File(getQueueDir(queueName));
boolean ok3 = baseDir.delete();
if (!ok1 || !ok2 || !ok3) {
// 有任意一个删除失败, 都算整体删除失败.
throw new IOException("删除队列目录和文件失败! baseDir=" + baseDir.getAbsolutePath());
}
}
5.6. 消息序列化
把一个对象(结构化数据)转换成一个 字符串 / 字节数组
序列化之后方便 存储和传输
- 存储:一般存储在文件中,文件只能存字符串 / 二进制数据。不能直接存对象
- 传输:在网络中传输,socket
此处不使用 json 进行序列化, 因为 Message 里面存储是二进制数据。
而 json 序列化得到的结果是文本数据,里面无法存储二进制的 body
直接进行二进制序列化,有很多解决方案
- Java 标准库提供了序列化方案,ObjectInputStream 和 ObjectOutputStream
- Hessian 也是一个解决方案
- protobuffer
- thrift
此处使用第一种 Java 标准库自带的
序列化和反序列化代码如下:
package com.example.mq.common;
import java.io.*;
/**
* 序列化/反序列化工具
* 对象需要使用该类序列化, 要实现 Serializable 接口.
*/
public class BinaryTool {
// 把一个对象序列化成一个字节数组
public static byte[] toBytes(Object object) throws IOException {
// ByteArrayOutputStream 是一个流对象, 它可以将数据写入到一个字节数组(变长)中.
// 就是把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
// 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到 ObjectOutputStream 中.
// 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream, 最终结果就写入到 ByteArrayOutputStream 里了
objectOutputStream.writeObject(object);
}
// 这里就把 byteArrayOutputStream 中持有的二进制数据取出来, 转成 byte[]
return byteArrayOutputStream.toByteArray();
}
}
// 把一个字节数组, 反序列化成一个对象
public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
Object object = null;
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
// 此处就是从 data 这个 byte[] 中读取数据并进行反序列化.
object = objectInputStream.readObject();
}
}
return object;
}
}
5.7. 把消息写入到文件
这个要将消息存入到该队列对应的文件中。
需要注意的是:此处 写入消息 需要两个参数,一个是 队列 MSGQueue,一个是消息 Message
- 先判断当前写入队列的文件在不在
- 把 Message 对象进行序列化,转换成二进制的字节数组
- 进行写入操作的操作时候要进行加锁(锁对象就是当前 MSGQueue),此处如果不加锁,当多个客户端进行发送消息的时候,可能会造成数据不对。
- 先获取当前队列消息数据文件的长度,用这个长度来计算 offsetBeg 和 offsetEnd
a. 设置该消息 offsetBeg = 当前文件长度 + 4
b. 设置该消息 offsetEnd = 当前文件长度 + 4 + 当前二进制数组长度 - 把新的 message 数据,写入到文件的末尾处,采用追加方式
a. 先写入 4 个字节的消息长度
b. 再写入消息本体 - 更新统计文件,并重新写入
// 检查队列的目录和文件是否存在.
// 有生产者给 broker server 生产消息了, 这个消息就可能需要记录到文件上(取决于消息是否要持久化)
public boolean checkFilesExits(String queueName) {
// 判定队列的数据文件和统计文件是否都存在
File queueDataFile = new File(getQueueDataPath(queueName));
if (!queueDataFile.exists()) {
return false;
}
File queueStatFile = new File(getQueueStatPath(queueName));
if (!queueStatFile.exists()) {
return false;
}
return true;
}
// 这个方法用来把一个新的消息, 放到队列对应的文件中.
// queue 表示要把消息写入的队列. message 则是要写的消息.
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
// 1. 检查一下当前要写入的队列对应的文件是否存在.
if (!checkFilesExits(queue.getName())) {
throw new MqException("[MessageFileManager] 队列对应的文件不存在! queueName=" + queue.getName());
}
// 2. 把 Message 对象, 进行序列化, 转成二进制的字节数组.
byte[] messageBinary = BinaryTool.toBytes(message);
synchronized (queue) {
// 3. 先获取到当前的队列数据文件的长度, 用这个来计算出该 Message 对象的 offsetBeg 和 offsetEnd
// 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4
// offsetEnd 就是当前文件长度 + 4 + message 自身长度.
File queueDataFile = new File(getQueueDataPath(queue.getName()));
// 通过方法 queueDataFile.length() 就能获取到文件的长度. 单位字节.
message.setOffsetBeg(queueDataFile.length() + 4);
message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);
// 4. 追加写入消息到数据文件末尾, 设置参数为 true.
try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
// 首先先写当前消息的长度, 占据 4 个字节( int 类型)的
dataOutputStream.writeInt(messageBinary.length);
// 写入消息本体
dataOutputStream.write(messageBinary);
}
}
// 5. 更新消息统计文件
Stat stat = readStat(queue.getName());
stat.totalCount += 1;
stat.validCount += 1;
writeStat(queue.getName(), stat);
}
}
此处在进行写文件时需要进行加锁操作(针对当前操作的 queue 对象加锁),防止可能出现 写入文件的实际位置 和 计算出来的 offset 不符的情况。
5.8. 从文件中删除消息(逻辑删除)
- 先从硬盘中读取出来
a. 此处采用 RamdomAccessFile 来读取(可以在文中指定位置,进行读写,随机访问)
b. 先定义一个 以消息长度为 length【offsetEnd - offsetBeg】的一个字节数组 bufferSrc
c. 再根据要删除的 Message 对象中的 offsetBeg 和 offsetEnd 将光标定位那个位置
d. 然后将结果读取到 bufferSrc 中 - 然后将读到的 bufferSrc 数据反序列化成 Message 对象,修改变量 isValid=0x0
- 再将 Message对象 序列化成 bufferDes
a. 重新定位光标到消息的 offserBeg
b. 将 bufferDes 写回去 - 更新统计文件信息,写入
// 这个是删除消息的方法.
// 这里的删除是逻辑删除, 是把硬盘上存储的这个数据里面的那个 isValid 属性, 设置成 0
// 1. 先把文件中的这一段数据, 读出来, 还原回 Message 对象;
// 2. 把 isValid 改成 0;
// 3. 把上述数据重新写回到文件.
// 此处这个参数中的 message 对象, 必须得包含有效的 offsetBeg 和 offsetEnd
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {
synchronized (queue) {
// 使用 RandomAccessFile 完成随机访问(指定位置读取)
try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {
// 1. 先从文件中读取对应的 Message 数据.
byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
// seek 可以调整当前的文件光标(调整位置)
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.read(bufferSrc);
// 2. 把当前读出来的二进制数据, 转换回成 Message 对象
Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
// 3. 把 isValid 设置为无效.
diskMessage.setIsValid((byte) 0x0);
// 此处不需要给参数的这个 message 的 isValid 设为 0, 因为这个参数代表的是内存中管理的 Message 对象
// 而这个对象马上也要被从内存中销毁了.
// 4. 重新写入文件
byte[] bufferDest = BinaryTool.toBytes(diskMessage);
// 此时需要重新seek, 上面 seek 完了之后, 进行了读操作, 文件光标就会往后移动, 移动到了下一个消息的位置了.
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.write(bufferDest);
}
// 有效消息个数 - 1, 文件内容页需要更新
Stat stat = readStat(queue.getName());
if (stat.validCount > 0) {
stat.validCount -= 1;
}
writeStat(queue.getName(), stat);
}
}
5.9. 从硬盘中恢复数据到内存
使用这个方法将硬盘中所有的有效数据加载到内中(具体来说是一个链表中)这个方法是在程序启动的时候调用。
这里使用 LinkedList 来存储消息,方便后续进行头删操作
一个文件中会包含多个消息,需要循环去读取,此处需要手动记录光标位置
- 先读取4个字节,表示当前消息长度
- 然后根据当前消息长度,读取对应的长度到 buffer 字节数组中
- 把读取到 buffer 字节数据 反序列化成 Message 对象
- 判断这个 Message 对象里面的 isValid 是否为 0x1
- 如果不是,就 continue,是的话执行第六步,不是就从第一步开始
- 加入消息之前先设置 offsetBeg,offserEnd,然后将消息加入到 LinkedList 中
- 如果读到末尾会有异常 EOF,会自动结束
// 这个是方法从文件中, 读取出所有的消息内容, 加载到内存中(放到一个链表里)
// 这个方法在程序启动的时候, 进行调用.
// 使用 LinkedList 是为了后续方便进行头删操作.
// 该方法是在程序启动时调用, 此时服务器还不能处理请求, 不涉及多线程操作文件.
// 参数设置 queueName 即可.
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
LinkedList<Message> messages = new LinkedList<>();
try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {
try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {
// 这个变量记录当前文件光标.
long currentOffset = 0;
// 循环读取文件中的消息.
while (true) {
// 1. 读取当前消息的长度, readInt 最终会读到文件的末尾(EOF), 会抛出 EOFException 异常.
int messageSize = dataInputStream.readInt();
// 2. 按照这个长度, 读取消息内容
byte[] buffer = new byte[messageSize];
// 记录读取的实际消息长度
int actualSize = dataInputStream.read(buffer);
if (messageSize != actualSize) {
// 如果长度不匹配, 说明文件有问题, 格式错乱了
throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName);
}
// 3. 把这个读到的二进制数据, 反序列化回 Message 对象
Message message = (Message) BinaryTool.fromBytes(buffer);
// 4. 判定一下看看这个消息对象, 是不是无效对象.
if (message.getIsValid() != 0x1) {
// 无效数据, 直接跳过.
// 但要更新 offset, 以方便后续有效消息 offsetBeg 和 offsetEnd 的设置和计算.
currentOffset += (4 + messageSize);
continue;
}
// 5. 有效数据, 则需要把这个 Message 对象加入到链表中. 加入之前还需要填写 offsetBeg 和 offsetEnd
// 进行计算 offset 的时候, 需要知道当前文件光标的位置的. 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置
// 因此就需要手动计算下文件光标.
message.setOffsetBeg(currentOffset + 4);
message.setOffsetEnd(currentOffset + 4 + messageSize);
// 更新当前光标位置
currentOffset += (4 + messageSize);
// 将消息添加到list
messages.add(message);
}
} catch (EOFException e) {
// 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.
System.out.println("[MessageFileManager] 恢复 Message 数据完成!");
}
}
return messages;
}
5.10. 消息文件垃圾回收
由于当前会不停的往消息文件中写入消息,并且删除消息只是逻辑删除,这就可能导致消息文件越来越大,并且包含大量无用的消息。
此处使用的是复制算法。
- 判定当前文件中消息总数超过2000,并且有效消息数不足50%,就会触发垃圾回收
- 就把所有的有效消息提取出来,单独的在写到一个文件中
- 删除旧文件,使用新文件代替
- 注意:还要更新统计文件信息
// 检查当前是否要针对该队列的消息数据文件进行 GC
public boolean checkGC(String queueName) {
// 判定是否要 GC, 是根据总消息数和有效消息数. 这两个值在 消息统计文件 中.
Stat stat = readStat(queueName);
if (stat.totalCount > 2000 && (double)stat.validCount / (double)stat.totalCount < 0.5) {
return true;
}
return false;
}
// 约定 GC 后新文件的路径 (GC完成后会将这个新文件名再修改回原来的文件名)
private String getQueueDataNewPath(String queueName) {
return getQueueDir(queueName) + "/queue_data_new.txt";
}
// 这个方法执行消息数据文件的垃圾回收操作.
// 使用复制算法来完成.
// 创建一个新的文件, 名字就是 queue_data_new.txt
// 把之前消息数据文件中的有效消息都读出来, 写到新的文件中.
// 删除旧的文件, 再把新的文件改名回 queue_data.txt
// 同时要更新消息统计文件中的信息.
public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
// 进行 gc 的过程中, 其他线程不能针对该队列的消息文件做任何修改.
synchronized (queue) {
// 统计一下执行消耗的时间(GC操作可能是比较耗时的).
// GC 开始时候的时间戳
long gcBeg = System.currentTimeMillis();
// 1. 创建一个新的文件
File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
if (queueDataNewFile.exists()) {
// 文件不存在的情况下, 创建文件
// 正常情况下, 这个文件不应该存在. 如果存在, 就是意外(可能上次 gc 了一半, 程序意外崩溃了).
throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName=" + queue.getName());
}
boolean ok = queueDataNewFile.createNewFile();
if (!ok) {
throw new MqException("[MessageFileManager] 创建文件失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());
}
// 2. 从旧的文件中, 读取出所有的有效消息对象.
LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
// 3. 把有效消息, 写入到新的文件中.
try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
for (Message message : messages) {
byte[] buffer = BinaryTool.toBytes(message);
// 先写四个字节消息的长度
dataOutputStream.writeInt(buffer.length);
dataOutputStream.write(buffer);
}
}
}
// 4. 删除旧的数据文件, 并且把新的文件进行重命名
File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
ok = queueDataOldFile.delete();
if (!ok) {
throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
}
// 把 queue_data_new.txt 重命名为 queue_data.txt
ok = queueDataNewFile.renameTo(queueDataOldFile);
if (!ok) {
throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()
+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
}
// 5. 更新统计文件
Stat stat = readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();
writeStat(queue.getName(), stat);
// GC 结束时的时间戳
long gcEnd = System.currentTimeMillis();
// 求差值
System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName() + ", time="
+ (gcEnd - gcBeg) + "ms");
}
}
5.11. 总结
MessageFileManager 主要负责管理消息在文件中的存储
- 设计目录结构和文件格式
- 实现了目录创建和删除
- 实现统计文件的读写
- 实现了消息的写入(按照之前的文件格式)
- 实现了消息的删除 (随机访问文件)
- 实现了所有消息的加载
- 垃圾回收(复制算法)
6. 统一硬盘存储管理
上述我们存储在硬盘中的数据,分为了两个,一个是存放数据库中,一个是存放在文件中。
我们需要统一封装一个类对上面硬盘数据进行管理
package com.example.mq.mqserver.datacenter;
import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.Exchange;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.core.Message;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/*
* 使用这个类来管理所有硬盘上的数据.
* 1. 数据库: 交换机, 绑定, 队列
* 2. 数据文件: 消息
* 上层逻辑如果需要操作硬盘, 统一都通过这个类来使用.
*/
public class DiskDataCenter {
// 这个实例用来管理数据库中的数据
private DataBaseManager dataBaseManager = new DataBaseManager();
// 这个实例用来管理数据文件中的数据
private MessageFileManager messageFileManager = new MessageFileManager();
public void init() {
// 针对上述两个实例进行初始化.
dataBaseManager.init();
// 当前 messageFileManager.init 是空的方法, 后续需要扩展, 就在这里初始化.
messageFileManager.init();
}
// 封装交换机操作
public void insertExchange(Exchange exchange) {
dataBaseManager.insertExchange(exchange);
}
public void deleteExchange(String exchangeName) {
dataBaseManager.deleteExchange(exchangeName);
}
public List<Exchange> selectAllExchanges() {
return dataBaseManager.selectAllExchanges();
}
// 封装队列操作
public void insertQueue(MSGQueue queue) throws IOException {
dataBaseManager.insertQueue(queue);
// 创建队列的同时, 不仅要把队列对象写到数据库中, 还需要创建出对应的目录和文件
messageFileManager.createQueueFiles(queue.getName());
}
public void deleteQueue(String queueName) throws IOException {
dataBaseManager.deleteQueue(queueName);
// 删除队列的同时, 不仅要把队列从数据库中删除, 还需要删除对应的目录和文件
messageFileManager.destroyQueueFiles(queueName);
}
public List<MSGQueue> selectAllQueues() {
return dataBaseManager.selectAllQueues();
}
// 封装绑定操作
public void insertBinding(Binding binding) {
dataBaseManager.insertBinding(binding);
}
public void deleteBinding(Binding binding) {
dataBaseManager.deleteBinding(binding);
}
public List<Binding> selectAllBindings() {
return dataBaseManager.selectAllBindings();
}
// 封装消息操作
public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {
messageFileManager.sendMessage(queue, message);
}
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {
messageFileManager.deleteMessage(queue, message);
// 删除后要考虑是否要 GC
if (messageFileManager.checkGC(queue.getName())) {
messageFileManager.gc(queue);
}
}
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
return messageFileManager.loadAllMessageFromQueue(queueName);
}
}
7. 内存数据管理
package com.example.mq.mqserver.datacenter;
/*
* 使用这个类来统一管理内存中的所有数据.
* 该类提供的一些方法, 可能会在多线程环境下被使用. 要注意线程安全问题.
*/
public class MemoryDataCenter {
// ...
}
7.1. 设计数据结构
使用内存管理上面涉及到的数据,对于 MQ 来说,内存存储数据为主;硬盘存储数据为辅(主要是为了持久化,重启之后,数据不丢失)
交换机:Key:交换机名字;Value:交换机
// 存储交换机, key 是 exchangeName, value 是 Exchange 对象
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
队列: Key:队列名称; Value:队列
// 存储队列, key 是 queueName, value 是 MSGQueue 对象
private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
绑定:Key:交换机名字 ;Value.key:队列名字;Value.value:绑定关系
// 存储绑定关系, 第一个 key 是 exchangeName, 第二个 key 是 queueName
private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
消息:Key:MessageId;Value:Message对象
// 存储消息, key 是 messageId, value 是 Message 对象
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
表示队列和消息之间的关联:Key:队列名字;Value: 是一个存储消息的链表
// 存储队列和消息之间的从属关系, key 是 queueName, value 是一个 Message 的链表
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
表示未被确认的消息:存储了哪些消息被消费者取走,但是还没应答
key:队列名称;Value.key:MessageId;Value.value:Message 对象
// 存储队列中未被确认过的消息, 第一个 key 是 queueName, 第二个 key 是 messageId
private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
此处咱们实现的 MQ,支持两种应答模式(ACK)
- 自动应答:消费者取走元素,这个消息就是被应答了,就需要删除
- 手动应答:消费者取走元素,还不算应答,需要消费者再主动调用一个 basicAck 方法,此时才算真正应答了,才可以删除消息
7.2. 实现交换机的管理
- 添加交换机
public void insertExchange(Exchange exchange) {
exchangeMap.put(exchange.getName(), exchange);
System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName=" + exchange.getName());
}
- 获取交换机
public Exchange getExchange(String exchangeName) {
return exchangeMap.get(exchangeName);
}
- 删除交换机
public void deleteExchange(String exchangeName) {
exchangeMap.remove(exchangeName);
System.out.println("[MemoryDataCenter] 交换机删除成功! exchangeName=" + exchangeName);
}
7.3. 实现队列的管理
- 添加队列
public void insertQueue(MSGQueue queue) {
queueMap.put(queue.getName(), queue);
System.out.println("[MemoryDataCenter] 新队列添加成功! queueName=" + queue.getName());
}
- 获取队列
public MSGQueue getQueue(String queueName) {
return queueMap.get(queueName);
}
- 删除队列
public void deleteQueue(String queueName) {
queueMap.remove(queueName);
System.out.println("[MemoryDataCenter] 队列删除成功! queueName=" + queueName);
}
7.4. 实现绑定的管理
- 添加绑定
public void insertBinding(Binding binding) throws MqException {
// 先使用 exchangeName 查一下, 对应的哈希表是否存在. 不存在就创建一个.
// ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
// if (bindingMap == null) {
// bindingMap = new ConcurrentHashMap<>();
// bindingsMap.put(binding.getExchangeName(), bindingMap);
// }
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
k -> new ConcurrentHashMap<>());
synchronized (bindingMap) {
// 再根据 queueName 查一下. 如果已经存在, 就抛出异常. 不存在才能插入.
if (bindingMap.get(binding.getQueueName()) != null) {
throw new MqException("[MemoryDataCenter] 绑定已经存在! exchangeName=" + binding.getExchangeName() +
", queueName=" + binding.getQueueName());
}
bindingMap.put(binding.getQueueName(), binding);
}
System.out.println("[MemoryDataCenter] 新绑定添加成功! exchangeName=" + binding.getExchangeName()
+ ", queueName=" + binding.getQueueName());
}
// 获取绑定, 写两个版本:
// 1. 根据 exchangeName 和 queueName 确定唯一一个 Binding
// 2. 根据 exchangeName 获取到所有的 Binding
public Binding getBinding(String exchangeName, String queueName) {
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
if (bindingMap == null) {
return null;
}
return bindingMap.get(queueName);
}
- 获取绑定
public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {
return bindingsMap.get(exchangeName);
}
- 删除绑定
public void deleteBinding(Binding binding) throws MqException {
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
if (bindingMap == null) {
// 该交换机没有绑定任何队列. 报错.
throw new MqException("[MemoryDataCenter] 绑定不存在! exchangeName=" + binding.getExchangeName()
+ ", queueName=" + binding.getQueueName());
}
bindingMap.remove(binding.getQueueName());
System.out.println("[MemoryDataCenter] 绑定删除成功! exchangeName=" + binding.getExchangeName()
+ ", queueName=" + binding.getQueueName());
}
7.5. 实现消息的管理
- 添加消息
// 添加消息
public void addMessage(Message message) {
messageMap.put(message.getMessageId(), message);
System.out.println("[MemoryDataCenter] 新消息添加成功! messageId=" + message.getMessageId());
}
- 根据 id 获取消息
// 根据 id 查询消息
public Message getMessage(String messageId) {
return messageMap.get(messageId);
}
- 根据 id 删除消息
// 根据 id 删除消息
public void removeMessage(String messageId) {
messageMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息被移除! messageId=" + messageId);
}
- 发送消息到指定队列
// 发送消息到指定队列
public void sendMessage(MSGQueue queue, Message message) {
// 把消息放到对应的队列数据结构中.
// 先根据队列的名字, 找到该队列对应的消息链表.
// LinkedList<Message> messages = queueMessageMap.get(queue.getName());
// if (messages == null) {
// // 如果不存在, 就创建.
// messages = new LinkedList<>();
// queueMessageMap.put(queue.getName(), messages);
// }
LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());
// 再把数据加到 messages 里面
synchronized (messages) {
messages.add(message);
}
// 在这里要把该消息往消息中心中也插入一下. 假设如果 message 已经在消息中心存在, 重复插入也没关系.
// 主要就是相同 messageId, 对应的 message 的内容一定是一样的. (服务器代码不会对 Message 内容做修改->basicProperties 和 body)
addMessage(message);
System.out.println("[MemoryDataCenter] 消息被投递到队列中! messageId=" + message.getMessageId());
}
此处发送消息到指定队列需要进行加锁操作,防止重复在该队列中插入消息
- 从队列中获取指定消息
// 从队列中取消息
public Message pollMessage(String queueName) {
// 根据队列名, 查找对应的队列的消息链表.
LinkedList<Message> messages = queueMessageMap.get(queueName);
if (messages == null) {
return null;
}
synchronized (messages) {
// 如果没找到, 说明队列中没有任何消息.
if (messages.size() == 0) {
return null;
}
// 链表中有元素, 就进行头删.
Message currentMessage = messages.remove(0);
System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId=" + currentMessage.getMessageId());
return currentMessage;
}
}
- 获取指定队列中的消息个数
// 获取指定队列中消息的个数
public int getMessageCount(String queueName) {
LinkedList<Message> messages = queueMessageMap.get(queueName);
if (messages == null) {
// 队列中没有消息
return 0;
}
synchronized (messages) {
return messages.size();
}
}
7.6. 实现待确认消息的管理
- 添加未确认的消息
// 添加未确认的消息
public void addMessageWaitAck(String queueName, Message message) {
ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,
k -> new ConcurrentHashMap<>());
messageHashMap.put(message.getMessageId(), message);
System.out.println("[MemoryDataCenter] 消息进入待确认队列! messageId=" + message.getMessageId());
}
- 删除待确认的消息(已经确认过的消息)
// 删除未确认的消息(消息已经确认了)
public void removeMessageWaitAck(String queueName, String messageId) {
ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
if (messageHashMap == null) {
return;
}
messageHashMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息从待确认队列删除! messageId=" + messageId);
}
- 获取到指定的待确认消息
// 获取指定的未确认的消息
public Message getMessageWaitAck(String queueName, String messageId) {
ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
if (messageHashMap == null) {
return null;
}
return messageHashMap.get(messageId);
}
7.7. 实现数据从硬盘中恢复
从硬盘中读取数据,把硬盘之前持久化存储的各个维度的数据恢复到内存中
- 清空之前集合中的数据
- 恢复所有的交换机数据
- 恢复所有的队列数据
- 恢复所有的绑定数据
- 恢复所有消息数据
注意:不需要恢复待确认的消息,因为在 当消息在等待 ACK 的时候,服务器重启了。此时消息就相当于未被取走状态,而硬盘中存储的就是消息就是“未被取走”的。
// 这个方法用来从硬盘上读取数据, 把硬盘中之前持久化存储的各个维度的数据都恢复到内存中.
public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
// 0. 清空之前的所有数据
exchangeMap.clear();
queueMap.clear();
bindingsMap.clear();
messageMap.clear();
queueMessageMap.clear();
// 1. 恢复所有的交换机数据
List<Exchange> exchanges = diskDataCenter.selectAllExchanges();
for (Exchange exchange : exchanges) {
exchangeMap.put(exchange.getName(), exchange);
}
// 2. 恢复所有的队列数据
List<MSGQueue> queues = diskDataCenter.selectAllQueues();
for (MSGQueue queue : queues) {
queueMap.put(queue.getName(), queue);
}
// 3. 恢复所有的绑定数据
List<Binding> bindings = diskDataCenter.selectAllBindings();
for (Binding binding : bindings) {
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
k -> new ConcurrentHashMap<>());
bindingMap.put(binding.getQueueName(), binding);
}
// 4. 恢复所有的消息数据
// 遍历所有的队列, 根据每个队列的名字, 获取到所有的消息.
for (MSGQueue queue : queues) {
LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());
queueMessageMap.put(queue.getName(), messages);
for (Message message : messages) {
messageMap.put(message.getMessageId(), message);
}
}
// 针对 "未确认的消息" 这部分内存中的数据, 不需要从硬盘恢复. 硬盘存储中也没设定这一步.
// 一旦在等待 ack 的过程中, 服务器重启了, 此时这些 "未被确认的消息", 就恢复成 "未被取走的消息" .
// 这个消息在硬盘上存储的时候, 默认就是当做 "未被取走" 的.
}
7.8. 总结
借助内存中的一些列数据结构 ,保存和管理 交换机、队列、绑定、消息
广泛使用了 哈希表、链表、嵌套的数据结构等
考虑线程安全:
要不要加锁?锁加到哪里?使用哪个对象作为锁对象?
8. 虚拟主机的设计
类似于 MySQL 的 database,把交换机,队列,绑定,消息…进行逻辑上的隔离,一个服务器可以有多个虚拟主机,这里的项目只是设计了一个虚拟主机(VirtualHost)。
虚拟主机,不仅仅要管理数据,还要提供一些核心 API,供上层代码调用。
/*
* 通过这个类, 来表示 虚拟主机.
* 每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.
* 同时提供 api 供上层调用.
* VirtualHost 这个类是作为业务逻辑的整合者, 这里对代码中抛出的异常进行处.
*/
public class VirtualHost {
private String virtualHostName;
private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();
private Router router = new Router();
private DiskDataCenter diskDataCenter = new DiskDataCenter();
private ConsumerManager consumerManager = new ConsumerManager(this);
// 操作交换机的锁对象
private final Object exchangeLocker = new Object();
// 操作队列的锁对象
private final Object queueLocker = new Object();
public String getVirtualHostName() {
return virtualHostName;
}
public MemoryDataCenter getMemoryDataCenter() {
return memoryDataCenter;
}
public DiskDataCenter getDiskDataCenter() {
return diskDataCenter;
}
public VirtualHost(String name) {
this.virtualHostName = name;
// 针对 MemoryDataCenter 不需要额外的初始化操作, 只要对象 new 出来就行了
// 针对 DiskDataCenter, 需要进行初始化操作. (建库建表和初始数据的设定).
diskDataCenter.init();
// 还需要针对硬盘的数据, 进行恢复到内存中.
try {
memoryDataCenter.recovery(diskDataCenter);
} catch (IOException | MqException | ClassNotFoundException e) {
e.printStackTrace();
System.out.println("[VirtualHost] 恢复内存数据失败!");
}
}
// ...
}
8.1. 创建交换机(exchangeDelcare)
如何表示,交换机和虚拟主机之间的从属关系呢?
- 方案一:参考数据库设计,“一对多”方案,比如给交换机表,添加个属性,虚拟主机 id/name
- 方案二:交换机的名字 = 虚拟主机名字 + 交换机的真实名字
- 方案三:更优雅的办法,是给每个虚拟主机,分配一组不同的数据库和文件
这里采用的是方案二,按照方案二,也可以去区分不同的队列,进一步由于,绑定和队列和交换机都相关,直接就隔离开了,再进一步,消息和队列是强相关的,队列名区分开,消息自然区分开。
此时就可以区分不同虚拟主机的不同交换机的关系
- 把交换机名字加上虚拟主机名字作为前缀
- 判断交换机是否存在,直接通过内存查询
- 真正去构造交换机对象
- 当参数 durable 为 true 的时候,将交换机对象写入硬盘
- 将交换机写入内存
// 创建交换机
// 如果交换机不存在, 就创建. 如果存在, 直接返回.
// 返回值是 boolean. 创建成功, 返回 true. 失败返回 false
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,
Map<String, Object> arguments) {
// 把交换机的名字, 加上虚拟主机作为前缀.
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
// 1. 判定该交换机是否已经存在. 直接通过内存查询.
Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);
if (existsExchange != null) {
// 该交换机已经存在
System.out.println("[VirtualHost] 交换机已经存在! exchangeName=" + exchangeName);
return true;
}
// 2. 真正创建交换机. 先构造 Exchange 对象
Exchange exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(exchangeType);
exchange.setDurable(durable);
exchange.setAutoDelete(autoDelete);
exchange.setArguments(arguments);
// 3. 把交换机对象写入硬盘
if (durable) {
diskDataCenter.insertExchange(exchange);
}
// 4. 把交换机对象写入内存
memoryDataCenter.insertExchange(exchange);
System.out.println("[VirtualHost] 交换机创建完成! exchangeName=" + exchangeName);
// 上面的逻辑是先写硬盘, 后写内存. 硬盘写更容易失败. 如果硬盘写失败了, 内存就不写了.
// 要是先写内存, 内存写成功了, 硬盘写失败了, 还需要把内存的数据给再删掉. 处理比较麻烦.
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 交换机创建失败! exchangeName=" + exchangeName);
e.printStackTrace();
return false;
}
}
8.2. 删除交换机(exchangeDelete)
- 根据交换机的名字找到对应的交换机
- 删除硬盘数据
- 删除内存中数据
// 删除交换机
public boolean exchangeDelete(String exchangeName) {
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
// 1. 先找到对应的交换机.
Exchange toDelete = memoryDataCenter.getExchange(exchangeName);
if (toDelete == null) {
throw new MqException("[VirtualHost] 交换机不存在无法删除!");
}
// 2. 删除硬盘上的数据
if (toDelete.isDurable()) {
diskDataCenter.deleteExchange(exchangeName);
}
// 3. 删除内存中的交换机数据
memoryDataCenter.deleteExchange(exchangeName);
System.out.println("[VirtualHost] 交换机删除成功! exchangeName=" + exchangeName);
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 交换机删除失败! exchangeName=" + exchangeName);
e.printStackTrace();
return false;
}
}
8.3. 创建队列(queueDelcare)
- 判断队列是否存在
- 不存在则创建队列,设定参数
- 队列参数 durable 为 true 的时候存入硬盘
- 将队列写入到内存
// 创建队列
public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) {
// 把队列的名字, 给拼接上虚拟主机的名字.
queueName = virtualHostName + queueName;
try {
synchronized (queueLocker) {
// 1. 判定队列是否存在
MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
if (existsQueue != null) {
System.out.println("[VirtualHost] 队列已经存在! queueName=" + queueName);
return true;
}
// 2. 创建队列对象
MSGQueue queue = new MSGQueue();
queue.setName(queueName);
queue.setDurable(durable);
queue.setExclusive(exclusive);
queue.setAutoDelete(autoDelete);
queue.setArguments(arguments);
// 3. 写硬盘
if (durable) {
diskDataCenter.insertQueue(queue);
}
// 4. 写内存
memoryDataCenter.insertQueue(queue);
System.out.println("[VirtualHost] 队列创建成功! queueName=" + queueName);
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 队列创建失败! queueName=" + queueName);
e.printStackTrace();
return false;
}
}
8.4. 删除队列
- 判断队列是否存在
- 存在则删除,先在硬盘删除
- 在内存中删除
// 删除队列
public boolean queueDelete(String queueName) {
queueName = virtualHostName + queueName;
try {
synchronized (queueLocker) {
// 1. 根据队列名字, 查询下当前的队列对象
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 队列不存在! 无法删除! queueName=" + queueName);
}
// 2. 删除硬盘数据
if (queue.isDurable()) {
diskDataCenter.deleteQueue(queueName);
}
// 3. 删除内存数据
memoryDataCenter.deleteQueue(queueName);
System.out.println("[VirtualHost] 删除队列成功! queueName=" + queueName);
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 删除队列失败! queueName=" + queueName);
e.printStackTrace();
return false;
}
}
8.5. 创建绑定(queueBind)
- 判断当前绑定是否存在
- 验证当前的 bindingKey 是否合法
- 合法,就创建绑定,设置参数
- 从内存中获取下绑定关系的队列和交换机是否存在
- 都存在,再次判定队列和交换机的 durable 是否都为 true
- 都为 true 则存入硬盘
- 再写入内存
// 创建绑定
public boolean queueBind(String queueName, String exchangeName, String bindingKey) {
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
synchronized (queueLocker) {
// 1. 判定当前的绑定是否已经存在了.
Binding existsBinding = memoryDataCenter.getBinding(exchangeName, queueName);
if (existsBinding != null) {
throw new MqException("[VirtualHost] binding 已经存在! queueName=" + queueName
+ ", exchangeName=" + exchangeName);
}
// 2. 验证 bindingKey 是否合法.
if (!router.checkBindingKey(bindingKey)) {
throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);
}
// 3. 创建 Binding 对象
Binding binding = new Binding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey(bindingKey);
// 4. 获取检查对应的交换机和队列. 如果交换机或者队列不存在, 绑定是无法创建.
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);
}
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if (exchange == null) {
throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);
}
// 5. 先写硬盘
if (queue.isDurable() && exchange.isDurable()) {
diskDataCenter.insertBinding(binding);
}
// 6. 写入内存
memoryDataCenter.insertBinding(binding);
}
}
System.out.println("[VirtualHost] 绑定创建成功! exchangeName=" + exchangeName
+ ", queueName=" + queueName);
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 绑定创建失败! exchangeName=" + exchangeName
+ ", queueName=" + queueName);
e.printStackTrace();
return false;
}
}
8.6. 删除绑定(queueUnbind)
有个依赖关系问题,就是 如果 线程A 先删除了队列,而此时另一个 线程B 再去删除绑定消息时候,就会失败,因为此时队列已经不存在了,此时需要考虑这个问题如何解决
- 方案一:参考类似于 MySQL 的外键一样,删除 交换机 / 队列 的时候,判定一下当前 队列 / 交换机 是否存在对应的绑定,如果存在,则禁止删除,要求先解除绑定,再尝试删除
- 方案二:直接删除,不判断 交换机和队列是否存在(当前采用)
- 获取绑定是否存在
- 删除硬盘上的数据,需要判断该绑定 durable 是否为 true(直接尝试删也没有问题)
- 从内存中删除绑定
// 解除绑定
public boolean queueUnbind(String queueName, String exchangeName) {
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
synchronized (queueLocker) {
// 1. 获取 binding 看是否已经存在
Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
if (binding == null) {
throw new MqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);
}
// 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 不存在, 这个删除也无副作用.
diskDataCenter.deleteBinding(binding);
// 3. 删除内存的数据
memoryDataCenter.deleteBinding(binding);
System.out.println("[VirtualHost] 删除绑定成功!");
}
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 删除绑定失败!");
e.printStackTrace();
return false;
}
}
注意:考虑线程安全问题
8.7. 发送消息
在往队列发送消息的时候,会往 ConsumerManager 类中的 阻塞队列 BlockingQueue<String> tokenQueue
加入该队列名,tokenQueue 中存在该队列名,就表示该队列存在消息。
// 发送消息到指定的交换机/队列中.
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
try {
// 1. 转换交换机的名字
exchangeName = virtualHostName + exchangeName;
// 2. 检查 routingKey 是否合法.
if (!router.checkRoutingKey(routingKey)) {
throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);
}
// 3. 查找交换机对象
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if (exchange == null) {
throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);
}
// 4. 判定交换机的类型
if (exchange.getType() == ExchangeType.DIRECT) {
// 按照直接交换机的方式来转发消息
// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.
// 此时, 可以无视绑定(忽略交换机)关系, 直接向指定队列发送消息.
String queueName = virtualHostName + routingKey;
// 5. 构造消息对象
Message message = Message.createMessageWithId(routingKey, basicProperties, body);
// 6. 查找该队列名对应的对象
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);
}
// 7. 队列存在, 直接给队列中写入消息
sendMessage(queue, message);
} else {
// 按照 fanout 和 topic 的方式来转发.
// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象
ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);
for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {
// 1) 获取到绑定对象, 判定对应的队列是否存在
Binding binding = entry.getValue();
MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
if (queue == null) {
// 可能此处有多个这样的队列. 这里不抛异常.
System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());
continue;
}
// 2) 构造消息对象
Message message = Message.createMessageWithId(routingKey, basicProperties, body);
// 3) 判定这个消息是否能转发给该队列.
// 如果是 fanout, 所有绑定的队列都要转发的.
// 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.
if (!router.route(exchange.getType(), binding, message)) {
continue;
}
// 4) 真正转发消息给队列
sendMessage(queue, message);
}
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 消息发送失败!");
e.printStackTrace();
return false;
}
}
private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {
// 发送消息要把消息写入到 硬盘 和 内存 上.
// 要判断是否应该往硬盘写
int deliverMode = message.getDeliverMode();
// deliverMode 为 1 , 不持久化. deliverMode 为 2 表示持久化.
if (deliverMode == 2) {
diskDataCenter.sendMessage(queue, message);
}
// 写入内存
memoryDataCenter.sendMessage(queue, message);
// 通知消费者可以消费消息了.
consumerManager.notifyConsume(queue.getName());
}
判定消息是否可以转发给这个绑定对应的队列
package com.example.mq.mqserver.core;
import com.example.mq.common.MqException;
/*
* 使用这个类, 来实现交换机的转发规则.
* 同时也借助这个类验证 bindingKey 是否合法.
*/
public class Router {
// ...
// 这个方法用来判定该消息是否可以转发给这个绑定对应的队列.
public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {
// 根据不同的 exchangeType 使用不同的判定转发规则.
if (exchangeType == ExchangeType.FANOUT) {
// 如果是 FANOUT 类型, 则该交换机上 绑定的所有队列都需要转发
return true;
} else if (exchangeType == ExchangeType.TOPIC) {
// 如果是 TOPIC 主题交换机, 规则就要更复杂一些.
return routeTopic(binding, message);
} else {
// 其他情况是不应该存在的.
throw new MqException("[Router] 交换机类型非法! exchangeType=" + exchangeType);
}
}
// ...
}
8.8. Topic 交换机转发规则
bindingKey(创建绑定的时候,给绑定指定的特殊字符串)
- 数字、字母、下划线
- 使用 . 把整个 routingKey 分成若干个部分 形如:
aaa.bbb.ccc
- 支持两种特殊符号,作为通配符(只能作为被 . 分割单独的存在)
- 一个是
*
形如:aaa.*.bbb
,* 可以匹配任何一个独立的部分 - 一个
#
形如:aaa.#.bbb
,# 可以匹配 0 个或者多个独立的部分
routingKey (发布消息的时候,给消息上指定的特殊字符串)
- 数字、字母、下划线
- 使用
.
把整个 routingKey 分成若干个部分 形如:aaa.bbb.ccc
上述规则,是根据 AMQP 协议规定的
以下这部分代码写在 Router
类中的
🍂验证 bindingKey 是否合法(checkBindingKey)
// bindingKey 的构造规则:
// 1. 数字, 字母, 下划线
// 2. 使用 . 分割成若干部分
// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.
public boolean checkBindingKey(String bindingKey) {
if (bindingKey.length() == 0) {
// 空字符串, 也是合法情况. 在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.
return true;
}
// 检查字符串中不能存在非法字符
for (int i = 0; i < bindingKey.length(); i++) {
char ch = bindingKey.charAt(i);
if (ch >= 'A' && ch <= 'Z') {
continue;
}
if (ch >= 'a' && ch <= 'z') {
continue;
}
if (ch >= '0' && ch <= '9') {
continue;
}
if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {
continue;
}
return false;
}
// 检查 * 或者 # 是否是独立的部分.
// aaa.*.bbb 合法情况; aaa.a*.bbb 非法情况.
String[] words = bindingKey.split("\\.");
for (String word : words) {
// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.
if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {
return false;
}
}
// 约定通配符之间的相邻关系(自定义的, 非 RABBITMQ 的).
// 1. aaa.#.#.bbb => 非法
// 2. aaa.#.*.bbb => 非法
// 3. aaa.*.#.bbb => 非法
// 4. aaa.*.*.bbb => 合法
// 5. aaa.#.bbb => 合法
// 前三种实现匹配比较复杂
for (int i = 0; i < words.length - 1; i++) {
// 连续两个 ##
if (words[i].equals("#") && words[i + 1].equals("#")) {
return false;
}
// # 连着 *
if (words[i].equals("#") && words[i + 1].equals("*")) {
return false;
}
// * 连着 #
if (words[i].equals("*") && words[i + 1].equals("#")) {
return false;
}
}
return true;
}
🍂验证 routingKey 是否合法(checkRoutingKey)
// routingKey 的构造规则:
// 1. 数字, 字母, 下划线
// 2. 使用 . 分割成若干部分
public boolean checkRoutingKey(String routingKey) {
if (routingKey.length() == 0) {
// 空字符串. 合法的情况. 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 ""
return true;
}
for (int i = 0; i < routingKey.length(); i++) {
char ch = routingKey.charAt(i);
// 判定该字符是否是大写字母
if (ch >= 'A' && ch <= 'Z') {
continue;
}
// 判定该字母是否是小写字母
if (ch >= 'a' && ch <= 'z') {
continue;
}
// 判定该字母是否是阿拉伯数字
if (ch >= '0' && ch <= '9') {
continue;
}
// 判定是否是 _ 或者 .
if (ch == '_' || ch == '.') {
continue;
}
// 该字符, 不是上述任何一种合法情况, 返回 false
return false;
}
// 遍历字符串后, 没有遇到非法情况. 返回 true
return true;
}
🍂匹配规则
private boolean routeTopic(Binding binding, Message message) {
// 先把这两个 key 进行切分
String[] bindingTokens = binding.getBindingKey().split("\\.");
String[] routingTokens = message.getRoutingKey().split("\\.");
// 引入两个下标, 指向上述两个数组. 初始情况下都为 0
int bindingIndex = 0;
int routingIndex = 0;
while (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {
if (bindingTokens[bindingIndex].equals("*")) {
// [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分
bindingIndex++;
routingIndex++;
continue;
} else if (bindingTokens[bindingIndex].equals("#")) {
// 如果遇到 #, 需要先看看有没有下一个位置.
bindingIndex++;
if (bindingIndex == bindingTokens.length) {
// [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了
return true;
}
// [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.
// findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1
routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);
if (routingIndex == -1) {
// 没找到匹配的结果. 匹配失败
return false;
}
// 找到的匹配的情况, 继续往后匹配.
bindingIndex++;
routingIndex++;
} else {
// [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.
if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {
return false;
}
bindingIndex++;
routingIndex++;
}
}
// [情况五] 判定是否是双方同时到达末尾
// 比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的.
if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {
return true;
}
return false;
}
private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {
for (int i = routingIndex; i < routingTokens.length; i++) {
if (routingTokens[i].equals(bindingToken)) {
return i;
}
}
return -1;
}
8.9. 订阅消息(basicComsume)
一个虚拟主机中,有很多队列,每个队列上都有很多条消息。
那么需要针对是哪个消费者订阅了哪条队列的消息需要进行一个管理。
将消息推送给消费者消息的基本实现思路
- 让 brokerserver 把有哪些消费者管理好
- 收到对应的消息,就把消息推送给消费者
消费者是以队列为维度来订阅消息的,一个队列可以有多个消费者(此处我们约定消费者之间按照轮询的方式来进行消费)。
🍂先定义一个类,描述一个消费者(也会包含一些消费者消费过程中用到的数据)
再给每个队列对象(MSGQueue 对象)加上属性一个 List 这样的属性,它里面包含若干个上述的消费者对象
package com.example.mq.common;
/*
* 表示一个消费者(完整的执行环境)
*/
public class ConsumerEnv {
private String consumerTag; // 消费者的身份标识
private String queueName; // 订阅的队列名称
private boolean autoAck; // 应答模式(为 true 自动应答. 为 false 手动应答.)
private Consumer consumer; // 回调: 处理收到的消息.
public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
this.consumerTag = consumerTag;
this.queueName = queueName;
this.autoAck = autoAck;
this.consumer = consumer;
}
public String getConsumerTag() {
return consumerTag;
}
public void setConsumerTag(String consumerTag) {
this.consumerTag = consumerTag;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public boolean isAutoAck() {
return autoAck;
}
public void setAutoAck(boolean autoAck) {
this.autoAck = autoAck;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
}
而回调需要去定义一个函数式接口,什么是函数式接口
由于 Java 的函数不能脱离类的存在, 为了实现 lambda,Java 引入了函数式接口
lambda 的本质(底层实现)
- interface
- 只能有一个方法
- 还需要加
@FunctionalInterface
注解
package com.example.mq.common;
import com.example.mq.mqserver.core.BasicProperties;
import java.io.IOException;
/*
* 函数式接口(回调函数). 收到消息之后要处理消息时调用的方法.
*/
@FunctionalInterface
public interface Consumer {
// Delivery 的意思是 "投递", 这个方法在每次服务器收到消息之后调用.
// 通过这个方法把消息推送给对应的消费者.
void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
}
MSGQueue 对象管理消费者的数据结构
🍂实现一个类(完成消费者消费消息核心逻辑)
package com.example.mq.mqserver.core;
import com.example.mq.common.Consumer;
import com.example.mq.common.ConsumerEnv;
import com.example.mq.common.MqException;
import com.example.mq.mqserver.VirtualHost;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/*
* 通过这个类, 实现消费消息的核心逻辑.
*/
public class ConsumerManager {
// 持有上层的 VirtualHost 对象的引用. 用来操作数据.
private VirtualHost parent;
// 指定一个线程池, 负责去执行具体的回调任务.
private ExecutorService workerPool = Executors.newFixedThreadPool(4);
// 存放令牌的队列
private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
// 扫描线程
private Thread scannerThread = null;
public ConsumerManager(VirtualHost p) {
parent = p;
scannerThread = new Thread(() -> {
while (true) {
try {
// 1. 拿到令牌
String queueName = tokenQueue.take();
// 2. 根据令牌, 找到队列
MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if (queue == null) {
throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);
}
// 3. 从这个队列中消费一个消息.
synchronized (queue) {
consumeMessage(queue);
}
} catch (InterruptedException | MqException e) {
e.printStackTrace();
}
}
});
// 把线程设为后台线程.
scannerThread.setDaemon(true);
scannerThread.start();
}
// 通知消费: 调用时机是发送消息的时候.(将队列名加入到令牌队列)
public void notifyConsume(String queueName) throws InterruptedException {
tokenQueue.put(queueName);
}
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
// 找到对应的队列.
MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if (queue == null) {
throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);
}
ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);
synchronized (queue) {
queue.addConsumerEnv(consumerEnv);
// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.
int n = parent.getMemoryDataCenter().getMessageCount(queueName);
for (int i = 0; i < n; i++) {
// 这个方法调用一次就消费一条消息.
consumeMessage(queue);
}
}
}
private void consumeMessage(MSGQueue queue) {
// 1. 按照轮询的方式, 找个消费者出来.
ConsumerEnv luckyDog = queue.chooseConsumer();
if (luckyDog == null) {
// 当前队列没有消费者, 暂时不消费.
return;
}
// 2. 从队列中取出一个消息
Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
if (message == null) {
// 当前队列中还没有消息, 也不需要消费.
return;
}
// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.
workerPool.submit(() -> {
try {
// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
// 2. 真正执行回调操作
luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),
message.getBody());
// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.
// 如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.
if (luckyDog.isAutoAck()) {
// 1) 删除硬盘上的消息
if (message.getDeliverMode() == 2) {
parent.getDiskDataCenter().deleteMessage(queue, message);
}
// 2) 删除上面的待确认集合中的消息
parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
// 3) 删除内存中消息中心里的消息
parent.getMemoryDataCenter().removeMessage(message.getMessageId());
System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
🍂订阅消息的核心逻辑,就是调用 consumerManager.addConsumer方法,并传入参数(consumerTag、queueName、autoAck、consumer【回调函数】)。
这个方法的底层调用逻辑是(consumerManager.addConsumer)
- 根据传入的 queueName 查到该队列
- 然后创一个消费者表示 ConsumerEnv,存入到该队列的 ConsumerEnvList中
- 判断该队列中是否存在消息,已经存在的话,就 consumeMessage 消费完全部消息(按照轮询方式)
// 订阅消息.
// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.
// consumerTag: 消费者的身份标识
// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.
// consumer: 是一个回调函数. 类型设定成函数式接口. 后续调用 basicConsume 并且传实参的时候, 可以写作 lambda.
public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.
queueName = virtualHostName + queueName;
try {
consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);
System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);
e.printStackTrace();
return false;
}
}
🍂关于消息确认
能够确保消息是被正确的消费掉了,消费者的回调函数,顺利执行完了(中间没有抛出异常)这条消息就可以被删除了。
消息确认也就是为了保证“消息不丢失”
为了达成消息不丢失这样的效果,这样处理:
- 在真正执行回调之前,把这个消息先放到 “待确认的集合”中
- 真正执行回调
- 当前消费者采取的是 autoAck=true,自动应答,消息推送给消费者,就认为回调执行完毕不抛异常,就算消费成功,然后就可以删除消息
- 硬盘
- 内存消息中心哈希表
- 待确认消息集合
- 当前消费者采取的是 autoAck=false,手动应答,就需要消费者再回调函数内部,显式调用 basicAck这个核心 API
basicAck实现原理,比较简单,当传入参数 autoAck=false,消费者就需要在回调函数中手动调用 basicAck
- 传入 queueName 和 messageId
- 获取到队列和消息
- 删除硬盘中数据
- 删除内存中心的消息数据
- 删除待确认集合中的消息数据
public boolean basicAck(String queueName, String messageId) {
queueName = virtualHostName + queueName;
try {
// 1. 获取到消息和队列
Message message = memoryDataCenter.getMessage(messageId);
if (message == null) {
throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);
}
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);
}
// 2. 删除硬盘上的数据
if (message.getDeliverMode() == 2) {
diskDataCenter.deleteMessage(queue, message);
}
// 3. 删除消息中心中的数据
memoryDataCenter.removeMessage(messageId);
// 4. 删除待确认的集合中的数据
memoryDataCenter.removeMessageWaitAck(queueName, messageId);
System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName
+ ", messageId=" + messageId);
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] basicAck 失败! 消息确认失败! queueName=" + queueName
+ ", messageId=" + messageId);
e.printStackTrace();
return false;
}
}
消息确认是为了保证消息的不丢失,它的逻辑是这样的:
- 如果执行回调方法的过程中,抛异常了
- 当回调函数异常,后续逻辑执行不到了。此时这个消息就会始终待在待确认集合中。
- 而在 RabbitMQ 中会设置一个死信队列,每一个队列都会绑定一个死信队列。应用场景:首先消息在消费时被投入的待确认集合,当消息在消费过程中出现异常,最后就会把待确认集合中的消息投入到死信队列中(有一个扫描线程负责关注待确认集合,待确认集合中每个待确认的消息如果待的时间超出了范围就会把这个消息放到死信队列);当消息设置了过期时间,如果在过期时间内,还没有完成消费,就会投入到死信队列中;当队列达到最大长度时,新的消息将无法被发送到队列中,此时,RabbitMQ 可以选择将这些无法发送的消息发送到死信队列中,以便进行进一步处理。
- 执行回调过程中,Broker Server 崩溃了
- 此时内存数据就都没了!但是硬盘数据是还在的,正在消费的这个消息,在硬盘中仍然存在。BrokerServer重启后,这个消息就又被加载到内存了,就像从来没被消费过一样。消费者就会有机会重新得到这个消息(重复消费的问题,由消费者的业务代码负责保证)。
9. 网络通信设计
9.1. 基于TCP,自定义应用层协议
type
:描述当前这个请求和响应,是干啥的。用四个字节来存储
- 在 MQ 中,客户端(生产者 + 消费者)和 服务器 (Broker Server)之间,要进行哪些操作?(就是 VirtualHost 中的那些核心API)
- 希望客户端,能通过网络远程调用这些 API
- 此处的 type 就是描述当前这个 请求 / 响应 是在调用哪个 API
- TCP 是有连接的,Channel 是 Connection 内部的逻辑连接,此时一个 Connection 中可能有多个连接
- Channel 存在的意义是让 TCP 连接得到复用(创建/断开TCP连接成本挺高【需要三次握手,四次挥手】)
-
length
:里面存储的是 payload 的长度,用 4 个字节来存储 -
payload
:会根据当前是请求还是响应,以及当前的 type 有不同的值
- 比如 type 是 0x3(创建交换机),同时当前是个请求,此时 payload 的内容,就相当于是 exchangeDelcare 的参数的序列化结果
- 比如 type 是 0x3(创建交换机),同时当前是个响应,此时 payload 的内容,就相当于是 exchangeDelcare 的返回结果的序列化内容
🍂请求对象
package com.example.mq.common;
/*
* 表示一个网络通信中的请求对象. 按照自定义协议的格式来展开
*/
public class Request {
private int type;
private int length;
private byte[] payload;
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public byte[] getPayload() {
return payload;
}
public void setPayload(byte[] payload) {
this.payload = payload;
}
}
🍂响应对象
package com.example.mq.common;
/*
* 这个对象表示一个响应. 也是根据自定义应用层协议来的
*/
public class Response {
private int type;
private int length;
private byte[] payload;
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public byte[] getPayload() {
return payload;
}
public void setPayload(byte[] payload) {
this.payload = payload;
}
}
9.2. ExchangeDelcare
请求 Request
响应 Response
package com.example.mq.common;
import com.example.mq.mqserver.core.ExchangeType;
import java.io.Serializable;
import java.util.Map;
/**
* 创建交换机的参数类
*/
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
private String exchangeName;
private ExchangeType exchangeType;
private boolean durable;
private boolean autoDelete;
private Map<String, Object> arguments;
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public ExchangeType getExchangeType() {
return exchangeType;
}
public void setExchangeType(ExchangeType exchangeType) {
this.exchangeType = exchangeType;
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
}
public Map<String, Object> getArguments() {
return arguments;
}
public void setArguments(Map<String, Object> arguments) {
this.arguments = arguments;
}
}
远程调用方法的公共参数
package com.example.mq.common;
import java.io.Serializable;
/*
* 使用这个类表示方法的公共参数/辅助的字段.
* 后续每个方法又会有一些不同的参数, 不同的参数再分别使用不同的子类来表示.
*/
public class BasicArguments implements Serializable {
// 表示一次请求/响应 的身份标识. 可以把请求和响应对上.
protected String rid;
// 这次通信使用的 channel 的身份标识.
protected String channelId;
public String getRid() {
return rid;
}
public void setRid(String rid) {
this.rid = rid;
}
public String getChannelId() {
return channelId;
}
public void setChannelId(String channelId) {
this.channelId = channelId;
}
}
远程调用的方法的返回值的公共信息
package com.example.mq.common;
import java.io.Serializable;
/*
* 这个类表示各个远程调用的方法的返回值的公共信息.
*/
public class BasicReturns implements Serializable {
// 用来标识唯一的请求和响应.
protected String rid;
// 用来标识一个 channel
protected String channelId;
// 表示当前这个远程调用方法的返回值
protected boolean ok;
public String getRid() {
return rid;
}
public void setRid(String rid) {
this.rid = rid;
}
public String getChannelId() {
return channelId;
}
public void setChannelId(String channelId) {
this.channelId = channelId;
}
public boolean isOk() {
return ok;
}
public void setOk(boolean ok) {
this.ok = ok;
}
}
9.3. ExchangeDelete
请求 Request
响应 Response
package com.example.mq.common;
import java.io.Serializable;
/**
* 删除交换机的参数类
*/
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {
private String exchangeName;
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
}
9.4. QueueDelcare
请求 Request
响应 Response
package com.example.mq.common;
import java.io.Serializable;
import java.util.Map;
/**
* 创建队列的参数类
*/
public class QueueDeclareArguments extends BasicArguments implements Serializable {
private String queueName;
private boolean durable;
private boolean exclusive;
private boolean autoDelete;
private Map<String, Object> arguments;
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public boolean isExclusive() {
return exclusive;
}
public void setExclusive(boolean exclusive) {
this.exclusive = exclusive;
}
public boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
}
public Map<String, Object> getArguments() {
return arguments;
}
public void setArguments(Map<String, Object> arguments) {
this.arguments = arguments;
}
}
9.5. QueueDelete
请求 Request
响应 Response
package com.example.mq.common;
import java.io.Serializable;
/**
* 删除队列的参数类
*/
public class QueueDeleteArguments extends BasicArguments implements Serializable {
private String queueName;
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
}
9.6. QueueBind
请求 Request
响应 Response
package com.example.mq.common;
import java.io.Serializable;
/**
* 创建绑定的参数类
*/
public class QueueBindArguments extends BasicArguments implements Serializable {
private String queueName;
private String exchangeName;
private String bindingKey;
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getBindingKey() {
return bindingKey;
}
public void setBindingKey(String bindingKey) {
this.bindingKey = bindingKey;
}
}
9.7. QueueUnBind
请求 Request
响应 Response
package com.example.mq.common;
import java.io.Serializable;
/**
* 解除绑定的参数类
*/
public class QueueUnbindArguments extends BasicArguments implements Serializable {
private String queueName;
private String exchangeName;
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
}
9.8. BasicPublish
请求 Request
响应 Response
package com.example.mq.common;
import com.example.mq.mqserver.core.BasicProperties;
import java.io.Serializable;
/**
* 发送消息到指定队列的参数类
*/
public class BasicPublishArguments extends BasicArguments implements Serializable {
private String exchangeName;
private String routingKey;
private BasicProperties basicProperties;
private byte[] body;
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public BasicProperties getBasicProperties() {
return basicProperties;
}
public void setBasicProperties(BasicProperties basicProperties) {
this.basicProperties = basicProperties;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
}
9.9. BasicConsume
请求 Request
响应 Response
package com.example.mq.common;
import java.io.Serializable;
/**
* 订阅消息的参数类
*/
public class BasicConsumeArguments extends BasicArguments implements Serializable {
private String consumerTag;
private String queueName;
private boolean autoAck;
// 这个类对应的 basicConsume 方法中, 还有一个参数, 是回调函数用来来处理消息
// 这个回调函数, 是不能通过网络传输的.
// 而在 broker server 这边考虑实现, 针对消息的处理回调, 只是统一把消息返回给客户端即可.
// 而真正的回调处理事在客户端那边收到消息之后, 它自己会根据消息内容执行一个它自定义的回调.
// 此时, 客户端也就不需要把自身的回调告诉给服务器了.
// 这里就不需要 consumer 成员了.
public String getConsumerTag() {
return consumerTag;
}
public void setConsumerTag(String consumerTag) {
this.consumerTag = consumerTag;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public boolean isAutoAck() {
return autoAck;
}
public void setAutoAck(boolean autoAck) {
this.autoAck = autoAck;
}
}
9.10. BasicAck
请求 Request
响应 Response
package com.example.mq.common;
import java.io.Serializable;
public class BasicAckArguments extends BasicArguments implements Serializable {
private String queueName;
private String messageId;
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
}
10. 创建BrokerServer类
消息队列本体服务器(本质上就是一个 TCP 的服务器)
清理过期和会话的逻辑
package com.example.mq.mqserver;
import com.example.mq.common.*;
import com.example.mq.mqserver.core.BasicProperties;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/*
* BrokerServer 是 消息队列 的 本体服务器.
* 本质上是一个 TCP 的服务器.
*/
public class BrokerServer {
private ServerSocket serverSocket = null;
// 当前的 BrokerServer 上只设计一个 虚拟主机
private VirtualHost virtualHost = new VirtualHost("default");
// 使用一个 哈希表 存储当前的所有会话(哪些客户端正在和服务器进行通信)
// key 是 channelId, value 为对应的 Socket 对象
private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String, Socket>();
// 引入一个线程池, 来处理多个客户端的请求.
private ExecutorService executorService = null;
// 引入一个 boolean 变量控制服务器是否继续运行
private volatile boolean runnable = true;
public BrokerServer(int port) throws IOException {
serverSocket = new ServerSocket(port);
}
public void start() throws IOException {
System.out.println("[BrokerServer] 启动!");
executorService = Executors.newCachedThreadPool();
try {
while (runnable) {
Socket clientSocket = serverSocket.accept();
// 把处理连接的逻辑交给线程池.
executorService.submit(() -> {
processConnection(clientSocket);
});
}
} catch (SocketException e) {
System.out.println("[BrokerServer] 服务器停止运行!");
// e.printStackTrace();
}
}
// 停止服务器可以就是直接 kill 掉对应进程.
// 这里单独的停止方法. 主要是用于后续的单元测试.
public void stop() throws IOException {
runnable = false;
// 把线程池中的任务都放弃了. 让线程都销毁.
executorService.shutdownNow();
serverSocket.close();
}
// 通过这个方法, 来处理一个客户端的连接.
// 在这一个连接中, 可能会涉及到多个请求和响应.
private void processConnection(Socket clientSocket) {
try (InputStream inputStream = clientSocket.getInputStream();
OutputStream outputStream = clientSocket.getOutputStream()) {
// 这里需要按照特定格式来读取并解析. 需要用到 DataInputStream 和 DataOutputStream
try (DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
while (true) {
// 1. 读取请求并解析.
Request request = readRequest(dataInputStream);
// 2. 根据请求计算响应
Response response = process(request, clientSocket);
// 3. 把响应写回给客户端
writeResponse(dataOutputStream, response);
}
}
} catch (EOFException | SocketException e) {
// 这个代码, DataInputStream 读到 EOF , 会抛出一个 EOFException 异常.
// 这里借助这个异常来结束循环
System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString()
+ ":" + clientSocket.getPort());
} catch (IOException | ClassNotFoundException | MqException e) {
System.out.println("[BrokerServer] connection 出现异常!");
e.printStackTrace();
} finally {
try {
// 当连接处理完了, 就需要记得关闭 socket
clientSocket.close();
// 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.
clearClosedSession(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 读取一个请求解析
private Request readRequest(DataInputStream dataInputStream) throws IOException {
Request request = new Request();
request.setType(dataInputStream.readInt());
request.setLength(dataInputStream.readInt());
byte[] payload = new byte[request.getLength()];
int n = dataInputStream.read(payload);
if (n != request.getLength()) {
throw new IOException("读取请求格式出错!");
}
request.setPayload(payload);
return request;
}
// 将响应写回给客户端
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
dataOutputStream.writeInt(response.getType());
dataOutputStream.writeInt(response.getLength());
dataOutputStream.write(response.getPayload());
// 刷新缓冲区
dataOutputStream.flush();
}
// 根据请求计算响应
private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
// 1. 把 request 中的 payload 做一个初步的解析.
BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()
+ ", type=" + request.getType() + ", length=" + request.getLength());
// 2. 根据 type 的值区分每次请求之后的逻辑.
boolean ok = true;
if (request.getType() == 0x1) {
// 创建 channel
sessions.put(basicArguments.getChannelId(), clientSocket);
System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId());
} else if (request.getType() == 0x2) {
// 销毁 channel
sessions.remove(basicArguments.getChannelId());
System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId());
} else if (request.getType() == 0x3) {
// 创建交换机. 此时 payload 是 ExchangeDeclareArguments 对象.
ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),
arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());
} else if (request.getType() == 0x4) {
// 删除交换机
ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
ok = virtualHost.exchangeDelete(arguments.getExchangeName());
} else if (request.getType() == 0x5) {
// 创建队列
QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),
arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());
} else if (request.getType() == 0x6) {
// 删除队列
QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
ok = virtualHost.queueDelete((arguments.getQueueName()));
} else if (request.getType() == 0x7) {
// 绑定交换机和队列
QueueBindArguments arguments = (QueueBindArguments) basicArguments;
ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
} else if (request.getType() == 0x8) {
// 删除绑定
QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;
ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());
} else if (request.getType() == 0x9) {
// 发布消息
BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),
arguments.getBasicProperties(), arguments.getBody());
} else if (request.getType() == 0xa) {
// 订阅消息
BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),
new Consumer() {
// 这里回调函数是把服务器收到的消息直接推送给对应的消费者客户端
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
// 首先要获取到当前这个收到的消息, 要发给哪个客户端.
// consumerTag 可以设置为 channelId. 根据 channelId 去 sessions 中查询, 可以得到对应的
// socket 对象, 往里面发送数据
// 1. 根据 channelId 找到 socket 对象
Socket clientSocket = sessions.get(consumerTag);
if (clientSocket == null || clientSocket.isClosed()) {
throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
}
// 2. 构造响应数据
Response response = new Response();
// 0xc 表示服务器给消费者客户端推送的消息数据.
response.setType(0xc);
// response 的 payload 是一个 SubScribeReturns
SubScribeReturns subScribeReturns = new SubScribeReturns();
subScribeReturns.setChannelId(consumerTag);
subScribeReturns.setRid(""); // 推送这里只有响应, 没有请求, rid 暂时不需要.
subScribeReturns.setOk(true);
subScribeReturns.setConsumerTag(consumerTag);
subScribeReturns.setBasicProperties(basicProperties);
subScribeReturns.setBody(body);
byte[] payload = BinaryTool.toBytes(subScribeReturns);
response.setLength(payload.length);
response.setPayload(payload);
// 3. 把数据写回给客户端.
// 此处的 dataOutputStream 这个对象不能 close
// 如果 把 dataOutputStream 关闭, 会直接把 clientSocket 里的 outputStream 也关了.
// 此时就无法继续往 socket 中写入后续数据了.
DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
writeResponse(dataOutputStream, response);
}
});
} else if (request.getType() == 0xb) {
// 调用 basicAck 确认消息.
BasicAckArguments arguments = (BasicAckArguments) basicArguments;
ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
} else {
// 当前的 type 是非法的.
throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());
}
// 3. 构造响应
Response response = new Response();
response.setType(request.getType());
BasicReturns basicReturns = new BasicReturns();
basicReturns.setChannelId(basicArguments.getChannelId());
basicReturns.setRid(basicArguments.getRid());
basicReturns.setOk(ok);
byte[] payload = BinaryTool.toBytes(basicReturns);
response.setLength(payload.length);
response.setPayload(payload);
System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()
+ ", type=" + response.getType() + ", length=" + response.getLength());
return response;
}
private void clearClosedSession(Socket clientSocket) {
// 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉.
List<String> toDeleteChannelId = new ArrayList<>();
for (Map.Entry<String, Socket> entry : sessions.entrySet()) {
if (entry.getValue() == clientSocket) {
// 不能针对同一个集合类 一边遍历, 一边删除, 会破坏集合结果而影响迭代器
// sessions.remove(entry.getKey());
// 这里用一个 list, 先把要删除的键值对, 放到 list 中, 后续再删除.
toDeleteChannelId.add(entry.getKey());
}
}
for (String channelId : toDeleteChannelId) {
sessions.remove(channelId);
}
System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);
}
}
package com.example.mq.common;
import com.example.mq.mqserver.core.BasicProperties;
import java.io.Serializable;
/**
* broker server 服务器给消费者客户端返回的消息类
*/
public class SubScribeReturns extends BasicReturns implements Serializable {
private String consumerTag;
private BasicProperties basicProperties;
private byte[] body;
public String getConsumerTag() {
return consumerTag;
}
public void setConsumerTag(String consumerTag) {
this.consumerTag = consumerTag;
}
public BasicProperties getBasicProperties() {
return basicProperties;
}
public void setBasicProperties(BasicProperties basicProperties) {
this.basicProperties = basicProperties;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
}
11. 客户端代码
🍂ConnectionFactory 连接工厂
这个类持有服务器的地址
主要的功能就是:创建出连接 Connection 对象
package com.example.mq.mqclient;
import java.io.IOException;
public class ConnectionFactory {
// broker server 的 ip 地址
private String host;
// broker server 的端口号
private int port;
// 访问 broker server 的哪个虚拟主机(当前只设置了一个虚拟主机, 后续拓展).
// private String virtualHostName;
// private String username;
// private String password;
public Connection newConnection() throws IOException {
Connection connection = new Connection(host, port);
return connection;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
}
🍂Connection 表示一个TCP连接
- 持有 Socket 对象
- 发送请求
- 读取响应(创建一个扫描线程,由这个线程负责不停地从 socket 中读取响应数据,把这个响应数据再交给对应的 channel 进行处理)
- 如果 response.type == 0xc,则是服务器推送的消息
- 利用 SubScribeReturns 来接收
- 根据 channelId 找到相应的 channel对象
- 利用线程池执行 channel 里面的回调函数
- 如果是 response.type != 0xc,则当前响应是针对控制请求的响应
- 利用 BasicReturns 来接收
- 根据 BasicReturns 对象中的 channelId 在 channelMap中找到 channel对象
- 并将 BasicReturns 存到 channel对象中的 basicReturnsMap 哈希表中
- 创建一个 channel
- 随机生成 C+UUID
- 将当前对象存放到 Connection 管理 channel 的哈希表中
- 然后将 这个命令 通过 connection 发送给 服务器
- 管理多个 channel 对象
- ConcurrentHashMap<String,Channel> channelMap
- 每次创建一个 channel的时候,就存进去
🍂channel 表示一个逻辑上的连接
一个客户端可以有多个模块。
每个模块都可以和 brokerserver 之间建立”逻辑上的连接“ (channel)
这几个模块的 channel 彼此之间是相互不影响的
但是这几个 channel 复用了同一个 TCP 连接(RabbitMQ 的客户端就是这么设定的)
还需要提供一系列的方法,去和服务器提供的核心API对应
(客户端提供的方法,方法的内部,就是发了一个特定的请求)
对于一个客户端的一次 Connection下,可能会有多个 channel,就是多个逻辑上的连接,那么如何区分响应?
例如有 channelA 和 channelB 。channelA 发送的请求 A,channelB 发送的请求 B。此时响应的顺序不会按照顺序返回,而且 channelA 也不用关心其他响应,只关心是否收到响应 A。
所以此时需要在 channel 下用一个 basicReturns 来存储当前 channle 的收到服务器的响应。当客户端 connetion 读取到响应时候,添加到 channel 中 basicReturns
Channel 实现代码:
package com.example.mq.mqclient;
import com.example.mq.common.*;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;
import java.io.IOException;
import java.lang.ref.ReferenceQueue;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class Channel {
private String channelId;
// 当前这个 channel 属于哪个连接.
private Connection connection;
// 用来存储后续客户端收到的服务器的响应.
private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
// 如果当前 Channel 订阅了某个队列, 需要在此处记录下对应回调处理. 队列的消息推送过来就调用回调.
// 这里约定一个 Channel 中只能有一个回调.
private Consumer consumer = null;
public String getChannelId() {
return channelId;
}
public void setChannelId(String channelId) {
this.channelId = channelId;
}
public Connection getConnection() {
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public ConcurrentHashMap<String, BasicReturns> getBasicReturnsMap() {
return basicReturnsMap;
}
public void setBasicReturnsMap(ConcurrentHashMap<String, BasicReturns> basicReturnsMap) {
this.basicReturnsMap = basicReturnsMap;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
public Channel(String channelId, Connection connection) {
this.channelId = channelId;
this.connection = connection;
}
// 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了.
public boolean createChannel() throws IOException {
Request request = new Request();
request.setType(0x1);
// 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象
BasicArguments basicArguments = new BasicArguments();
basicArguments.setChannelId(channelId);
basicArguments.setRid(generateRid());
byte[] payload = BinaryTool.toBytes(basicArguments);
request.setLength(payload.length);
request.setPayload(payload);
// 发送请求了.
connection.writeRequest(request);
// 等待服务器的响应
BasicReturns basicReturns = waitResult(basicArguments.getRid());
return basicReturns.isOk();
}
private String generateRid() {
return "R-" + UUID.randomUUID().toString();
}
// 使用这个方法来阻塞等待服务器的响应.
private BasicReturns waitResult(String rid) {
BasicReturns basicReturns = null;
while ((basicReturns = basicReturnsMap.get(rid)) == null) {
// 查询结果为 null, 响应还未返回.
// 此时就需要阻塞等待.
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 读取成功之后, 要把这个消息从哈希表中删除掉.
basicReturnsMap.remove(rid);
return basicReturns;
}
public void putReturns(BasicReturns basicReturns) {
basicReturnsMap.put(basicReturns.getRid(), basicReturns);
synchronized (this) {
// 把所有的等待的线程都唤醒.
notifyAll();
}
}
// 关闭 channel, 给服务器发送一个 type = 0x2 的请求
public boolean close() throws IOException {
Request request = new Request();
request.setType(0x2);
BasicArguments basicArguments = new BasicArguments();
basicArguments.setRid(generateRid());
basicArguments.setChannelId(channelId);
byte[] payload = BinaryTool.toBytes(basicArguments);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(basicArguments.getRid());
return basicReturns.isOk();
}
// 创建交换机
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException {
Request request = new Request();
request.setType(0x3);
ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
exchangeDeclareArguments.setRid(generateRid());
exchangeDeclareArguments.setChannelId(channelId);
exchangeDeclareArguments.setExchangeName(exchangeName);
exchangeDeclareArguments.setExchangeType(exchangeType);
exchangeDeclareArguments.setDurable(durable);
exchangeDeclareArguments.setAutoDelete(autoDelete);
exchangeDeclareArguments.setArguments(arguments);
byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());
return basicReturns.isOk();
}
// 删除交换机
public boolean exchangeDelete(String exchangeName) throws IOException {
Request request = new Request();
request.setType(0x4);
ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setExchangeName(exchangeName);
byte[] payload = BinaryTool.toBytes(arguments);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
// 创建队列
public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException {
Request request = new Request();
request.setType(0x5);
QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
queueDeclareArguments.setRid(generateRid());
queueDeclareArguments.setChannelId(channelId);
queueDeclareArguments.setQueueName(queueName);
queueDeclareArguments.setDurable(durable);
queueDeclareArguments.setExclusive(exclusive);
queueDeclareArguments.setAutoDelete(autoDelete);
queueDeclareArguments.setArguments(arguments);
byte[] payload = BinaryTool.toBytes(queueDeclareArguments);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());
return basicReturns.isOk();
}
// 删除队列
public boolean queueDelete(String queueName) throws IOException {
Request request = new Request();
request.setType(0x6);
QueueDeleteArguments arguments = new QueueDeleteArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setQueueName(queueName);
byte[] payload = BinaryTool.toBytes(arguments);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
// 创建绑定
public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {
Request request = new Request();
request.setType(0x7);
QueueBindArguments arguments = new QueueBindArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setQueueName(queueName);
arguments.setExchangeName(exchangeName);
arguments.setBindingKey(bindingKey);
byte[] payload = BinaryTool.toBytes(arguments);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
// 解除绑定
public boolean queueUnbind(String queueName, String exchangeName) throws IOException {
Request request = new Request();
request.setType(0x8);
QueueUnbindArguments arguments = new QueueUnbindArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setQueueName(queueName);
arguments.setExchangeName(exchangeName);
byte[] payload = BinaryTool.toBytes(arguments);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
// 发送消息
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
Request request = new Request();
request.setType(0x9);
BasicPublishArguments arguments = new BasicPublishArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setExchangeName(exchangeName);
arguments.setRoutingKey(routingKey);
arguments.setBasicProperties(basicProperties);
arguments.setBody(body);
byte[] payload = BinaryTool.toBytes(arguments);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
// 订阅消息
public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
// 先设置回调.
if (this.consumer != null) {
throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");
}
this.consumer = consumer;
Request request = new Request();
request.setType(0xa);
BasicConsumeArguments arguments = new BasicConsumeArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setConsumerTag(channelId); // 此处 consumerTag 也使用 channelId 来表示了.
arguments.setQueueName(queueName);
arguments.setAutoAck(autoAck);
byte[] payload = BinaryTool.toBytes(arguments);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
// 确认消息
public boolean basicAck(String queueName, String messageId) throws IOException {
Request request = new Request();
request.setType(0xb);
BasicAckArguments arguments = new BasicAckArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setQueueName(queueName);
arguments.setMessageId(messageId);
byte[] payload = BinaryTool.toBytes(arguments);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
}
Connection 实现代码:
package com.example.mq.mqclient;
import com.example.mq.common.*;
import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils;
import java.io.*;
import java.net.Socket;
import java.net.SocketException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Connection {
private Socket socket = null;
// 需要管理多个 channel. 使用一个 hash 表把若干个 channel 组织起来.
private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
private InputStream inputStream;
private OutputStream outputStream;
private DataInputStream dataInputStream;
private DataOutputStream dataOutputStream;
private ExecutorService callbackPool = null;
public Connection(String host, int port) throws IOException {
socket = new Socket(host, port);
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
dataInputStream = new DataInputStream(inputStream);
dataOutputStream = new DataOutputStream(outputStream);
callbackPool = Executors.newFixedThreadPool(4);
// 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据. 把这个响应数据再交给对应的 channel 负责处理.
Thread t = new Thread(() -> {
try {
while (!socket.isClosed()) {
Response response = readResponse();
dispatchResponse(response);
}
} catch (SocketException e) {
// 连接正常断开的. 此时这个异常直接忽略.
System.out.println("[Connection] 连接正常断开!");
} catch (IOException | ClassNotFoundException | MqException e) {
System.out.println("[Connection] 连接异常断开!");
e.printStackTrace();
}
});
t.start();
}
// 关闭 Connection 释放上述资源
public void close() {
try {
callbackPool.shutdownNow();
channelMap.clear();
inputStream.close();
outputStream.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 使用这个方法来分别处理 针对控制请求的响应 和 服务器推送的消息.
private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
if (response.getType() == 0xc) {
// 服务器推送来的消息数据
SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
// 根据 channelId 找到对应的 channel 对象
Channel channel = channelMap.get(subScribeReturns.getChannelId());
if (channel == null) {
throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());
}
// 执行该 channel 对象内部的回调.
callbackPool.submit(() -> {
try {
channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),
subScribeReturns.getBody());
} catch (MqException | IOException e) {
e.printStackTrace();
}
});
} else {
// 当前响应是针对刚才的控制请求的响应
BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
// 把这个结果放到对应的 channel 的 hash 表中.
Channel channel = channelMap.get(basicReturns.getChannelId());
if (channel == null) {
throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());
}
channel.putReturns(basicReturns);
}
}
// 发送请求
public void writeRequest(Request request) throws IOException {
dataOutputStream.writeInt(request.getType());
dataOutputStream.writeInt(request.getLength());
dataOutputStream.write(request.getPayload());
dataOutputStream.flush();
System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength());
}
// 读取响应
public Response readResponse() throws IOException {
Response response = new Response();
response.setType(dataInputStream.readInt());
response.setLength(dataInputStream.readInt());
byte[] payload = new byte[response.getLength()];
int n = dataInputStream.read(payload);
if (n != response.getLength()) {
throw new IOException("读取的响应数据不完整!");
}
response.setPayload(payload);
System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());
return response;
}
// 通过这个方法, 在 Connection 中能够创建出一个 Channel
public Channel createChannel() throws IOException {
String channelId = "C-" + UUID.randomUUID().toString();
Channel channel = new Channel(channelId, this);
// 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中.
channelMap.put(channelId, channel);
// 同时需要把 "创建 channel" 的这个消息也告诉服务器.
boolean ok = channel.createChannel();
if (!ok) {
// 服务器这里创建失败了 就表示整个这次创建 channel 操作不顺利
// 就把刚才已经加入 hash 表的键值对删除.
channelMap.remove(channelId);
return null;
}
return channel;
}
}
12. 使用demo及项目总结
以上完成了项目的基础代码,就是写了一个消费者队列服务器。
核心功能就是提供了 虚拟主机、交换机、队列、消息等概念的管理;实现了三种典型的消息转发方式;基于这些内容就可以实现 跨主机 / 服务器 之间的生产者消费者模型了。
生产者 demo 代码:
package com.example.mq.demo;
import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.ExchangeType;
import java.io.IOException;
/*
* 这个类用来表示一个生产者.
* 正常是一个单独的服务器程序.
*/
public class DemoProducer {
public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("启动生产者");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 创建交换机和队列
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
channel.queueDeclare("testQueue", true, false, false, null);
// 创建一个消息并发送
byte[] body = "hello".getBytes();
boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);
System.out.println("消息投递完成! ok=" + ok);
Thread.sleep(500);
channel.close();
connection.close();
}
}
消费者 demo 代码:
package com.example.mq.demo;
import com.example.mq.common.Consumer;
import com.example.mq.common.MqException;
import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;
import java.io.IOException;
/*
* 这个类表示一个消费者.
* 正常由一个单独的服务器程序执行.
*/
public class DemoConsumer {
public static void main(String[] args) throws IOException, MqException, InterruptedException {
System.out.println("启动消费者!");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
channel.queueDeclare("testQueue", true, false, false, null);
channel.basicConsume("testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
System.out.println("[消费数据] 开始!");
System.out.println("consumerTag=" + consumerTag);
System.out.println("basicProperties=" + basicProperties);
String bodyString = new String(body, 0, body.length);
System.out.println("body=" + bodyString);
System.out.println("[消费数据] 结束!");
}
});
// 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.
while (true) {
Thread.sleep(500);
}
}
}
🍂项目可扩展点:
- 虚拟主机的管理(建立虚拟主机表)
- 用户管理 / 用户认证(建立用户表,可在 建立连接的时候 或者 建立 channel 的时候)
- 交换机 / 队列,实现 独占 / 自动删除 / 扩展参数 …
- 发送方确认(服务器返回响应,生产者收到后触发回调)
- 拒绝 / 否定 应答
- 死信队列(针对消息可靠性)
- 管理接口 & 管理页面
更多推荐
所有评论(0)