【阿里云生活物联网架构师专题 ⑧】基于在 SpringBoot Java私有云上接收阿里云物联网设备的各种状态数据,实现m2m流转;
本系列博客学习由非官方人员 半颗心脏 潜心所力所写,仅仅做个人技术交流分享,不做任何商业用途。如有不对之处,请留言,本人及时更改。1、esp32接入阿里云物联网平台,实现天猫精灵语音控制;2、esp8266直连接入阿里云物联网平台,实现天猫精灵找队友零配网功能和语音控制;3、esp32 sdk 直连接入天猫精灵IOT开放平台,实现天猫精灵找队友零配网功能和语音控制;4、如何在天猫精灵IOT开放平台
- 本系列博客学习由非官方人员 半颗心脏 潜心所力所写,仅仅做个人技术交流分享,不做任何商业用途。如有不对之处,请留言,本人及时更改。
1、esp32接入阿里云物联网平台,实现天猫精灵语音控制;
2、esp8266直连接入阿里云物联网平台,实现天猫精灵找队友零配网功能和语音控制;
3、esp32 sdk 直连接入天猫精灵IOT开放平台,实现天猫精灵找队友零配网功能和语音控制;
4、如何在天猫精灵IOT开放平台二次开发智能设备的 H5控制面板;
5、分享可商用的ESP8266 SDK连接阿里云物联网生活平台的在线远程升级OTA笔记。
6、ESP8266接入阿里生活飞燕平台国际版,实现亚马逊Alexa Echo音响语音控制。
7、阿里云物联网平台的网关-子设备理论协议说明,支持Zigbee/ble等没上云能力的设备;;
8、基于在 SpringBoot Java 私有云上接收阿里云物联网平台设备的各种状态数据,实现m2m流转;
文章目录
寄言
我写过很多物联网控制的博文系列,包括微信公众号、微信小程序控制硬件,私有云对接天猫精灵服务器,抑或是硬件端 esp8266/esp32
等系列博文,这是一个一个专题是写下我们如何在阿里云物联网上全栈开发我们的应用专题,让我们无须企业账号也可以体验设备-云端-App” 的过程;让我们变得更强,一个人担任一个公司的全部职责,全栈开发物联网攻城狮前进;
我会带领大家轻轻松松地把自己的设备接入天猫精灵,告别 “单机时代”,走进语音控制物联网时代。 有疑问请留言区留言,或者加群大伙们讨论;写总结,写博文不容易,望大家多多体谅!
- 自带资料:
- git 分布式管理软件的基本使用;
- 硬件开发:乐鑫 esp8266、esp32模块一个;具备 c 语言基础 ,不需要很熟练;
- 移动端开发:android 端具备
java
、vue.js
开发语言,AndroidStudio
环境; - 服务器端开发:php 开发,熟悉 服务器运行、部署等原理操作;
一、前言
将近2个月没更新博文了,罪恶感满满的!工作上频繁遇到了很多客户的一些需求,这2个月我会慢慢做博文,做视频,给大家做一些技术原理讲解,共勉物联网;
针对不少纯软件的电商类型公司想玩转物联网开发,问到我一个问题,我们的私有云服务器如何接收来自阿里云物联网的设备的状态和数据呢?
今天,我给大家啊,介绍下如何在 Java 语言开发的私有云上实现!
什么是服务器订阅?
私有云可以直接订阅产品下多种类型的消息:设备上报消息、设备状态变化通知、设备生命周期变更、网关发现子设备上报、设备拓扑关系变更等。配置服务端订阅后,物联网平台会将产品下所有设备的已订阅类型的消息转发至您的服务器。
适用场景
-
1、服务端订阅适用于单纯的接收设备数据的场景,并且适用于高并发场景。
-
2、服务端接收产品下全部设备的订阅数据。
-
3、如果您有多个服务器消费同一个产品的订阅消息,消息会随机转发至某个服务器。
-
4、服务端订阅与规则引擎数据流转的使用场景和能力对比,请参见数据流转方案对比。
今天使用的是 SpringBoot 架构语言,这个也是我擅长开发的架构,所以这篇我给大家介绍下如何集成AMQP高级消息队列协议,配置AMQP服务端订阅后,物联网平台会将产品下所有已订阅类型的消息,通过AMQP通道推送至私有云。
AMQP服务端订阅消息流转流程图:
二、分享一个好玩的IDEA插件
这几个月在玩 Java 服务器,同事推荐了一个插件 Alibaba Cloud Toolkit,这是阿里巴巴针对 ECS服务器做的一款插件,能一键部署 jar 包,非常方便;
预想在本地看到远程服务器的运行日志,插件还提供了这样的命令,其中运行此命令还需要 SSH 远程连接的账户密码:
其中,restart-springboot.sh
的内容如下:
killall java
nohup java -jar /www/wwwroot/www.aliyun.com/demo-0.0.1-SNAPSHOT.jar > nohup.log 2>&1 &
三、配置AMQP服务端订阅
在物联网平台控制台设置服务端订阅的消息类型。
- 登录物联网平台控制台。
- 在实例概览页,找到对应的实例,单击实例进入实例详情页;
- 在左侧导航栏,选择规则引擎 > 服务端订阅。
- 在服务端订阅页,单击创建订阅。
- 在创建订阅对话框中,完成配置,单击确认。
四、集成开始
首先,先集成架包;
<!--aliyun core-->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.6</version>
</dependency>
<!--aliyun Iot-->
<!-- https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-iot -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-iot</artifactId>
<version>7.16.0</version>
</dependency>
<!-- IOT用于监听阿里平台消息 -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>iot-client-message</artifactId>
<version>1.1.5</version>
</dependency>
<!-- amqp 1.0 qpid client -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.47.0</version>
</dependency>
<!-- util for base64-->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
整个接收处理类如下:
其中,要注意的是以下几个参数,其实在阿里云官网描述得非常清楚了点我:
参数 | 说明 |
---|---|
accessKey | 阿里云账号的AccessKey Key |
accessSecret | 阿里云账号的AccessKey Secret |
consumerGroupId | 消费组ID。登录物联网平台控制台,在对应实例的规则引擎 > 服务端订阅 > 消费组列表查看您的消费组ID。 |
iotInstanceId | 实例ID。仅您购买的实例需要传入;如果是公共示例,留空即可。 |
connectionUrl | 见下面的详细描述 |
AMQP客户端接入物联网平台的连接地址和连接认证参数说明如下:
- 接入域名:
- 对于您购买的实例,接入域名请在物联网平台控制台,找到对应的实例,单击实例进入实例详情查看。
- 公共实例的接入域名:
${uid}.iot-amqp.${regionId}.aliyuncs.com
,其域名中的变量说明
字段 | 说明 |
---|---|
uid | 您的阿里云账号ID。可登录物联网平台控制台,单击账号头像,跳转至安全设置页面查看 |
regionId | 您的物联网平台服务所在地域ID。在物联网平台控制台左上方可查看地域。RegionId的表达方法,请参见地域和可用区。 |
package com.example.demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SpringBootApplication
public class DemoApplication {
private final static Logger logger = LoggerFactory.getLogger(DemoApplication.class);
//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50000));
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
//参数说明,请参见AMQP客户端接入说明文档。
String accessKey = "LTAI4Fg3**********GYJBLPoTo5";
String accessSecret = "OT7s9vk*********K4p8KQ31qoTIL4";
String consumerGroupId = "DEFAULT_GROUP";
//iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
String iotInstanceId = "";
long timeStamp = System.currentTimeMillis();
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
String signMethod = "hmacsha1";
//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
String clientId = "xuhongv";
//userName组装方法,请参见AMQP客户端接入说明文档。
String userName = clientId + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + accessKey
+ ",iotInstanceId=" + iotInstanceId
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;
String password = null;
try {
password = doSign(signContent, accessSecret, signMethod);
} catch (Exception e) {
e.printStackTrace();
}
//接入域名,请参见AMQP客户端接入说明文档。
String connectionUrl = "failover:(amqps://13933000001932702.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"
+ "?failover.reconnectDelay=15";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = null;
try {
context = new InitialContext(hashtable);
} catch (NamingException e) {
e.printStackTrace();
}
ConnectionFactory cf = null;
Destination queue = null;
try {
cf = (ConnectionFactory) context.lookup("SBCF");
queue = (Destination) context.lookup("QUEUE");
} catch (NamingException e) {
e.printStackTrace();
}
// Create Connection
Connection connection = null;
try {
connection = cf.createConnection(userName, password);
} catch (JMSException e) {
e.printStackTrace();
}
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// Create Session
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)
Session session = null;
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// Create Receiver Link
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
} catch (JMSException e) {
e.printStackTrace();
}
}
private static MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
//1.收到消息之后一定要ACK。
// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
// message.acknowledge();
//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
executorService.submit(() -> processMessage(message));
} catch (Exception e) {
logger.error("submit task occurs exception ", e);
}
}
};
/**
* 在这里处理您收到消息后的具体业务逻辑。
*/
private static void processMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
logger.info("receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
} catch (Exception e) {
logger.error("processMessage occurs error ", e);
}
}
private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}
/**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error) {
logger.error("onConnectionFailure, {}", error.getMessage());
}
/**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}
/**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
}
@Override
public void onSessionClosed(Session session, Throwable cause) {
}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {
}
};
/**
* 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
}
}
- 当设备正常推送数据上报,服务器会接收到:
2020-11-20 15:43:04.266 INFO 2108 --- [pool-1-thread-1] com.example.demo.DemoApplication : receive message, topic = /a1quyJFAxPS/Device01/thing/event/property/post, messageId = 1329691716009393153, content =
{
"deviceType": "WaterloggingSensor",
"iotId": "PZzjOxtNUvQ2jlpATdPs000000",
"requestId": "123",
"checkFailedData": {},
"productKey": "a1quoiyJF0",
"gmtCreate": 1605858184226,
"deviceName": "Device01",
"items": {
"BatteryLevel": {
"value": 1,
"time": 1605858184231
},
"WaterLeachState": {
"value": 1,
"time": 1605858184231
},
"GeoLocation": {
"value": {
"CoordinateSystem": 1,
"Latitude": 11,
"Longitude": 11,
"Altitude": 1
},
"time": 1605858184231
}
}
}
另外,不要把我的博客作为学习标准,我的只是笔记,难有疏忽之处,如果有,请指出来,也欢迎留言哈!
- 玩转
esp8266
带你飞、加群付费QQ
群,不喜的朋友勿喷勿加:434878850 - esp8266源代码学习汇总(持续更新,欢迎star):https://github.com/xuhongv/StudyInEsp8266
- esp32源代码学习汇总(持续更新,欢迎star):https://github.com/xuhongv/StudyInEsp32
- 关注下面微信公众号二维码,干货多多,第一时间推送!

更多推荐
所有评论(0)