设备接入 IOTDA-IoT Device SDK使用指南(Java):如何开发网关
如何开发网关
网关是一个特殊的设备,除具备一般设备功能之外,还具有子设备管理、子设备消息转发的功能。SDK提供了AbstractGateway抽象类来简化网关的实现。该类提供了子设备管理功能,需要从平台获取子设备信息并保存(需要子类提供子设备持久化接口)、子设备下行消息转发功能(需要子类实现转发处理接口)、以及上报子设备列表、上报子设备属性、上报子设备状态、上报子设备消息等接口。
- 使用AbstractGateway类
继承该类,在构造函数里提供子设备信息持久化接口,实现其下行消息转发的抽象接口:
1 2 3 4 5 6 7
public abstract void onSubdevCommand(String requestId, Command command); public abstract void onSubdevPropertiesSet(String requestId, PropsSet propsSet); public abstract void onSubdevPropertiesGet(String requestId, PropsGet propsGet); public abstract void onSubdevMessage(DeviceMessage message);
- iot-gateway-demo代码介绍
工程iot-gateway-demo基于AbstractGateway实现了一个简单的网关,它提供tcp设备接入能力。关键类:
SimpleGateway:继承自AbstractGateway,实现子设备管理和下行消息转发
StringTcpServer:基于netty实现一个TCP server,本例中子设备采用TCP协议,并且首条消息为鉴权消息
SubDevicesFilePersistence:子设备信息持久化,采用json文件来保存子设备信息,并在内存中做了缓存
Session:设备会话类,保存了设备id和TCP的channel的对应关系
- SimpleGateway类
添加或删除子设备处理
添加子设备:AbstractGateway的onAddSubDevices接口已经完成了子设备信息的保存。我们不需要再增加额外处理,因此SimpleGateway不需要重写onAddSubDevices接口
删除子设备:我们不仅需要修改持久化信息,还需要断开当前子设备的连接。所以我们重写了onDeleteSubDevices接口,增加了拆链处理,然后调用父类的onDeleteSubDevices。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
@Override public int onDeleteSubDevices(SubDevicesInfo subDevicesInfo) { for (DeviceInfo subdevice : subDevicesInfo.getDevices()) { Session session = nodeIdToSesseionMap.get(subdevice.getNodeId()); if (session != null) { if (session.getChannel() != null) { session.getChannel().close(); channelIdToSessionMap.remove(session.getChannel().id().asLongText()); nodeIdToSesseionMap.remove(session.getNodeId()); } } } return super.onDeleteSubDevices(subDevicesInfo); }
- 下行消息处理
网关收到平台下行消息时,需要转发给子设备。平台下行消息分为三种:设备消息、属性读写、命令。
- 设备消息:这里我们需要根据deviceId获取nodeId,从而获取session,从session里获取channel,就可以往channel发送消息。在转发消息时,可以根据需要进行一定的转换处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
@Override public void onSubdevMessage(DeviceMessage message) { //平台接口带的都是deviceId,deviceId是由nodeId和productId拼装生成的,即 //deviceId = productId_nodeId String nodeId = IotUtil.getNodeIdFromDeviceId(message.getDeviceId()); if (nodeId == null) { return; } //通过nodeId获取session,进一步获取channel Session session = nodeIdToSesseionMap.get(nodeId); if (session == null) { log.error("subdev is not connected " + nodeId); return; } if (session.getChannel() == null){ log.error("channel is null " + nodeId); return; } //直接把消息转发给子设备 session.getChannel().writeAndFlush(message.getContent()); log.info("writeAndFlush " + message); }
- 属性读写:
属性读写包括属性设置和属性查询。
属性设置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
@Override public void onSubdevPropertiesSet(String requestId, PropsSet propsSet) { if (propsSet.getDeviceId() == null) { return; } String nodeId = IotUtil.getNodeIdFromDeviceId(propsSet.getDeviceId()); if (nodeId == null) { return; } Session session = nodeIdToSesseionMap.get(nodeId); if (session == null) { return; } //这里我们直接把对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换 session.getChannel().writeAndFlush(JsonUtil.convertObject2String(propsSet)); //为了简化处理,我们在这里直接回响应。更合理做法是在子设备处理完后再回响应 getClient().respondPropsSet(requestId, IotResult.SUC CES S); log.info("writeAndFlush " + propsSet); }
属性查询:1 2 3 4 5 6 7
@Override public void onSubdevPropertiesGet(String requestId, PropsGet propsGet) { //不建议平台直接读子设备的属性,这里直接返回失败 log.error("not supporte onSubdevPropertiesGet"); deviceClient.respondPropsSet(requestId, IotResult.FAIL); }
- 命令:处理流程和消息类似,实际场景中可能需要不同的编解码转换。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
@Override public void onSubdevCommand(String requestId, Command command) { if (command.getDeviceId() == null) { return; } String nodeId = IotUtil.getNodeIdFromDeviceId(command.getDeviceId()); if (nodeId == null) { return; } Session session = nodeIdToSesseionMap.get(nodeId); if (session == null) { return; } //这里我们直接把command对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换 session.getChannel().writeAndFlush(JsonUtil.convertObject2String(command)); //为了简化处理,我们在这里直接回命令响应。更合理做法是在子设备处理完后再回响应 getClient().respondCommand(requestId, new CommandRsp(0)); log.info("writeAndFlush " + command); }
- 设备消息:这里我们需要根据deviceId获取nodeId,从而获取session,从session里获取channel,就可以往channel发送消息。在转发消息时,可以根据需要进行一定的转换处理。
- 上行消息处理
上行处理在StringTcpServer的channelRead0接口里。如果会话不存在,需要先创建会话:
如果子设备信息不存在,这里会创建会话失败,直接拒绝连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
@Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { Channel incoming = ctx.channel(); log.info("channelRead0" + incoming.remoteAddress() + " msg :" + s); //如果是首条消息,创建session //如果是首条消息,创建session Session session = simpleGateway.getSessionByChannel(incoming.id().asLongText()); if (session == null) { String nodeId = s; session = simpleGateway.createSession(nodeId, incoming); //创建会话失败,拒绝连接 if (session == null) { log.info("close channel"); ctx.close(); } }
如果会话存在,则进行消息转发:
1 2 3 4 5 6 7
else { //如果需要上报属性则调用reportSubDeviceProperties DeviceMessage deviceMessage = new DeviceMessage(s); deviceMessage.setDeviceId(session.getDeviceId()); simpleGateway.reportSubDeviceMessage(deviceMessage, null); }
到这里,网关的关键代码介绍完了,其他的部分看源代码。整个demo是开源的,用户可以根据需要进行扩展。比如修改持久化方式、转发中增加消息格式的转换、实现其他子设备接入协议。
- iot-gateway-demo的使用
- 创建子设备的产品,步骤可参考创建产品。
- 在创建的产品中定义模型,添加服务,服务ID为parameter。并且新增alarm和temperature两个属性,如下图所示
图9 模型定义-子设备产品
- 修改StringTcpServer的main函数,替换构造参数,然后运行该类。
1 2 3
simpleGateway = new SimpleGateway(new SubDevicesFilePersistence(), "ssl://iot-acc.cn-north-4.myhuaweicloud.com:8883", "5e06bfee334dd4f33759f5b3_demo", "mysecret");
- 在平台上看到该网关在线后,添加子设备。
图10 设备-添加子设备
表1 子设备参数 参数名称
参数描述
所属产品
子设备所属的产品,选择步骤1创建的产品。
设备名称
即device_name,可自定义,如subdev_name
设备标识码
即node_id,填写subdev。
设备ID
即devicee_id,可不填写,自动生成。
此时网关上日志打印:
2024-04-16 21:00:01 INFO SubDevicesFilePersistence:112 - add subdev, the nodeId is subdev
- 运行TcpDevice类,建立连接后,输入步骤3中注册的子设备的nodeId,如subdev。
图11 子设备连接
此时网关设备日志打印:
2024-04-16 21:00:54 INFO StringTcpServer:196 - initChannel: /127.0.0.1:21889 2024-04-16 21:01:00 INFO StringTcpServer:137 - channelRead0 is /127.0.0.1:21889, the msg is subdev 2024-04-16 21:01:00 INFO SimpleGateway:100 - create new session ok, the session is Session{nodeId='subdev', channel=[id: 0xf9b89f78, L:/127.0.0.1:8080 - R:/127.0.0.1:21889], deviceId='subdev_deviceId'}
- 在平台上看到子设备上线。
图12 设备列表-设备在线
- 子设备上报消息
图13 子设备上报消息
查看日志看到上报成功
2024-04-16 21:02:36 INFO StringTcpServer:137 - channelRead0 is /127.0.0.1:21889, the msg is hello 2024-04-16 21:02:36 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/messages/up, msg = {"name":null,"id":null,"content":"hello","object_device_id":"subdev_deviceId"] 2024-04-16 21:02:36 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/gateway/sub_devices/properties/report, msg = {"devices":[{"services":[{"properties":{"temprature":2,"alarm":1},"service_id":"parameter","event_time":null}],"device_id":"subdev_deviceId"}]]
- 查看消息跟踪
在平台上找到网关,选择 设备详情-消息跟踪,打开消息跟踪。继续让子设备发送数据,等待片刻后看到消息跟踪:
图14 消息跟踪-直连设备