设备接入 IOTDA-IoT Device SDK使用指南(Java):如何开发网关

时间:2024-07-30 11:30:47

如何开发网关

网关是一个特殊的设备,除具备一般设备功能之外,还具有子设备管理、子设备消息转发的功能。SDK提供了AbstractGateway抽象类来简化网关的实现。该类提供了子设备管理功能,需要从平台获取子设备信息并保存(需要子类提供子设备持久化接口)、子设备下行消息转发功能(需要子类实现转发处理接口)、以及上报子设备列表、上报子设备属性、上报子设备状态、上报子设备消息等接口。

  • 使用AbstractGateway类

    继承该类,在构造函数里提供子设备信息持久化接口,实现其下行消息转发的抽象接口:

    1
    2
    3
    4
    5
    6
    7
        public abstract void onSubdevCommand(String requestId, Command command);
    
        public abstract void onSubdevPropertiesSet(String requestId, PropsSet propsSet);
    
        public abstract void onSubdevPropertiesGet(String requestId, PropsGet propsGet);
    
        public abstract void onSubdevMessage(DeviceMessage message);
    
  • iot-gateway-demo代码介绍

    工程iot-gateway-demo基于AbstractGateway实现了一个简单的网关,它提供tcp设备接入能力。关键类:

    SimpleGateway:继承自AbstractGateway,实现子设备管理和下行消息转发

    StringTcpServer:基于netty实现一个TCP server,本例中子设备采用TCP协议,并且首条消息为鉴权消息

    SubDevicesFilePersistence:子设备信息持久化,采用json文件来保存子设备信息,并在内存中做了缓存

    Session:设备会话类,保存了设备id和TCP的channel的对应关系

  • SimpleGateway类

    添加或删除子设备处理

    添加子设备:AbstractGateway的onAddSubDevices接口已经完成了子设备信息的保存。我们不需要再增加额外处理,因此SimpleGateway不需要重写onAddSubDevices接口

    删除子设备:我们不仅需要修改持久化信息,还需要断开当前子设备的连接。所以我们重写了onDeleteSubDevices接口,增加了拆链处理,然后调用父类的onDeleteSubDevices。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
     
     
     @Override
        public int onDeleteSubDevices(SubDevicesInfo subDevicesInfo) {
    
            for (DeviceInfo subdevice : subDevicesInfo.getDevices()) {
                Session session = nodeIdToSesseionMap.get(subdevice.getNodeId());
                if (session != null) {
                    if (session.getChannel() != null) {
                        session.getChannel().close();
                        channelIdToSessionMap.remove(session.getChannel().id().asLongText());
                        nodeIdToSesseionMap.remove(session.getNodeId());
                    }
                }
            }
            return super.onDeleteSubDevices(subDevicesInfo);
    
        }
    
  • 下行消息处理
    网关收到平台下行消息时,需要转发给子设备。平台下行消息分为三种:设备消息、属性读写、命令。
    • 设备消息:这里我们需要根据deviceId获取nodeId,从而获取session,从session里获取channel,就可以往channel发送消息。在转发消息时,可以根据需要进行一定的转换处理。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
       @Override
          public void onSubdevMessage(DeviceMessage message) {
      
              //平台接口带的都是deviceId,deviceId是由nodeId和productId拼装生成的,即
              //deviceId = productId_nodeId
              String nodeId = IotUtil.getNodeIdFromDeviceId(message.getDeviceId());
              if (nodeId == null) {
                  return;
              }
      
              //通过nodeId获取session,进一步获取channel
              Session session = nodeIdToSesseionMap.get(nodeId);
              if (session == null) {
                  log.error("subdev is not connected " + nodeId);
                  return;
              }
              if (session.getChannel() == null){
                  log.error("channel is null " + nodeId);
                  return;
              }
      
              //直接把消息转发给子设备
              session.getChannel().writeAndFlush(message.getContent());
              log.info("writeAndFlush " + message);
          }
      
    • 属性读写:

      属性读写包括属性设置和属性查询。

      属性设置:

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
       @Override
          public void onSubdevPropertiesSet(String requestId, PropsSet propsSet) {
      
              if (propsSet.getDeviceId() == null) {
                  return;
              }
      
              String nodeId = IotUtil.getNodeIdFromDeviceId(propsSet.getDeviceId());
              if (nodeId == null) {
                  return;
              }
      
              Session session = nodeIdToSesseionMap.get(nodeId);
              if (session == null) {
                  return;
              }
      
              //这里我们直接把对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换
              session.getChannel().writeAndFlush(JsonUtil.convertObject2String(propsSet));
      
              //为了简化处理,我们在这里直接回响应。更合理做法是在子设备处理完后再回响应
              getClient().respondPropsSet(requestId, IotResult.SUC CES S);
      
              log.info("writeAndFlush " + propsSet);
      
          }
      
      属性查询:
      1
      2
      3
      4
      5
      6
      7
       @Override
          public void onSubdevPropertiesGet(String requestId, PropsGet propsGet) {
      
              //不建议平台直接读子设备的属性,这里直接返回失败
              log.error("not supporte onSubdevPropertiesGet");
              deviceClient.respondPropsSet(requestId, IotResult.FAIL);
          }
      

    • 命令:处理流程和消息类似,实际场景中可能需要不同的编解码转换。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      @Override
          public void onSubdevCommand(String requestId, Command command) {
      
              if (command.getDeviceId() == null) {
                  return;
              }
      
              String nodeId = IotUtil.getNodeIdFromDeviceId(command.getDeviceId());
              if (nodeId == null) {
                  return;
              }
      
              Session session = nodeIdToSesseionMap.get(nodeId);
              if (session == null) {
                  return;
              }
      
              //这里我们直接把command对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换
              session.getChannel().writeAndFlush(JsonUtil.convertObject2String(command));
      
              //为了简化处理,我们在这里直接回命令响应。更合理做法是在子设备处理完后再回响应
              getClient().respondCommand(requestId, new CommandRsp(0));
              log.info("writeAndFlush " + command);
          }
      

  • 上行消息处理

    上行处理在StringTcpServer的channelRead0接口里。如果会话不存在,需要先创建会话:

    如果子设备信息不存在,这里会创建会话失败,直接拒绝连接

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
     @Override
            protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
                Channel incoming = ctx.channel();
                log.info("channelRead0" + incoming.remoteAddress() + " msg :" + s);
    
                //如果是首条消息,创建session
    //如果是首条消息,创建session
                Session session = simpleGateway.getSessionByChannel(incoming.id().asLongText());
                if (session == null) {
                    String nodeId = s;
                    session = simpleGateway.createSession(nodeId, incoming);
    
                    //创建会话失败,拒绝连接
                    if (session == null) {
                        log.info("close channel");
                        ctx.close();
                    }
                } 
    

    如果会话存在,则进行消息转发:

    1
    2
    3
    4
    5
    6
    7
    else {
                    //如果需要上报属性则调用reportSubDeviceProperties
                    DeviceMessage deviceMessage = new DeviceMessage(s);
                    deviceMessage.setDeviceId(session.getDeviceId());
                    simpleGateway.reportSubDeviceMessage(deviceMessage, null);
    
                }	
    

    到这里,网关的关键代码介绍完了,其他的部分看源代码。整个demo是开源的,用户可以根据需要进行扩展。比如修改持久化方式、转发中增加消息格式的转换、实现其他子设备接入协议。

  • iot-gateway-demo的使用
    1. 创建子设备的产品,步骤可参考创建产品
    2. 在创建的产品中定义模型,添加服务,服务ID为parameter。并且新增alarm和temperature两个属性,如下图所示
      图9 模型定义-子设备产品
    3. 修改StringTcpServer的main函数,替换构造参数,然后运行该类。
      1
      2
      3
       simpleGateway = new SimpleGateway(new SubDevicesFilePersistence(),
                      "ssl://iot-acc.cn-north-4.myhuaweicloud.com:8883",
                      "5e06bfee334dd4f33759f5b3_demo", "mysecret");
      
    4. 在平台上看到该网关在线后,添加子设备。
      图10 设备-添加子设备
      表1 子设备参数

      参数名称

      参数描述

      所属产品

      子设备所属的产品,选择步骤1创建的产品。

      设备名称

      即device_name,可自定义,如subdev_name

      设备标识码

      即node_id,填写subdev。

      设备ID

      即devicee_id,可不填写,自动生成。

      此时网关上日志打印:

      2024-04-16 21:00:01 INFO SubDevicesFilePersistence:112 - add subdev, the nodeId is subdev

    5. 运行TcpDevice类,建立连接后,输入步骤3中注册的子设备的nodeId,如subdev。
      图11 子设备连接

      此时网关设备日志打印:

      2024-04-16 21:00:54  INFO StringTcpServer:196 - initChannel: /127.0.0.1:21889
      2024-04-16 21:01:00  INFO StringTcpServer:137 - channelRead0 is /127.0.0.1:21889, the msg is subdev
      2024-04-16 21:01:00  INFO SimpleGateway:100 - create new session ok, the session is Session{nodeId='subdev', channel=[id: 0xf9b89f78, L:/127.0.0.1:8080 - R:/127.0.0.1:21889], deviceId='subdev_deviceId'}
    6. 在平台上看到子设备上线。
      图12 设备列表-设备在线
    7. 子设备上报消息
      图13 子设备上报消息

      查看日志看到上报成功

      2024-04-16 21:02:36  INFO StringTcpServer:137 - channelRead0 is /127.0.0.1:21889, the msg is hello
      2024-04-16 21:02:36  INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/messages/up, msg = {"name":null,"id":null,"content":"hello","object_device_id":"subdev_deviceId"]
      2024-04-16 21:02:36  INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/gateway/sub_devices/properties/report, msg = {"devices":[{"services":[{"properties":{"temprature":2,"alarm":1},"service_id":"parameter","event_time":null}],"device_id":"subdev_deviceId"}]]
    8. 查看消息跟踪

      在平台上找到网关,选择 设备详情-消息跟踪,打开消息跟踪。继续让子设备发送数据,等待片刻后看到消息跟踪:

      图14 消息跟踪-直连设备
support.huaweicloud.com/devg-iothub/iot_02_0089.html