MQTT的java实现org.eclipse.paho.client.mqttv3 源码分析(一)
org.eclipse.paho.client.mqttv3.internal.wire :MQTT协议中报文信息,里面包含有心跳包、订阅包、发布包、确认包等。1、MqttWireMessage源码package com.miller.springcloudkebeedgeprotocol.coap;/*** @program: spri...
·
org.eclipse.paho.client.mqttv3.internal.wire :
MQTT协议中报文信息,里面包含有心跳包、订阅包、发布包、确认包等。
1、MqttWireMessage源码
package com.miller.springcloudkebeedgeprotocol.coap;
/**
* @program: spring-cloud-kubeedge
* @description: ymfx
* @author: Miller.FAN
* @create: 2019-12-10 16:16
**/
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttPersistable;
import org.eclipse.paho.client.mqttv3.MqttToken;
import org.eclipse.paho.client.mqttv3.internal.ExceptionHelper;
import org.eclipse.paho.client.mqttv3.internal.wire.*;
public abstract class MqttWireMessage {
//MQTT数据包类型定义
public static final byte MESSAGE_TYPE_CONNECT = 1;
public static final byte MESSAGE_TYPE_CONNACK = 2;
public static final byte MESSAGE_TYPE_PUBLISH = 3;
public static final byte MESSAGE_TYPE_PUBACK = 4;
public static final byte MESSAGE_TYPE_PUBREC = 5;
public static final byte MESSAGE_TYPE_PUBREL = 6;
public static final byte MESSAGE_TYPE_PUBCOMP = 7;
public static final byte MESSAGE_TYPE_SUBSCRIBE = 8;
public static final byte MESSAGE_TYPE_SUBACK = 9;
public static final byte MESSAGE_TYPE_UNSUBSCRIBE = 10;
public static final byte MESSAGE_TYPE_UNSUBACK = 11;
public static final byte MESSAGE_TYPE_PINGREQ = 12;
public static final byte MESSAGE_TYPE_PINGRESP = 13;
public static final byte MESSAGE_TYPE_DISCONNECT = 14;
protected static final Charset STRING_ENCODING;
private static final String[] PACKET_NAMES;
private static final long FOUR_BYTE_INT_MAX = 4294967295L;
private static final int VARIABLE_BYTE_INT_MAX = 268435455;
private byte type;
protected int msgId;
protected boolean duplicate = false;
private MqttToken token;
//静态块 加载类之前执行,只执行一次 我理解这部分代码执行完毕会在常量池中生成相关的常量信息
static {
STRING_ENCODING = StandardCharsets.UTF_8;
PACKET_NAMES = new String[]{"reserved", "CONNECT", "0", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT"};
}
//构造器 参数是:消息类型
public MqttWireMessage(byte type) {
this.type = type;
this.msgId = 0;
}
//抽象方法 获取信息详情
protected abstract byte getMessageInfo();
// 获取消息体
public byte[] getPayload() throws MqttException {
return new byte[0];
}
//获取消息类型
public byte getType() {
return this.type;
}
//获取消息的id
public int getMessageId() {
return this.msgId;
}
//
public void setMessageId(int msgId) {
this.msgId = msgId;
}
//
public String getKey() {
return Integer.toString(this.getMessageId());
}
//获取 头信息 抽象方法
public byte[] getHeader() throws MqttException {
try {
int first = (this.getType() & 15) << 4 ^ this.getMessageInfo() & 15;
byte[] varHeader = this.getVariableHeader();
int remLen = varHeader.length + this.getPayload().length;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
dos.writeByte(first);
dos.write(encodeMBI((long)remLen));
dos.write(varHeader);
dos.flush();
return baos.toByteArray();
} catch (IOException var6) {
throw new MqttException(var6);
}
}
//抽象方法
protected abstract byte[] getVariableHeader() throws MqttException;
public boolean isMessageIdRequired() {
return true;
}
//新建一个消息 如果传入的消息体是空,给一个new byte[0];
public static org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage createWireMessage(MqttPersistable data) throws MqttException {
byte[] payload = data.getPayloadBytes();
if (payload == null) {
//防止 null
payload = new byte[0];
}
MultiByteArrayInputStream mbais = new MultiByteArrayInputStream(data.getHeaderBytes(), data.getHeaderOffset(), data.getHeaderLength(), payload, data.getPayloadOffset(), data.getPayloadLength());
return createWireMessage((InputStream)mbais);
}
public static org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage createWireMessage(byte[] bytes) throws MqttException {
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
return createWireMessage((InputStream)bais);
}
//这个是私有函数体
private static org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage createWireMessage(InputStream inputStream) throws MqttException {
try {
//统计输入流中字节数
CountingInputStream counter = new CountingInputStream(inputStream);
//
DataInputStream in = new DataInputStream(counter);
//the next byte of this input stream, interpreted as an unsigned 8-bit number.
int first = in.readUnsignedByte();
//取得消息类型
byte type = (byte)(first >> 4);
//我觉得这是编程习惯,严谨一些,不执行这一步一样可以
byte info = (byte)(first &= 15);
long remLen = (long)readMBI(in).getValue();
long totalToRead = (long)counter.getCounter() + remLen;
//剩余多大空间
long remainder = totalToRead - (long)counter.getCounter();
byte[] data = new byte[0];
if (remainder > 0L) {
data = new byte[(int)remainder];
in.readFully(data, 0, data.length);
}
//按照消息类型 组合消息头和消息体 生成的包不一样
Object result;
if (type == 1) {
result = new MqttConnect(info, data);
} else if (type == 3) {
result = new MqttPublish(info, data);
} else if (type == 4) {
result = new MqttPubAck(info, data);
} else if (type == 7) {
result = new MqttPubComp(info, data);
} else if (type == 2) {
result = new MqttConnack(info, data);
} else if (type == 12) {
result = new MqttPingReq(info, data);
} else if (type == 13) {
result = new MqttPingResp(info, data);
} else if (type == 8) {
result = new MqttSubscribe(info, data);
} else if (type == 9) {
result = new MqttSuback(info, data);
} else if (type == 10) {
result = new MqttUnsubscribe(info, data);
} else if (type == 11) {
result = new MqttUnsubAck(info, data);
} else if (type == 6) {
result = new MqttPubRel(info, data);
} else if (type == 5) {
result = new MqttPubRec(info, data);
} else {
if (type != 14) {
throw ExceptionHelper.createMqttException(6);
}
// type = 14 的情况
result = new MqttDisconnect(info, data);
}
return (org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage)result;
} catch (IOException var14) {
throw new MqttException(var14);
}
}
// 输入一个long 类型的数,返回一个byte数组
public static byte[] encodeMBI(long number) {
// 检查 number 是不是在规定的范围之内
validateVariableByteInt((int)number);
//局部变量
int numBytes = 0;
long no = number;
//字节码输出流
ByteArrayOutputStream bos = new ByteArrayOutputStream();
do {
byte digit = (byte)((int)(no % 128L));
no /= 128L;
if (no > 0L) {
digit = (byte)(digit | 128);
}
bos.write(digit);
++numBytes;
} while(no > 0L && numBytes < 4);
return bos.toByteArray();
}
//类型转化
public static MultiByteInteger readMBI(DataInputStream in) throws IOException {
int msgLength = 0;
int multiplier = 1;
int count = 0;
byte digit;
do {
digit = in.readByte();
++count;
msgLength += (digit & 127) * multiplier;
multiplier *= 128;
} while((digit & 128) != 0);
if (msgLength >= 0 && msgLength <= 268435455) {
return new MultiByteInteger(msgLength, count);
} else {
throw new IOException("This property must be a number between 0 and 268435455. Read value was: " + msgLength);
}
}
protected byte[] encodeMessageId() throws MqttException {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
dos.writeShort(this.msgId);
dos.flush();
return baos.toByteArray();
} catch (IOException var3) {
throw new MqttException(var3);
}
}
public boolean isRetryable() {
return false;
}
public void setDuplicate(boolean duplicate) {
this.duplicate = duplicate;
}
public static void encodeUTF8(DataOutputStream dos, String stringToEncode) throws MqttException {
validateUTF8String(stringToEncode);
try {
byte[] encodedString = stringToEncode.getBytes(STRING_ENCODING);
byte byte1 = (byte)(encodedString.length >>> 8 & 255);
byte byte2 = (byte)(encodedString.length >>> 0 & 255);
dos.write(byte1);
dos.write(byte2);
dos.write(encodedString);
} catch (UnsupportedEncodingException var5) {
throw new MqttException(var5);
} catch (IOException var6) {
throw new MqttException(var6);
}
}
public static String decodeUTF8(DataInputStream input) throws MqttException {
try {
int encodedLength = input.readUnsignedShort();
byte[] encodedString = new byte[encodedLength];
input.readFully(encodedString);
String output = new String(encodedString, STRING_ENCODING);
validateUTF8String(output);
return output;
} catch (IOException var4) {
throw new MqttException(var4);
}
}
private static void validateUTF8String(String input) throws IllegalArgumentException {
for(int i = 0; i < input.length(); ++i) {
boolean isBad = false;
char c = input.charAt(i);
if (Character.isHighSurrogate(c)) {
++i;
if (i == input.length()) {
isBad = true;
} else {
char c2 = input.charAt(i);
if (Character.isLowSurrogate(c2)) {
isBad = true;
} else {
int ch = (c & 1023) << 10 | c2 & 1023;
if ((ch & '\uffff') == 65535 || (ch & '\uffff') == 65534) {
isBad = true;
}
}
}
} else if (!Character.isISOControl(c) && !Character.isLowSurrogate(c)) {
if (c >= '\ufdd0' && (c == '\ufffe' || c >= '\ufdd0' || c <= '\ufddf')) {
isBad = true;
}
} else {
isBad = true;
}
if (isBad) {
throw new IllegalArgumentException(String.format("Invalid UTF-8 char: [%x]", Integer.valueOf(c)));
}
}
}
// 检查int value 是不是在规定的范围之内
public static void validateVariableByteInt(int value) throws IllegalArgumentException {
if (value < 0 || value > 268435455) {
throw new IllegalArgumentException("This property must be a number between 0 and 268435455");
}
}
public MqttToken getToken() {
return this.token;
}
public void setToken(MqttToken token) {
this.token = token;
}
public String toString() {
return PACKET_NAMES[this.type];
}
}
消息流:
Token类的实现
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.internal.ExceptionHelper;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttAck;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttConnack;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttSuback;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage;
import org.eclipse.paho.client.mqttv3.logging.Logger;
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
public class Token {
//如此写法有什么好处? final 修饰,常量,类加载后 CLASS_NAME 被保存到常量池中 ,但是该处使用反射得到类名 主动加载
private static final String CLASS_NAME = org.eclipse.paho.client.mqttv3.internal.Token.class.getName();
private Logger log;
private volatile boolean completed;
private boolean pendingComplete;
private boolean sent;
private Object responseLock;
private Object sentLock;
protected MqttMessage message;
private MqttWireMessage response;
private MqttException exception;
private String[] topics;
private String key;
private IMqttAsyncClient client;
private IMqttActionListener callback;
private Object userContext;
private int messageID;
private boolean notified;
public Token(String logContext) {
this.log = LoggerFactory.getLogger("org.eclipse.paho.client.mqttv3.internal.nls.logcat", CLASS_NAME);
this.completed = false;
this.pendingComplete = false;
this.sent = false;
this.responseLock = new Object();
this.sentLock = new Object();
this.message = null;
this.response = null;
this.exception = null;
this.topics = null;
this.client = null;
this.callback = null;
this.userContext = null;
this.messageID = 0;
this.notified = false;
this.log.setResourceName(logContext);
}
public int getMessageID() {
return this.messageID;
}
public void setMessageID(int messageID) {
this.messageID = messageID;
}
public boolean checkResult() throws MqttException {
if (this.getException() != null) {
throw this.getException();
} else {
return true;
}
}
public MqttException getException() {
return this.exception;
}
public boolean isComplete() {
return this.completed;
}
protected boolean isCompletePending() {
return this.pendingComplete;
}
protected boolean isInUse() {
return this.getClient() != null && !this.isComplete();
}
public void setActionCallback(IMqttActionListener listener) {
this.callback = listener;
}
public IMqttActionListener getActionCallback() {
return this.callback;
}
public void waitForCompletion() throws MqttException {
this.waitForCompletion(-1L);
}
public void waitForCompletion(long timeout) throws MqttException {
String methodName = "waitForCompletion";
this.log.fine(CLASS_NAME, "waitForCompletion", "407", new Object[]{this.getKey(), timeout, this});
MqttWireMessage resp = this.waitForResponse(timeout);
if (resp == null && !this.completed) {
this.log.fine(CLASS_NAME, "waitForCompletion", "406", new Object[]{this.getKey(), this});
this.exception = new MqttException(32000);
throw this.exception;
} else {
this.checkResult();
}
}
protected MqttWireMessage waitForResponse() throws MqttException {
return this.waitForResponse(-1L);
}
protected MqttWireMessage waitForResponse(long timeout) throws MqttException {
String methodName = "waitForResponse";
synchronized(this.responseLock) {
this.log.fine(CLASS_NAME, "waitForResponse", "400", new Object[]{this.getKey(), timeout, this.sent, this.completed, this.exception == null ? "false" : "true", this.response, this}, this.exception);
while(!this.completed) {
if (this.exception == null) {
try {
this.log.fine(CLASS_NAME, "waitForResponse", "408", new Object[]{this.getKey(), new Long(timeout)});
if (timeout <= 0L) {
this.responseLock.wait();
} else {
this.responseLock.wait(timeout);
}
} catch (InterruptedException var6) {
this.exception = new MqttException(var6);
}
}
if (!this.completed) {
if (this.exception != null) {
this.log.fine(CLASS_NAME, "waitForResponse", "401", (Object[])null, this.exception);
throw this.exception;
}
if (timeout > 0L) {
break;
}
}
}
}
this.log.fine(CLASS_NAME, "waitForResponse", "402", new Object[]{this.getKey(), this.response});
return this.response;
}
protected void markComplete(MqttWireMessage msg, MqttException ex) {
String methodName = "markComplete";
this.log.fine(CLASS_NAME, "markComplete", "404", new Object[]{this.getKey(), msg, ex});
synchronized(this.responseLock) {
if (msg instanceof MqttAck) {
this.message = null;
}
this.pendingComplete = true;
this.response = msg;
this.exception = ex;
}
}
protected void notifyComplete() {
String methodName = "notifyComplete";
this.log.fine(CLASS_NAME, "notifyComplete", "404", new Object[]{this.getKey(), this.response, this.exception});
synchronized(this.responseLock) {
if (this.exception == null && this.pendingComplete) {
this.completed = true;
this.pendingComplete = false;
} else {
this.pendingComplete = false;
}
this.responseLock.notifyAll();
}
synchronized(this.sentLock) {
this.sent = true;
this.sentLock.notifyAll();
}
}
public void waitUntilSent() throws MqttException {
String methodName = "waitUntilSent";
synchronized(this.sentLock) {
synchronized(this.responseLock) {
if (this.exception != null) {
throw this.exception;
}
}
while(!this.sent) {
try {
this.log.fine(CLASS_NAME, "waitUntilSent", "409", new Object[]{this.getKey()});
this.sentLock.wait();
} catch (InterruptedException var4) {
}
}
if (!this.sent) {
if (this.exception == null) {
throw ExceptionHelper.createMqttException(6);
} else {
throw this.exception;
}
}
}
}
protected void notifySent() {
String methodName = "notifySent";
this.log.fine(CLASS_NAME, "notifySent", "403", new Object[]{this.getKey()});
synchronized(this.responseLock) {
this.response = null;
this.completed = false;
}
synchronized(this.sentLock) {
this.sent = true;
this.sentLock.notifyAll();
}
}
public IMqttAsyncClient getClient() {
return this.client;
}
protected void setClient(IMqttAsyncClient client) {
this.client = client;
}
public void reset() throws MqttException {
String methodName = "reset";
if (this.isInUse()) {
throw new MqttException(32201);
} else {
this.log.fine(CLASS_NAME, "reset", "410", new Object[]{this.getKey()});
this.client = null;
this.completed = false;
this.response = null;
this.sent = false;
this.exception = null;
this.userContext = null;
}
}
public MqttMessage getMessage() {
return this.message;
}
public MqttWireMessage getWireMessage() {
return this.response;
}
public void setMessage(MqttMessage msg) {
this.message = msg;
}
public String[] getTopics() {
return this.topics;
}
public void setTopics(String[] topics) {
this.topics = (String[])topics.clone();
}
public Object getUserContext() {
return this.userContext;
}
public void setUserContext(Object userContext) {
this.userContext = userContext;
}
public void setKey(String key) {
this.key = key;
}
public String getKey() {
return this.key;
}
public void setException(MqttException exception) {
synchronized(this.responseLock) {
this.exception = exception;
}
}
public boolean isNotified() {
return this.notified;
}
public void setNotified(boolean notified) {
this.notified = notified;
}
public String toString() {
StringBuffer tok = new StringBuffer();
tok.append("key=").append(this.getKey());
tok.append(" ,topics=");
if (this.getTopics() != null) {
for(int i = 0; i < this.getTopics().length; ++i) {
tok.append(this.getTopics()[i]).append(", ");
}
}
tok.append(" ,usercontext=").append(this.getUserContext());
tok.append(" ,isComplete=").append(this.isComplete());
tok.append(" ,isNotified=").append(this.isNotified());
tok.append(" ,exception=").append(this.getException());
tok.append(" ,actioncallback=").append(this.getActionCallback());
return tok.toString();
}
public int[] getGrantedQos() {
int[] val = new int[0];
if (this.response instanceof MqttSuback) {
val = ((MqttSuback)this.response).getGrantedQos();
}
return val;
}
public boolean getSessionPresent() {
boolean val = false;
if (this.response instanceof MqttConnack) {
val = ((MqttConnack)this.response).getSessionPresent();
}
return val;
}
public MqttWireMessage getResponse() {
return this.response;
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)