MQTT的java实现org.eclipse.paho.client.mqttv3 源码分析(一)
org.eclipse.paho.client.mqttv3.internal.wire :
package com.miller.springcloudkebeedgeprotocol.coap;
/**
 * @program: spring-cloud-kubeedge
 * @description: ymfx
 * @author: Miller.FAN
 * @create: 2019-12-10 16:16
 */
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttPersistable;
import org.eclipse.paho.client.mqttv3.MqttToken;
import org.eclipse.paho.client.mqttv3.internal.ExceptionHelper;
import org.eclipse.paho.client.mqttv3.internal.wire.*;
public abstract class MqttWireMessage {
public static final byte MESSAGE_TYPE_CONNECT = 1;
public static final byte MESSAGE_TYPE_CONNACK = 2;
public static final byte MESSAGE_TYPE_PUBLISH = 3;
public static final byte MESSAGE_TYPE_PUBACK = 4;
public static final byte MESSAGE_TYPE_PUBREC = 5;
public static final byte MESSAGE_TYPE_PUBREL = 6;
public static final byte MESSAGE_TYPE_PUBCOMP = 7;
public static final byte MESSAGE_TYPE_SUBSCRIBE = 8;
public static final byte MESSAGE_TYPE_SUBACK = 9;
public static final byte MESSAGE_TYPE_UNSUBSCRIBE = 10;
public static final byte MESSAGE_TYPE_UNSUBACK = 11;
public static final byte MESSAGE_TYPE_PINGREQ = 12;
public static final byte MESSAGE_TYPE_PINGRESP = 13;
public static final byte MESSAGE_TYPE_DISCONNECT = 14;
protected static final Charset STRING_ENCODING;
private static final String[] PACKET_NAMES;
private static final long FOUR_BYTE_INT_MAX = 4294967295L;
private static final int VARIABLE_BYTE_INT_MAX = 268435455;
private byte type;
protected int msgId;
protected boolean duplicate = false;
private MqttToken token;
//静态块 加载类之前执行,只执行一次 我理解这部分代码执行完毕会在常量池中生成相关的常量信息
static {
STRING_ENCODING = StandardCharsets.UTF_8;
//构造器 参数是:消息类型
public MqttWireMessage(byte type) {
this.type = type;
this.msgId = 0;
//抽象方法 获取信息详情
protected abstract byte getMessageInfo();
// 获取消息体
public byte[] getPayload() throws MqttException {
return new byte[0];
public byte getType() {
return this.type;
public int getMessageId() {
return this.msgId;
public void setMessageId(int msgId) {
this.msgId = msgId;
public String getKey() {
return Integer.toString(this.getMessageId());
//获取 头信息 抽象方法
public byte[] getHeader() throws MqttException {
try {
int first = (this.getType() & 15) << 4 ^ this.getMessageInfo() & 15;
byte[] varHeader = this.getVariableHeader();
int remLen = varHeader.length + this.getPayload().length;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
return baos.toByteArray();
} catch (IOException var6) {
throw new MqttException(var6);
protected abstract byte[] getVariableHeader() throws MqttException;
public boolean isMessageIdRequired() {
return true;
//新建一个消息 如果传入的消息体是空,给一个new byte[0];
public static org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage createWireMessage(MqttPersistable data) throws MqttException {
byte[] payload = data.getPayloadBytes();
if (payload == null) {
//防止 null
payload = new byte[0];
MultiByteArrayInputStream mbais = new MultiByteArrayInputStream(data.getHeaderBytes(), data.getHeaderOffset(), data.getHeaderLength(), payload, data.getPayloadOffset(), data.getPayloadLength());
return createWireMessage((InputStream)mbais);
public static org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage createWireMessage(byte[] bytes) throws MqttException {
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
return createWireMessage((InputStream)bais);
private static org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage createWireMessage(InputStream inputStream) throws MqttException {
try {
CountingInputStream counter = new CountingInputStream(inputStream);
DataInputStream in = new DataInputStream(counter);
//the next byte of this input stream, interpreted as an unsigned 8-bit number.
int first = in.readUnsignedByte();
byte type = (byte)(first >> 4);
byte info = (byte)(first &= 15);
long remLen = (long)readMBI(in).getValue();
long totalToRead = (long)counter.getCounter() + remLen;
long remainder = totalToRead - (long)counter.getCounter();
byte[] data = new byte[0];
if (remainder > 0L) {
data = new byte[(int)remainder];
in.readFully(data, 0, data.length);
//按照消息类型 组合消息头和消息体 生成的包不一样
Object result;
if (type == 1) {
result = new MqttConnect(info, data);
} else if (type == 3) {
result = new MqttPublish(info, data);
} else if (type == 4) {
result = new MqttPubAck(info, data);
} else if (type == 7) {
result = new MqttPubComp(info, data);
} else if (type == 2) {
result = new MqttConnack(info, data);
} else if (type == 12) {
result = new MqttPingReq(info, data);
} else if (type == 13) {
result = new MqttPingResp(info, data);
} else if (type == 8) {
result = new MqttSubscribe(info, data);
} else if (type == 9) {
result = new MqttSuback(info, data);
} else if (type == 10) {
result = new MqttUnsubscribe(info, data);
} else if (type == 11) {
result = new MqttUnsubAck(info, data);
} else if (type == 6) {
result = new MqttPubRel(info, data);
} else if (type == 5) {
result = new MqttPubRec(info, data);
} else {
if (type != 14) {
throw ExceptionHelper.createMqttException(6);
// type = 14 的情况
result = new MqttDisconnect(info, data);
return (org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage)result;
} catch (IOException var14) {
throw new MqttException(var14);
// 输入一个long 类型的数,返回一个byte数组
public static byte[] encodeMBI(long number) {
// 检查 number 是不是在规定的范围之内
int numBytes = 0;
long no = number;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
do {
byte digit = (byte)((int)(no % 128L));
no /= 128L;
if (no > 0L) {
digit = (byte)(digit | 128);
} while(no > 0L && numBytes < 4);
return bos.toByteArray();
public static MultiByteInteger readMBI(DataInputStream in) throws IOException {
int msgLength = 0;
int multiplier = 1;
int count = 0;
byte digit;
do {
digit = in.readByte();
msgLength += (digit & 127) * multiplier;
multiplier *= 128;
} while((digit & 128) != 0);
if (msgLength >= 0 && msgLength <= 268435455) {
return new MultiByteInteger(msgLength, count);
} else {
throw new IOException("This property must be a number between 0 and 268435455. Read value was: " + msgLength);
protected byte[] encodeMessageId() throws MqttException {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
return baos.toByteArray();
} catch (IOException var3) {
throw new MqttException(var3);
public boolean isRetryable() {
return false;
public void setDuplicate(boolean duplicate) {
this.duplicate = duplicate;
public static void encodeUTF8(DataOutputStream dos, String stringToEncode) throws MqttException {
try {
byte[] encodedString = stringToEncode.getBytes(STRING_ENCODING);
byte byte1 = (byte)(encodedString.length >>> 8 & 255);
byte byte2 = (byte)(encodedString.length >>> 0 & 255);
} catch (UnsupportedEncodingException var5) {
throw new MqttException(var5);
} catch (IOException var6) {
throw new MqttException(var6);
public static String decodeUTF8(DataInputStream input) throws MqttException {
try {
int encodedLength = input.readUnsignedShort();
byte[] encodedString = new byte[encodedLength];
String output = new String(encodedString, STRING_ENCODING);
return output;
} catch (IOException var4) {
throw new MqttException(var4);
private static void validateUTF8String(String input) throws IllegalArgumentException {
for(int i = 0; i < input.length(); ++i) {
boolean isBad = false;
char c = input.charAt(i);
if (Character.isHighSurrogate(c)) {
if (i == input.length()) {
isBad = true;
} else {
char c2 = input.charAt(i);
if (Character.isLowSurrogate(c2)) {
isBad = true;
} else {
int ch = (c & 1023) << 10 | c2 & 1023;
if ((ch & '\uffff') == 65535 || (ch & '\uffff') == 65534) {
isBad = true;
} else if (!Character.isISOControl(c) && !Character.isLowSurrogate(c)) {
if (c >= '\ufdd0' && (c == '\ufffe' || c >= '\ufdd0' || c <= '\ufddf')) {
isBad = true;
} else {
isBad = true;
if (isBad) {
throw new IllegalArgumentException(String.format("Invalid UTF-8 char: [%x]", Integer.valueOf(c)));
// 检查int value 是不是在规定的范围之内
public static void validateVariableByteInt(int value) throws IllegalArgumentException {
if (value < 0 || value > 268435455) {
throw new IllegalArgumentException("This property must be a number between 0 and 268435455");
public MqttToken getToken() {
return this.token;
public void setToken(MqttToken token) {
this.token = token;
public String toString() {
return PACKET_NAMES[this.type];
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.internal.ExceptionHelper;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttAck;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttConnack;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttSuback;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage;
import org.eclipse.paho.client.mqttv3.logging.Logger;
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
public class Token {
//如此写法有什么好处? final 修饰,常量,类加载后 CLASS_NAME 被保存到常量池中 ,但是该处使用反射得到类名 主动加载
private static final String CLASS_NAME = org.eclipse.paho.client.mqttv3.internal.Token.class.getName();
private Logger log;
private volatile boolean completed;
private boolean pendingComplete;
private boolean sent;
private Object responseLock;
private Object sentLock;
protected MqttMessage message;
private MqttWireMessage response;
private MqttException exception;
private String[] topics;
private String key;
private IMqttAsyncClient client;
private IMqttActionListener callback;
private Object userContext;
private int messageID;
private boolean notified;
public Token(String logContext) {
this.log = LoggerFactory.getLogger("org.eclipse.paho.client.mqttv3.internal.nls.logcat", CLASS_NAME);
this.completed = false;
this.pendingComplete = false;
this.sent = false;
this.responseLock = new Object();
this.sentLock = new Object();
this.message = null;
this.response = null;
this.exception = null;
this.topics = null;
this.client = null;
this.callback = null;
this.userContext = null;
this.messageID = 0;
this.notified = false;
public int getMessageID() {
return this.messageID;
public void setMessageID(int messageID) {
this.messageID = messageID;
public boolean checkResult() throws MqttException {
if (this.getException() != null) {
throw this.getException();
} else {
return true;
public MqttException getException() {
return this.exception;
public boolean isComplete() {
return this.completed;
protected boolean isCompletePending() {
return this.pendingComplete;
protected boolean isInUse() {
return this.getClient() != null && !this.isComplete();
public void setActionCallback(IMqttActionListener listener) {
this.callback = listener;
public IMqttActionListener getActionCallback() {
return this.callback;
public void waitForCompletion() throws MqttException {
public void waitForCompletion(long timeout) throws MqttException {
String methodName = "waitForCompletion";
this.log.fine(CLASS_NAME, "waitForCompletion", "407", new Object[]{this.getKey(), timeout, this});
MqttWireMessage resp = this.waitForResponse(timeout);
if (resp == null && !this.completed) {
this.log.fine(CLASS_NAME, "waitForCompletion", "406", new Object[]{this.getKey(), this});
this.exception = new MqttException(32000);
throw this.exception;
} else {
protected MqttWireMessage waitForResponse() throws MqttException {
return this.waitForResponse(-1L);
protected MqttWireMessage waitForResponse(long timeout) throws MqttException {
String methodName = "waitForResponse";
synchronized(this.responseLock) {
this.log.fine(CLASS_NAME, "waitForResponse", "400", new Object[]{this.getKey(), timeout, this.sent, this.completed, this.exception == null ? "false" : "true", this.response, this}, this.exception);
while(!this.completed) {
if (this.exception == null) {
try {
this.log.fine(CLASS_NAME, "waitForResponse", "408", new Object[]{this.getKey(), new Long(timeout)});
if (timeout <= 0L) {
} else {
} catch (InterruptedException var6) {
this.exception = new MqttException(var6);
if (!this.completed) {
if (this.exception != null) {
this.log.fine(CLASS_NAME, "waitForResponse", "401", (Object[])null, this.exception);
throw this.exception;
if (timeout > 0L) {
this.log.fine(CLASS_NAME, "waitForResponse", "402", new Object[]{this.getKey(), this.response});
return this.response;
protected void markComplete(MqttWireMessage msg, MqttException ex) {
String methodName = "markComplete";
this.log.fine(CLASS_NAME, "markComplete", "404", new Object[]{this.getKey(), msg, ex});
synchronized(this.responseLock) {
if (msg instanceof MqttAck) {
this.message = null;
this.pendingComplete = true;
this.response = msg;
this.exception = ex;
protected void notifyComplete() {
String methodName = "notifyComplete";
this.log.fine(CLASS_NAME, "notifyComplete", "404", new Object[]{this.getKey(), this.response, this.exception});
synchronized(this.responseLock) {
if (this.exception == null && this.pendingComplete) {
this.completed = true;
this.pendingComplete = false;
} else {
this.pendingComplete = false;
synchronized(this.sentLock) {
this.sent = true;
public void waitUntilSent() throws MqttException {
String methodName = "waitUntilSent";
synchronized(this.sentLock) {
synchronized(this.responseLock) {
if (this.exception != null) {
throw this.exception;
while(!this.sent) {
try {
this.log.fine(CLASS_NAME, "waitUntilSent", "409", new Object[]{this.getKey()});
} catch (InterruptedException var4) {
if (!this.sent) {
if (this.exception == null) {
throw ExceptionHelper.createMqttException(6);
} else {
throw this.exception;
protected void notifySent() {
String methodName = "notifySent";
this.log.fine(CLASS_NAME, "notifySent", "403", new Object[]{this.getKey()});
synchronized(this.responseLock) {
this.response = null;
this.completed = false;
synchronized(this.sentLock) {
this.sent = true;
public IMqttAsyncClient getClient() {
return this.client;
protected void setClient(IMqttAsyncClient client) {
this.client = client;
public void reset() throws MqttException {
String methodName = "reset";
if (this.isInUse()) {
throw new MqttException(32201);
} else {
this.log.fine(CLASS_NAME, "reset", "410", new Object[]{this.getKey()});
this.client = null;
this.completed = false;
this.response = null;
this.sent = false;
this.exception = null;
this.userContext = null;
public MqttMessage getMessage() {
return this.message;
public MqttWireMessage getWireMessage() {
return this.response;
public void setMessage(MqttMessage msg) {
this.message = msg;
public String[] getTopics() {
return this.topics;
public void setTopics(String[] topics) {
this.topics = (String[])topics.clone();
public Object getUserContext() {
return this.userContext;
public void setUserContext(Object userContext) {
this.userContext = userContext;
public void setKey(String key) {
this.key = key;
public String getKey() {
return this.key;
public void setException(MqttException exception) {
synchronized(this.responseLock) {
this.exception = exception;
public boolean isNotified() {
return this.notified;
public void setNotified(boolean notified) {
this.notified = notified;
public String toString() {
StringBuffer tok = new StringBuffer();
tok.append(" ,topics=");
if (this.getTopics() != null) {
for(int i = 0; i < this.getTopics().length; ++i) {
tok.append(this.getTopics()[i]).append(", ");
tok.append(" ,usercontext=").append(this.getUserContext());
tok.append(" ,isComplete=").append(this.isComplete());
tok.append(" ,isNotified=").append(this.isNotified());
tok.append(" ,exception=").append(this.getException());
tok.append(" ,actioncallback=").append(this.getActionCallback());
return tok.toString();
public int[] getGrantedQos() {
int[] val = new int[0];
if (this.response instanceof MqttSuback) {
val = ((MqttSuback)this.response).getGrantedQos();
return val;
public boolean getSessionPresent() {
boolean val = false;
if (this.response instanceof MqttConnack) {
val = ((MqttConnack)this.response).getSessionPresent();
return val;
public MqttWireMessage getResponse() {
return this.response;