设备接入 IoTDA-IoT Device SDK使用指南(Java):如何开发网关

时间:2025-02-12 15:01:44

如何开发网关

网关是一个特殊的设备,除具备一般设备功能之外,还具有子设备管理、子设备消息转发的功能。SDK提供了AbstractGateway抽象类来简化网关的实现。该类提供了子设备管理功能,需要从平台获取子设备信息并保存(需要子类提供子设备持久化接口)、子设备下行消息转发功能(需要子类实现转发处理接口)、以及上报子设备列表、上报子设备属性、上报子设备状态、上报子设备消息等接口。

  • 使用AbstractGateway类

    继承该类,在构造函数里提供子设备信息持久化接口,实现其下行消息转发的抽象接口:

    1234567
        public abstract void onSubdevCommand(String requestId, Command command);    public abstract void onSubdevPropertiesSet(String requestId, PropsSet propsSet);    public abstract void onSubdevPropertiesGet(String requestId, PropsGet propsGet);    public abstract void onSubdevMessage(DeviceMessage message);
  • iot-gateway-demo代码介绍

    工程iot-gateway-demo基于AbstractGateway实现了一个简单的网关,它提供tcp设备接入能力。关键类:

    SimpleGateway:继承自AbstractGateway,实现子设备管理和下行消息转发

    StringTcpServer:基于netty实现一个TCP server,本例中子设备采用TCP协议,并且首条消息为鉴权消息

    SubDevicesFilePersistence:子设备信息持久化,采用json文件来保存子设备信息,并在内存中做了缓存

    Session:设备会话类,保存了设备id和TCP的channel的对应关系

  • SimpleGateway类

    添加或删除子设备处理

    添加子设备:AbstractGateway的onAddSubDevices接口已经完成了子设备信息的保存。我们不需要再增加额外处理,因此SimpleGateway不需要重写onAddSubDevices接口

    删除子设备:我们不仅需要修改持久化信息,还需要断开当前子设备的连接。所以我们重写了onDeleteSubDevices接口,增加了拆链处理,然后调用父类的onDeleteSubDevices。

     1 2 3 4 5 6 7 8 9101112131415161718
       @Override    public int onDeleteSubDevices(SubDevicesInfo subDevicesInfo) {        for (DeviceInfo subdevice : subDevicesInfo.getDevices()) {            Session session = nodeIdToSesseionMap.get(subdevice.getNodeId());            if (session != null) {                if (session.getChannel() != null) {                    session.getChannel().close();                    channelIdToSessionMap.remove(session.getChannel().id().asLongText());                    nodeIdToSesseionMap.remove(session.getNodeId());                }            }        }        return super.onDeleteSubDevices(subDevicesInfo);    }
  • 下行消息处理
    网关收到平台下行消息时,需要转发给子设备。平台下行消息分为三种:设备消息、属性读写、命令。
    • 设备消息:这里我们需要根据deviceId获取nodeId,从而获取session,从session里获取channel,就可以往channel发送消息。在转发消息时,可以根据需要进行一定的转换处理。
       1 2 3 4 5 6 7 8 910111213141516171819202122232425
       @Override    public void onSubdevMessage(DeviceMessage message) {        //平台接口带的都是deviceId,deviceId是由nodeId和productId拼装生成的,即        //deviceId = productId_nodeId        String nodeId = IotUtil.getNodeIdFromDeviceId(message.getDeviceId());        if (nodeId == null) {            return;        }        //通过nodeId获取session,进一步获取channel        Session session = nodeIdToSesseionMap.get(nodeId);        if (session == null) {            log.error("subdev is not connected " + nodeId);            return;        }        if (session.getChannel() == null){            log.error("channel is null " + nodeId);            return;        }        //直接把消息转发给子设备        session.getChannel().writeAndFlush(message.getContent());        log.info("writeAndFlush " + message);    }
    • 属性读写:

      属性读写包括属性设置和属性查询。

      属性设置:

       1 2 3 4 5 6 7 8 91011121314151617181920212223242526
       @Override    public void onSubdevPropertiesSet(String requestId, PropsSet propsSet) {        if (propsSet.getDeviceId() == null) {            return;        }        String nodeId = IotUtil.getNodeIdFromDeviceId(propsSet.getDeviceId());        if (nodeId == null) {            return;        }        Session session = nodeIdToSesseionMap.get(nodeId);        if (session == null) {            return;        }        //这里我们直接把对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换        session.getChannel().writeAndFlush(JsonUtil.convertObject2String(propsSet));        //为了简化处理,我们在这里直接回响应。更合理做法是在子设备处理完后再回响应        getClient().respondPropsSet(requestId, IotResult.SUC CES S);        log.info("writeAndFlush " + propsSet);    }
      属性查询:
      1234567
       @Override    public void onSubdevPropertiesGet(String requestId, PropsGet propsGet) {        //不建议平台直接读子设备的属性,这里直接返回失败        log.error("not supporte onSubdevPropertiesGet");        deviceClient.respondPropsSet(requestId, IotResult.FAIL);    }

    • 命令:处理流程和消息类似,实际场景中可能需要不同的编解码转换。
       1 2 3 4 5 6 7 8 9101112131415161718192021222324
      @Override    public void onSubdevCommand(String requestId, Command command) {        if (command.getDeviceId() == null) {            return;        }        String nodeId = IotUtil.getNodeIdFromDeviceId(command.getDeviceId());        if (nodeId == null) {            return;        }        Session session = nodeIdToSesseionMap.get(nodeId);        if (session == null) {            return;        }        //这里我们直接把command对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换        session.getChannel().writeAndFlush(JsonUtil.convertObject2String(command));        //为了简化处理,我们在这里直接回命令响应。更合理做法是在子设备处理完后再回响应        getClient().respondCommand(requestId, new CommandRsp(0));        log.info("writeAndFlush " + command);    }

  • 上行消息处理

    上行处理在StringTcpServer的channelRead0接口里。如果会话不存在,需要先创建会话:

    如果子设备信息不存在,这里会创建会话失败,直接拒绝连接

     1 2 3 4 5 6 7 8 9101112131415161718
     @Override        protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {            Channel incoming = ctx.channel();            log.info("channelRead0" + incoming.remoteAddress() + " msg :" + s);            //如果是首条消息,创建session//如果是首条消息,创建session            Session session = simpleGateway.getSessionByChannel(incoming.id().asLongText());            if (session == null) {                String nodeId = s;                session = simpleGateway.createSession(nodeId, incoming);                //创建会话失败,拒绝连接                if (session == null) {                    log.info("close channel");                    ctx.close();                }            } 

    如果会话存在,则进行消息转发:

    1234567
    else {                //如果需要上报属性则调用reportSubDeviceProperties                DeviceMessage deviceMessage = new DeviceMessage(s);                deviceMessage.setDeviceId(session.getDeviceId());                simpleGateway.reportSubDeviceMessage(deviceMessage, null);            }

    到这里,网关的关键代码介绍完了,其他的部分看源代码。整个demo是开源的,用户可以根据需要进行扩展。比如修改持久化方式、转发中增加消息格式的转换、实现其他子设备接入协议。

  • iot-gateway-demo的使用
    1. 创建子设备的产品,步骤可参考创建产品
    2. 在创建的产品中定义模型,添加服务,服务ID为parameter。并且新增alarm和temperature两个属性,如下图所示
      图9 模型定义-子设备产品
    3. 修改StringTcpServer的main函数,替换构造参数,然后运行该类。
      123
       simpleGateway = new SimpleGateway(new SubDevicesFilePersistence(),                "ssl://iot-acc.cn-north-4.myhuaweicloud.com:8883",                "5e06bfee334dd4f33759f5b3_demo", "mysecret");
    4. 在平台上看到该网关在线后,添加子设备。
      图10 设备-添加子设备
      表1 子设备参数

      参数名称

      参数描述

      所属产品

      子设备所属的产品,选择步骤1创建的产品。

      设备名称

      即device_name,可自定义,如subdev_name

      设备标识码

      即node_id,填写subdev。

      设备ID

      即devicee_id,可不填写,自动生成。

      此时网关上日志打印:

      2024-04-16 21:00:01 INFO SubDevicesFilePersistence:112 - add subdev, the nodeId is subdev

    5. 运行TcpDevice类,建立连接后,输入步骤3中注册的子设备的nodeId,如subdev。
      图11 子设备连接

      此时网关设备日志打印:

      2024-04-16 21:00:54  INFO StringTcpServer:196 - initChannel: /127.0.0.1:218892024-04-16 21:01:00  INFO StringTcpServer:137 - channelRead0 is /127.0.0.1:21889, the msg is subdev2024-04-16 21:01:00  INFO SimpleGateway:100 - create new session ok, the session is Session{nodeId='subdev', channel=[id: 0xf9b89f78, L:/127.0.0.1:8080 - R:/127.0.0.1:21889], deviceId='subdev_deviceId'}
    6. 在平台上看到子设备上线。
      图12 设备列表-设备在线
    7. 子设备上报消息
      图13 子设备上报消息

      查看日志看到上报成功

      2024-04-16 21:02:36  INFO StringTcpServer:137 - channelRead0 is /127.0.0.1:21889, the msg is hello2024-04-16 21:02:36  INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/messages/up, msg = {"name":null,"id":null,"content":"hello","object_device_id":"subdev_deviceId"]2024-04-16 21:02:36  INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/gateway/sub_devices/properties/report, msg = {"devices":[{"services":[{"properties":{"temprature":2,"alarm":1},"service_id":"parameter","event_time":null}],"device_id":"subdev_deviceId"}]]
    8. 查看消息跟踪

      在平台上找到网关,选择 设备详情-消息跟踪,打开消息跟踪。继续让子设备发送数据,等待片刻后看到消息跟踪:

      图14 消息跟踪-直连设备
support.huaweicloud.com/sdkreference-iothub/iot_02_0089.html