华为云用户手册

  • MQTT broker 终端设备可以通过MQTT协议与IEF云端进行通信,您也可以通过发送/订阅消息控制终端设备。 边缘节点上有一个内置MQTT broker,内置MQTT broker使用8883端口与终端设备通信,与内置MQTT broker通信需要经过安全认证,具体请参见使用证书进行安全认证。 另外,边缘节点还支持与外置MQTT broker通信,即在边缘节点上安装一个MQTT broker(如开源的Mosquitto,默认使用1883端口通信)。 如使用外置MQTT broker,请注意需要保证外置MQTT broker通信的端口能正常使用。
  • 示例 $hw/events/device/ab39361a-6fc0-4c94-b919-72b1e08ca690/twin/update/result { "event_id":"123457", "timestamp":1557317614026, "twin":{ "state":{ "actual":{ "value":"stop", "metadata":{ "timestamp":1557317614026 } }, "optional":true, "metadata":{ "type":"string" } } } }
  • 示例 $hw/events/device/ab39361a-6fc0-4c94-b919-72b1e08ca690/twin/update/result { "event_id":"123457", "timestamp":1557317614026, "twin":{ "state":{ "actual":{ "value":"stop", "metadata":{ "timestamp":1557317614026 } }, "optional":true, "metadata":{ "type":"string" } } } }
  • 示例 $hw/events/device/ab39361a-6fc0-4c94-b919-72b1e08ca690/twin/update { "event_id":"123457", "twin":{ "state":{ "actual":{ "value":"stop" } } } }
  • 示例 $hw/events/device/ab39361a-6fc0-4c94-b919-72b1e08ca690/twin/update { "event_id":"123457", "twin":{ "state":{ "actual":{ "value":"stop" } } } }
  • 示例 $hw/events/device/ab39361a-6fc0-4c94-b919-72b1e08ca690/twin/get/result { "event_id":"123456", "timestamp":1557317510926, "twin":{ "state":{ "expected":{ "value":"stop", "metadata":{ "timestamp":1557316778931 } }, "optional":true, "metadata":{ "type":"string" } } } }
  • 示例 $hw/events/device/ab39361a-6fc0-4c94-b919-72b1e08ca690/twin/get/result { "event_id":"123456", "timestamp":1557317510926, "twin":{ "state":{ "expected":{ "value":"stop", "metadata":{ "timestamp":1557316778931 } }, "optional":true, "metadata":{ "type":"string" } } } }
  • 示例 $hw/events/node/3fbb5b8d-32db-4271-a34f-a013e021b6ce/membership/get/result { "event_id":"bc876bc-345d-4050-86a8-319a5b13cc10", "timestamp":1557317193524, "devices":[ { "id":"ab39361a-6fc0-4c94-b919-72b1e08ca690", "name":"IEF-device", "state":"unknown", "attributes":{ "address":{ "value":"longgang", "optional":true, "metadata":{ "type":"string" } } } } ] }
  • 示例 $hw/events/node/3fbb5b8d-32db-4271-a34f-a013e021b6ce/membership/get/result { "event_id":"bc876bc-345d-4050-86a8-319a5b13cc10", "timestamp":1557317193524, "devices":[ { "id":"ab39361a-6fc0-4c94-b919-72b1e08ca690", "name":"IEF-device", "state":"unknown", "attributes":{ "address":{ "value":"longgang", "optional":true, "metadata":{ "type":"string" } } } } ] }
  • 示例 终端设备绑定到边缘节点时可收到如下消息。 $hw/events/device/{device_id}/updated { "event_id":"", "timestamp":1557314742136, "attributes":{ "address":{ "value":"shenzhen", "optional":true, "metadata":{ "type":"string" } } } }
  • 示例 终端设备绑定到边缘节点时可收到如下消息。 $hw/events/device/{device_id}/updated { "event_id":"", "timestamp":1557314742136, "attributes":{ "address":{ "value":"xxx", "optional":true, "metadata":{ "type":"string" } } } }
  • 示例 终端设备绑定到边缘节点时可收到如下消息。 $hw/events/node/{node_id}/membership/updated { "event_id":"04a975ab-fd51-49be-85f5-5967e994f640", "timestamp":1557314742136, "added_devices":[ { "id":"ab39361a-6fc0-4c94-b919-72b1e08ca690", "name":"IEF-device", "state":"unknown", "attributes":{ "address":{ "value":"shenzhen", "optional":true, "metadata":{ "type":"string" } } }, "twin":{ "state":{ "expected":{ "value":"running", "metadata":{ "timestamp":1557314434570 } }, "optional":true, "metadata":{ "type":"string" } } } } ], "removed_devices":null }
  • 示例 终端设备绑定到边缘节点时可收到如下消息。 $hw/events/node/{node_id}/membership/updated { "event_id":"04a975ab-fd51-49be-85f5-5967e994f640", "timestamp":1557314742136, "added_devices":[ { "id":"ab39361a-6fc0-4c94-b919-72b1e08ca690", "name":"IEF-device", "state":"unknown", "attributes":{ "address":{ "value":"xxx", "optional":true, "metadata":{ "type":"string" } } }, "twin":{ "state":{ "expected":{ "value":"running", "metadata":{ "timestamp":1557314434570 } }, "optional":true, "metadata":{ "type":"string" } } } } ], "removed_devices":null }
  • 参数说明 参数 类型 说明 event_id String 事件ID timestamp Int64 事件发生事件戳 twin Object 设备孪生变更信息集合,每个孪生以key/value形式存在。value中包含是否可选、孪生metadata包含类型信息、孪生期望状态包含期望值和更新时间、孪生真实状态包含真实值和更新时间等。 delta Map 包含设备孪生期望值与真实值不同的孪生名称和期望值。
  • 示例 终端设备绑定到边缘节点时可收到如下消息。 $hw/events/device/{device_id}/twin/update/delta { "event_id":"b9625811-f34f-4252-bee9-98185e7e1ec7", "timestamp":1557314742131, "twin":{ "state":{ "expected":{ "value":"running", "metadata":{ "timestamp":1557314742122 } }, "optional":true, "metadata":{ "type":"string" } } }, "delta":{ "state":"running" } }
  • 示例 终端设备绑定到边缘节点时可收到如下消息。 $hw/events/device/{device_id}/twin/update/delta { "event_id":"b9625811-f34f-4252-bee9-98185e7e1ec7", "timestamp":1557314742131, "twin":{ "state":{ "expected":{ "value":"running", "metadata":{ "timestamp":1557314742122 } }, "optional":true, "metadata":{ "type":"string" } } }, "delta":{ "state":"running" } }
  • 参数说明 参数 类型 说明 event_id String 事件ID timestamp Int64 事件发生事件戳 twin Object 设备孪生变更信息集合,每个孪生以key/value形式存在。value中包含是否可选、孪生metadata包含类型信息、孪生期望状态包含期望值和更新时间、孪生真实状态包含真实值和更新时间等。 delta Map 包含设备孪生期望值与真实值不同的孪生名称和期望值。
  • 示例 终端设备绑定到边缘节点时可收到如下消息。 $hw/events/device/{device_id}/twin/update/document { "event_id":"", "timestamp":1557314742122, "twin":{ "state":{ "last":null, "current":{ "expected":{ "value":"running", "metadata":{ "timestamp":1557314742122 } }, "optional":true, "metadata":{ "type":"string" } } } } }
  • 示例 终端设备绑定到边缘节点时可收到如下消息。 $hw/events/device/{device_id}/twin/update/document { "event_id":"", "timestamp":1557314742122, "twin":{ "state":{ "last":null, "current":{ "expected":{ "value":"running", "metadata":{ "timestamp":1557314742122 } }, "optional":true, "metadata":{ "type":"string" } } } } }
  • 云端修改孪生属性控制终端设备状态 图3 修改终端设备状态 在IEF中修改终端设备的孪生属性,IEF将终端设备期望状态(Expected State)发送给边缘节点的EdgeHub。 EdgeHub发送终端设备期望状态消息到DeviceTwin,DeviceTwin在边缘节点存储终端设备期望状态。 终端设备实时发消息给MQTT broker查询终端设备期望状态。 EventBus接收到从MQTT broker发过来的消息。 EventBus根据消息去查询终端设备期望状态。 DeviceTwin反馈当前终端设备期望状态给EventBus。 EventBus发送设终端备期望状态的结果给MQTT broker。 终端设备从MQTT broker收到订阅消息,根据期望状态调整实际状态。
  • 终端设备上报实际状态到云端 终端设备上报实际状态到云端的过程如图2所示。 图2 设备上报状态 终端设备将实际状态(Actual State)实时上报给MQTT broker。 EventBus从MQTT broker收到订阅消息,消息内容包含终端设备的实际状态。 EventBus把终端设备实际状态发送给DeviceTwin,DeviceTwin在边缘节点存储终端设备实际状态。 DeviceTwin同步实际状态给WebSocket客户端EdgeHub。 EdgeHub发送消息给IEF。
  • 云端修改孪生属性控制终端设备状态 图3 修改终端设备状态 在IEF中修改终端设备的孪生属性,IEF将终端设备期望状态(Expected State)发送给边缘节点的EdgeHub。 EdgeHub发送终端设备期望状态消息到DeviceTwin,DeviceTwin在边缘节点存储终端设备期望状态。 终端设备实时发消息给MQTT broker查询终端设备期望状态。 EventBus接收到从MQTT broker发过来的消息。 EventBus根据消息去查询终端设备期望状态。 DeviceTwin反馈当前终端设备期望状态给EventBus。 EventBus发送设终端备期望状态的结果给MQTT broker。 终端设备从MQTT broker收到订阅消息,根据期望状态调整实际状态。
  • 终端设备上报实际状态到云端 终端设备上报实际状态到云端的过程如图2所示。 图2 设备上报状态 终端设备将实际状态(Actual State)实时上报给MQTT broker。 EventBus从MQTT broker收到订阅消息,消息内容包含终端设备的实际状态。 EventBus把终端设备实际状态发送给DeviceTwin,DeviceTwin在边缘节点存储终端设备实际状态。 DeviceTwin同步实际状态给WebSocket客户端EdgeHub。 EdgeHub发送消息给IEF。
  • Go语言代码样例 package main import ( "crypto/tls" "crypto/x509" "fmt" "math/rand" "sync" "time" MQTT "github.com/eclipse/paho.mqtt.golang" ) func main() { subClient := InitMqttClient(onSubConnectionLost) pubClient := InitMqttClient(onPubConnectionLost) wait := sync.WaitGroup{} wait.Add(1) go func() { for { time.Sleep(1*time.Second) pubClient.Publish("topic", 0, false, "hello world") } }() subClient.Subscribe("topic", 0, onReceived) wait.Wait() } func InitMqttClient(onConnectionLost MQTT.ConnectionLostHandler) MQTT.Client { pool := x509.NewCertPool() cert, err := tls.LoadX509KeyPair("/tmp/example_cert.crt", "/tmp/example_cert.key") if err != nil { panic(err) } tlsConfig := &tls.Config{ RootCAs: pool, Certificates: []tls.Certificate{cert}, // 单向认证,client不校验服务端证书 InsecureSkipVerify: true, } // 使用tls或者ssl协议,连接8883端口 opts := MQTT.NewClientOptions().AddBroker("tls://127.0.0.1:8883").SetClientID(fmt.Sprintf("%f",rand.Float64())) opts.SetTLSConfig(tlsConfig) opts.OnConnect = onConnect opts.AutoReconnect = false // 回调函数,客户端与服务端断连后立刻被触发 opts.OnConnectionLost = onConnectionLost client := MQTT.NewClient(opts) loopConnect(client) return client } func onReceived(client MQTT.Client, message MQTT.Message) { fmt.Printf("Receive topic: %s, payload: %s \n", message.Topic(), string(message.Payload())) } // sub客户端与服务端断连后,触发重连机制 func onSubConnectionLost(client MQTT.Client, err error) { fmt.Println("on sub connect lost, try to reconnect") loopConnect(client) client.Subscribe("topic", 0, onReceived) } // pub客户端与服务端断连后,触发重连机制 func onPubConnectionLost(client MQTT.Client, err error) { fmt.Println("on pub connect lost, try to reconnect") loopConnect(client) } func onConnect(client MQTT.Client) { fmt.Println("on connect") } func loopConnect(client MQTT.Client) { for { token := client.Connect() if rs, err := CheckClientToken(token); !rs { fmt.Printf("connect error: %s\n", err.Error()) } else { break } time.Sleep(1 * time.Second) } } func CheckClientToken(token MQTT.Token) (bool, error) { if token.Wait() && token.Error() != nil { return false, token.Error() } return true, nil }
  • 操作场景 内置MQTT broker默认开启端口进行TLS(Transport Layer Security)安全认证,客户端必须带上证书才能访问MQTT broker。 终端设备和应用可以通过在对应节点详情页创建的证书进行安全认证。 在节点组中部署的应用可能调度到节点组内任意节点,终端设备可能访问节点组内任意节点,从边缘节点处创建的证书不能满足应用访问节点MQTT broker的需要,所以需要使用节点组证书。节点组证书创建方法请参见节点组证书。
  • 约束与限制 证书与边缘节点绑定,在一个边缘节点下申请的证书只能用来访问该边缘节点的MQTT broker,如果访问其他边缘节点的MQTT broker,会导致认证失败。 一个边缘节点最多只能申请10份证书。 证书的有效期为5年。 MQTT使用限制 表1 MQTT使用限制 描述 限制 支持的MQTT协议版本 3.1.1 与标准MQTT协议的区别 支持QoS 0 支持Topic自定义 不支持QoS 1和QoS 2 不支持will、retain msg MQ TTS 支持的安全等级 采用TCP通道基础 + TLS协议(TLSV1.2 版本)
  • 使用证书 证书用于终端设备与MQTT broker通信时鉴权。 下面是Go语言代码样例和Java语言代码样例,演示了如何使用证书做鉴权。 客户端不需要校验服务端证书,单向认证即可。 内置MQTT broker默认开启8883端口。 样例中的Go语言MQTT Client引用了github.com/eclipse/paho.mqtt.golang开源库。 客户端需要处理断连事件,实现掉线重连机制,提高连接可靠性。
  • 使用证书 证书用于终端设备与MQTT broker通信时鉴权。 下面是Go语言代码样例和Java语言代码样例,演示了如何使用证书做鉴权。 客户端不需要校验服务端证书,单向认证即可。 内置MQTT broker默认开启8883端口。 样例中的Go语言MQTT Client引用了github.com/eclipse/paho.mqtt.golang开源库。 客户端需要处理断连事件,实现掉线重连机制,提高连接可靠性。
  • Go语言代码样例 package main import ( "crypto/tls" "crypto/x509" "fmt" "math/rand" "sync" "time" MQTT "github.com/eclipse/paho.mqtt.golang" ) func main() { subClient := InitMqttClient(onSubConnectionLost) pubClient := InitMqttClient(onPubConnectionLost) wait := sync.WaitGroup{} wait.Add(1) go func() { for { time.Sleep(1*time.Second) pubClient.Publish("topic", 0, false, "hello world") } }() subClient.Subscribe("topic", 0, onReceived) wait.Wait() } func InitMqttClient(onConnectionLost MQTT.ConnectionLostHandler) MQTT.Client { pool := x509.NewCertPool() cert, err := tls.LoadX509KeyPair("/tmp/example_cert.crt", "/tmp/example_cert.key") if err != nil { panic(err) } tlsConfig := &tls.Config{ RootCAs: pool, Certificates: []tls.Certificate{cert}, // 单向认证,client不校验服务端证书 InsecureSkipVerify: true, } // 使用tls或者ssl协议,连接8883端口 opts := MQTT.NewClientOptions().AddBroker("tls://127.0.0.1:8883").SetClientID(fmt.Sprintf("%f",rand.Float64())) opts.SetTLSConfig(tlsConfig) opts.OnConnect = onConnect opts.AutoReconnect = false // 回调函数,客户端与服务端断连后立刻被触发 opts.OnConnectionLost = onConnectionLost client := MQTT.NewClient(opts) loopConnect(client) return client } func onReceived(client MQTT.Client, message MQTT.Message) { fmt.Printf("Receive topic: %s, payload: %s \n", message.Topic(), string(message.Payload())) } // sub客户端与服务端断连后,触发重连机制 func onSubConnectionLost(client MQTT.Client, err error) { fmt.Println("on sub connect lost, try to reconnect") loopConnect(client) client.Subscribe("topic", 0, onReceived) } // pub客户端与服务端断连后,触发重连机制 func onPubConnectionLost(client MQTT.Client, err error) { fmt.Println("on pub connect lost, try to reconnect") loopConnect(client) } func onConnect(client MQTT.Client) { fmt.Println("on connect") } func loopConnect(client MQTT.Client) { for { token := client.Connect() if rs, err := CheckClientToken(token); !rs { fmt.Printf("connect error: %s\n", err.Error()) } else { break } time.Sleep(1 * time.Second) } } func CheckClientToken(token MQTT.Token) (bool, error) { if token.Wait() && token.Error() != nil { return false, token.Error() } return true, nil }
  • 约束与限制 证书与边缘节点绑定,在一个边缘节点下申请的证书只能用来访问该边缘节点的MQTT broker,如果访问其他边缘节点的MQTT broker,会导致认证失败。 一个边缘节点最多只能申请10份证书。 证书的有效期为5年。 MQTT使用限制 表1 MQTT使用限制 描述 限制 支持的MQTT协议版本 3.1.1 与标准MQTT协议的区别 支持QoS 0 支持Topic自定义 不支持QoS 1和QoS 2 不支持will、retain msg MQTTS支持的安全等级 采用TCP通道基础 + TLS协议(TLSV1.2 版本)
共100000条