华为云用户手册

  • 终端管理-查询终端属性 接口说明 请求对应的消息ID:0x8107 响应对应的消息ID:0x0107 请求示例 { "service_id": "TerminalManagement", "command_name": "QueryTerminalProperty", "paras": {} } 响应示例 { "command_id": "1641970411683486238", "response": { "paras": { "clientType": { "passenger": true, "dangerousGoods": true, "track": true, "taxis": true, "video": true, "splitter": false, "trailer": false }, "manufacturerId": "2D_AN", "deviceType": "BSJ-GF-06", "nodeId": "5kw3noL", "iccId": "12345678901234567890", "hardwareVersion": "3.0.0", "firmwareVersion": "1.1.25", "gssn": { "galileo": true, "gps": true, "bds": true, "glonass": true }, "communication": { "other": false, "gprs": true, "cdma": true, "tdscdma": true, "wcdma": true, "cdma2000": true, "tdlte": true } }, "result_code": 0 } } 响应参数 paras字段定义 表11 参数描述 字段名 类型 参数描述 clientType JsonObject 终端类型 passenger:false-不适用客运车辆,true-适用客运车辆 dangerousGoods:false-不适用危险品车辆,true-适用危险品车辆 track:false-不适用普通货运车辆,true-适用普通货运车辆 taxis:false-不适用出租车辆,true-适用出租车辆 video:false-不支持硬盘录像,true-支持硬盘录像 splitter:false-一体机,true-分体机 trailer:false-不适用挂车,true-适用挂车 manufacturerId string 制造商ID deviceType string 终端型号 nodeId string 终端ID iccId string 终端SIM卡ICCID hardwareVersion string 终端硬件版本号 firmwareVersion string 终端固件版本号 gssn jsonObject GNSS模块属性 galileo:false-不支持Galileo定位,true-支持Galileo定位 gps:false-不支持GPS定位,true-支持GPS定位 bds:false-不支持北斗定位,true-支持北斗定位 glonass:false-不支持GLONASS定位,true-支持GLONASS定位 communication jsonObject 通信模块属性 other:false-不支持其他通信方式,true-支持其他通信方式 gprs:false-不支持GPRS通信方式,true-支持GPRS通信方式 cdma:false-不支持 CDM A通信方式,true-支持CDMA通信方式 tdscdma:false-不支持TD-SCDMA通信方式,true-支持TD-SCDMA通信方式 wcdma:false-不支持WCDMA通信方式,true-支持WCDMA通信方式 cdma2000:false-不支持CDMA2000通信方式,true-支持CDMA2000通信方式 tdlte:false-不支持TD-LTE通信方式,true-支持TD-LTE通信方式
  • 终端管理-终端控制 接口说明 请求对应的消息ID:0x8105 响应对应的消息ID:0x0001(终端通用响应) 请求示例 { "service_id": "TerminalManagement", "command_name": "TerminalControl", "paras": { "commandId": 4, "parameter": null } } 请求参数 paras字段定义 表9 参数描述 字段名 必选/可选 类型 参数描述 commandId 必选 int 命令字(十进制表示) parameter 可选 string 命令参数 响应示例 { "command_id": "1641970324270870620", "response": { "paras": { "flowNo": 0, "msgId": 33029, "resultCode": 0 }, "result_code": 0 } } 响应参数(终端通用响应) paras字段定义 表10 参数描述 字段名 类型 参数描述 flowNo int 应答流水号 msgId int 对应的平台的消息ID resultCode int 结果
  • 位置及报警-位置信息查询 接口说明 请求对应的消息ID:0x8201 响应对应的消息ID:0x0201 请求示例 { "service_id": "Location", "command_name": "QueryLocation", "paras": {} } 响应示例 { "command_id": "1641971430850245415", "response": { "paras": { "flowNo": 0, "locationReport": { "alarm": { "emergencyAlarm": 0, "overSpeedAlarm": 1, "fatigueDrivingAlarm": 0, "dangerousDrivingAlarm": 1, "powerVoltageAlarm": 0, "powerOffAlarm": 0, "cameraErrorAlarm": 1, "overSpeedWarning": 1, "fatigueDrivingWarning": 0, "violationDrivingAlarm": 0, "tirePressureWarning": 0, "rightTurnErrorAlarm": 0, "overDrivingTimeAlarm": 0, "overParkAlarm": 0, "drivingAreaAlarm": 0, "drivingRouteAlarm": 0, "drivingTimeAbnormalAlarm": 0, "offCourseAlarm": 0, "gasolineAlarm": 0, "stolenAlarm": 0, "startingAbnormalAlarm": 0, "displacementAbnormalAlarm": 0, "rolloverAlarm": 0, "rolloverWarning": 0, "gnsserrorAlarm": 1, "gnssantennaNotConnectedAlarm": 0, "lcderrorAlarm": 1, "ttserrorAlarm": 0, "icerrorAlarm": 0, "vssalarm": 0, "gnssantennaShortCircuitAlarm": 1 }, "status": { "accStatus": 0, "positioningStatus": 0, "latitudeStatus": 0, "longitudeStatus": 1, "businessStatus": 0, "encryptedStatus": 1, "forwardCollisionWarningStatus": 0, "laneDeviationAlarmStatus": 0, "loadStatus": 0, "oilStatus": 1, "powerStatus": 0, "doorStatus": 1, "firstDoorStatus": 1, "secondDoorStatus": 1, "thirdDoorStatus": 0, "fourthDoorStatus": 0, "fifthDoorStatus": 0, "galileoStatus": 0, "vehicleStatus": 0, "gpsstatus": 0, "bdsstatus": 0, "glonassstatus": 0 }, "longitude": 0.041957, "latitude": 0.056143, "height": 48243, "speed": 10001, "direction": 300, "dateTime": "220112151406", "extend": [] } }, "result_code": 0 } } 响应参数 表12 参数描述 字段名 类型 参数描述 flowNo int 应答流水号 locationReport jsonObject 字段定义参考表2 表13 协议报文字段与属性对应表 起始字节 字段 数据类型 对应属性 0 报警标志 DWORD alarm 4 状态 DWORD status 8 纬度 DWORD longitude 12 经度 DWORD latitude 16 高程 WORD height 18 速度 WORD speed 20 方向 WORD direction 21 时间 BCD[6] dateTime
  • 终端管理-查询指定终端参数 接口说明 请求对应的消息ID:0x8106 响应对应的消息ID:0x0104 请求示例 { "service_id": "TerminalManagement", "command_name": "QueryAssignedTerminalParameters", "paras": { "parameterIds": [1] } } 请求参数 paras字段定义 表7 参数描述 字段名 必选/可选 类型 参数描述 parameterIds 必选 stringList 查询的终端参数的参数ID(十进制表示)列表 响应示例 { "command_id": "1641973633890258246", "response": { "paras": { "flowNo": 0, "total": 1, "parameters": { "1": 120 } }, "result_code": 0 } } 响应参数 paras字段定义 表8 参数描述 字段名 类型 参数描述 flowNo int 应答流水号 total int 应答参数个数 parameters JsonObject 终端参数的参数列表,其中key为参数ID(十进制表示),value为参数值
  • 终端管理-查询终端参数 接口说明 请求对应的消息ID:0x8104 响应对应的消息ID:0x0104 请求示例 { "service_id": "TerminalManagement", "command_name": "QueryTerminalParameters", "paras": {} } 响应示例 { "command_id": "1641973633890258246", "response": { "paras": { "flowNo": 0, "total": 1, "parameters": { "1": 120 } }, "result_code": 0 } } 响应参数 paras字段定义 表6 参数描述 字段名 类型 参数描述 flowNo int 应答流水号 total int 应答参数个数 parameters JsonObject 终端参数的参数列表,其中key为参数ID(十进制表示),value为参数值
  • 终端管理-设置终端参数 接口说明 请求对应的消息ID:0x8103 响应对应的消息ID:0x0001(终端通用响应) 请求示例 { "service_id": "TerminalManagement", "command_name": "SetTerminalParameters", "paras": { "parameters": { "1": 120 } } } 请求参数 paras字段定义 表4 参数描述 字段名 必选/可选 类型 参数描述 parameters 必选 JsonObject 设置的终端参数的参数列表,其中key为参数ID(十进制表示),value为参数值 约束: 平台当前固定心跳时长为180s 不支持多值参数 响应示例 { "command_id": "1641969363120826882", "response": { "paras": { "flowNo": 0, "msgId": 33027, "resultCode": 0 }, "result_code": 0 } } 响应参数(终端通用响应) paras字段定义 表5 参数描述 字段名 类型 参数描述 flowNo int 应答流水号 msgId int 对应的平台的消息ID resultCode int 结果
  • 属性上报 以下消息转换为IoTDA平台的属性,应用可以通过查询设备影子或者数据转发接收属性上报消息。 上报位置信息 对应的消息ID:0x0200 推送样例: { "resource": "device.property", "event": "report", "event_time": "20151212T121212Z", "notify_data": { "header": { "device_id": "00000000000012345678", "product_id": "ABC123456789", "app_id": "d4922d8a-6c8e-4396-852c-164aefa6638f", "gateway_id": "00000000000012345678", "node_id": "00000000000012345678", "tags": [ { "tag_value": "testTagValue", "tag_key": "testTagName" } ] }, "body": { "services": [ { "service_id": "Location", "properties": { "alarm": { "emergencyAlarm": 0, "overSpeedAlarm": 0, "fatigueDrivingAlarm": 0, "dangerousDrivingAlarm": 0, "powerVoltageAlarm": 0, "powerOffAlarm": 0, "cameraErrorAlarm": 0, "overSpeedWarning": 0, "fatigueDrivingWarning": 0, "violationDrivingAlarm": 0, "tirePressureWarning": 0, "rightTurnErrorAlarm": 0, "overDrivingTimeAlarm": 0, "overParkAlarm": 0, "drivingAreaAlarm": 0, "drivingRouteAlarm": 0, "drivingTimeAbnormalAlarm": 0, "offCourseAlarm": 0, "gasolineAlarm": 0, "stolenAlarm": 0, "startingAbnormalAlarm": 0, "displacementAbnormalAlarm": 0, "rolloverAlarm": 0, "rolloverWarning": 0, "icerrorAlarm": 0, "lcderrorAlarm": 0, "ttserrorAlarm": 1, "gnssantennaShortCircuitAlarm": 0, "gnsserrorAlarm": 0, "gnssantennaNotConnectedAlarm": 0, "vssalarm": 0 }, "status": { "accStatus": 0, "positioningStatus": 0, "latitudeStatus": 0, "longitudeStatus": 0, "businessStatus": 0, "encryptedStatus": 0, "forwardCollisionWarningStatus": 0, "laneDeviationAlarmStatus": 0, "loadStatus": 0, "oilStatus": 0, "powerStatus": 1, "doorStatus": 0, "firstDoorStatus": 0, "secondDoorStatus": 0, "thirdDoorStatus": 0, "fourthDoorStatus": 0, "fifthDoorStatus": 0, "galileoStatus": 0, "vehicleStatus": 0, "gpsstatus": 0, "glonassstatus": 0, "bdsstatus": 0 }, "longitude": 116.307629, "latitude": 40.058359, "height": 312, "speed": 3, "direction": 99, "dateTime": "200707192359", "extend": [ ] }, "event_time": "20151212T121212Z" } ] } } } 表1 协议报文字段与属性对应表 起始字节 字段 数据类型 对应属性 0 报警标志 DWORD alarm 4 状态 DWORD status 8 纬度 DWORD longitude 12 经度 DWORD latitude 16 高程 WORD height 18 速度 WORD speed 20 方向 WORD direction 21 时间 BCD[6] dateTime 表2 报警预警标志位与alarm属性对应表 位 定义 对应属性 0 1:紧急报警 emergencyAlarm 1 1:超速报警 overSpeedAlarm 2 1:疲劳驾驶报警 fatigueDrivingAlarm 3 1:危险驾驶行为报警 dangerousDrivingAlarm 4 1:GNSS模块发生故障报警 gnsserrorAlarm 5 1:GNSS天线未接或被剪断报警 gnssantennaNotConnectedAlarm 6 1:GNSS天线短路报警 gnssantennaShortCircuitAlarm 7 1:终端主电源欠压报警 powerVoltageAlarm 8 1:终端主电源掉电报警 powerOffAlarm 9 1:终端LCD或显示器故障报警 lcderrorAlarm 10 1: TTS 模块故障报警 ttserrorAlarm 11 1:摄像头故障报警 cameraErrorAlarm 12 1:道路运输证IC卡模块故障报警 icerrorAlarm 13 1:超速预警 overSpeedWarning 14 1:疲劳驾驶预警 fatigueDrivingWarning 15 1:违规行驶报警 violationDrivingAlarm 16 1:胎压预警 tirePressureWarning 17 1:右转盲区异常报警 rightTurnErrorAlarm 18 1:当天累计驾驶超时报警 overDrivingTimeAlarm 19 1:超时停车报警 overParkAlarm 20 1:进出区域报警 drivingAreaAlarm 21 1:进出路线报警 drivingRouteAlarm 22 1:路段行驶时间不足/过长报警 drivingTimeAbnormalAlarm 23 1:路线偏离报警 offCourseAlarm 24 1:车辆VSS故障 vssalarm 25 1:车辆油量异常报警 gasolineAlarm 26 1:车辆被盗报警(通过车辆防盗器) stolenAlarm 27 1:车辆非法点火报警 startingAbnormalAlarm 28 1:车辆非法位移报警 displacementAbnormalAlarm 29 1:碰撞侧翻报警 rolloverAlarm 30 1:侧翻预警 rolloverWarning 31 保留 - 表3 状态标志位与status属性对应表 位 定义 对应属性 0 0:ACC关;1:ACC开 accStatus 1 0:未定位;1:定位 positioningStatus 2 0:北纬;1:南纬 latitudeStatus 3 0:东经;1:西经 longitudeStatus 4 0:运营状态;1:停运状态 businessStatus 5 0:经纬度未经保密插件加密;1:经纬度已经保密插加密 encryptedStatus 6 1:紧急刹车系统采集的前撞预警 forwardCollisionWarningStatus 7 1:车道偏移预警 laneDeviationAlarmStatus 8~9 00:空车;01:半载;10:保留;11:满载 loadStatus 10 0:车辆油路正常;1:车辆油路断开 oilStatus 11 0:车辆电路正常;1:车辆电路断开 powerStatus 12 0:车门解锁;1:车门加锁 doorStatus 13 0:门1关;1:门1开(前门) firstDoorStatus 14 0:门2关;1:门2开(中门) secondDoorStatus 15 0:门3关;1:门3开(后门) thirdDoorStatus 16 0:门4关;1:门4开(驾驶席门) fourthDoorStatus 17 0:门5关;1:门5开(自定义) fifthDoorStatus 18 0:未使用GPS卫星进行定位;1:使用GPS卫星进行定位 gpsstatus 19 0:未使用北斗卫星进行定位;1:使用北斗卫星进行定位 bdsstatus 20 0:未使用GLONASS卫星进行定位;1:使用GLONASS卫星进行定位 glonassstatus 21 0:未使用Galileo卫星进行定位;1:使用Galileo卫星进行定位 galileoStatus 22 0:车辆处于停止状态;1:车辆处于行驶状态 vehicleStatus 23~31 保留 -
  • 方案架构 方案总体架构框图如下: 图1 总体架构图 各种不同协议类型的设备,通过泛协议插件,最终以MQTT协议和华为云 物联网平台 进行链接。泛协议插件是各种不同协议类型设备和华为云物联网平台之间的中间层,用以屏蔽各种不同协议之间的差异。泛协议插件由三部分组成: “泛协议设备接入层”:用于以特定网络协议与泛协议设备建链。 “协议桥接适配层”:负责完成第三方协议数据和平台格式数据的互相转换。 上行:把第三方协议数据转成平台格式数据,并调用泛协议SDK接口进行上报。 下行:收到平台下行数据时,将平台格式数据转换为第三方协议数据转发给第三方协议设备。 “泛协议SDK”:即平台提供的泛协议接入接入SDK,提供了网桥的通用功能实现。
  • 部署插件 访问 设备接入服务 ,单击“管理控制台 ”进入设备接入控制台。 单击“IoTDA实例 ”,单击具体实例“详情”按钮,选择“泛协议接入”。 图1 泛协议-泛协议接入 选择“云网关”,单击“新增云网关” 。端口填写为“8898”。镜像选择镜像名称为“protocol-plugin-sl651”的镜像。 图2 泛协议-部署成功SL651 防火墙和安全组需要放通8898端口(TCP)。 父主题: SL651协议接入
  • 部署插件 具体操作: 访问设备接入服务,单击“管理控制台 ”进入设备接入控制台。 单击“IoTDA实例 ”,单击具体实例“详情”按钮,选择“泛协议接入”。 图1 泛协议-泛协议接入 选择“云网关”,单击“新增云网关” 。端口填写为“8890”。镜像选择镜像名称为“protocol-plugin-hj212”的镜像。 图2 泛协议-部署成功HJ212 防火墙放通8890端口(TCP)。 父主题: HJ212协议接入
  • 开发流程 DLI 进行Spark Jar作业开发流程参考如下: 图1 Spark Jar作业开发流程 表2 开发流程说明 序号 阶段 操作界面 说明 1 创建DLI通用队列 DLI控制台 创建作业运行的DLI队列。 2 上传数据到OBS桶 OBS控制台 将测试数据上传到OBS桶下。 3 新建Maven工程,配置pom文件 IntelliJ IDEA 参考样例代码说明,编写程序代码读取OBS数据。 4 编写程序代码 5 调试,编译代码并导出Jar包 6 上传Jar包到OBS和DLI OBS控制台 DLI控制台 将生成的Spark Jar包文件上传到OBS目录下和DLI程序包中。 7 创建Spark Jar作业 DLI控制台 在DLI控制台创建Spark Jar作业并提交运行作业。 8 查看作业运行结果 DLI控制台 查看作业运行状态和作业运行日志。
  • 步骤5:调试、编译代码并导出Jar包 双击IntelliJ IDEA工具右侧的“Maven”,参考下图分别双击“clean”、“compile”对代码进行编译。 编译成功后,双击“package”对代码进行打包。 图9 编译打包 打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\DLITest\SparkJarObs\target”下名为“SparkJarObs-1.0-SNAPSHOT.jar”。 图10 导出jar包
  • 环境准备 在进行Spark Jar作业开发前,请准备以下开发环境。 表1 Spark Jar作业开发环境 准备项 说明 操作系统 Windows系统,支持Windows7以上版本。 安装JDK JDK使用1.8版本。 安装和配置IntelliJ IDEA IntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。
  • 步骤2:上传数据到OBS桶 根据如下数据,创建people.json文件。 {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} 进入OBS管理控制台,在“桶列表”下,单击已创建的OBS桶名称,本示例桶名为“dli-test-obs01”。 单击“上传对象”,将people.json文件上传到OBS桶根目录下。 在OBS桶根目录下,单击“新建文件夹”,创建名为“result”的文件夹。 单击“result”的文件夹,在“result”下单击“新建文件夹”,创建名为“parquet”的文件夹。
  • 步骤8:查看作业运行结果 在Spark作业管理界面显示已提交的作业运行状态。初始状态显示为“启动中”。 如果作业运行成功则作业状态显示为“已成功”,单击“操作”列“更多”下的“Driver日志”,显示当前作业运行的日志。 图14 diver日志 图15 “Driver日志”中的作业执行日志 如果作业运行成功,本示例进入OBS桶下的“result/parquet”目录,查看已生成预期的parquet文件。 图16 obs桶文件 如果作业运行失败,单击“操作”列“更多”下的“Driver日志”,显示具体的报错日志信息,根据报错信息定位问题原因。 例如,如下截图信息因为创建Spark Jar作业时主类名没有包含包路径,报找不到类名“SparkDemoObs”。 图17 报错信息 可以在“操作”列,单击“编辑”,修改“主类”参数为正确的:com.huawei.dli.demo.SparkDemoObs,单击“执行”重新运行该作业即可。
  • 语法格式 在Flink jar作业编辑界面,选择配置优化参数,配置信息如下: 不同的OBS桶,使用不同的AKSK认证信息。 可以使用如下配置方式,根据桶指定不同的AKSK信息,参数说明详见表1。 flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key=USER_AK_ CS MS_KEY flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key=USER_SK_C SMS _KEY flink.hadoop.fs.obs.security.provider=com.dli.provider.UserObsBasicCredentialProvider flink.hadoop.fs.dew.csms.secretName=CredentialName flink.hadoop.fs.dew.endpoint=ENDPOINT flink.hadoop.fs.dew.csms.version=VERSION_ID flink.hadoop.fs.dew.csms.cache.time.second=CACHE_TIME flink.dli.job.agency.name=USER_AGENCY_NAME
  • 前提条件 已在DEW服务创建通用凭证,并存入凭据值。具体操作请参考:创建通用凭据。 已创建DLI访问DEW的委托并完成委托授权。该委托需具备以下权限: DEW中的查询凭据的版本与凭据值ShowSecretVersion接口权限,csms:secretVersion:get。 DEW中的查询凭据的版本列表ListSecretVersions接口权限,csms:secretVersion:list。 DEW解密凭据的权限,kms:dek:decrypt。 委托权限示例请参考自定义DLI委托权限和常见场景的委托权限策略。 仅支持Flink1.15版本使用DEW管理访问凭据,在创建作业时,请配置作业使用Flink1.15版本、且已在作业中配置允许DLI访问DEW的委托信息。自定义委托及配置请参考自定义DLI委托权限。 使用该功能,所有涉及OBS的桶,都需要进行配置AKSK。
  • 操作场景 DLI将Flink Jar作业的输出数据写入到OBS时,需要配置AKSK访问OBS,为了确保AKSK数据安全,您可以用过 数据加密 服务(Data Encryption Workshop,DEW)、云凭据管理服务(Cloud Secret Management Service,CSMS),对AKSK统一管理,有效避免程序硬编码或明文配置等问题导致的敏感信息泄露以及权限失控带来的业务风险。 本例以获取访问OBS的AKSK为例介绍Flink Jar使用DEW获取访问凭证读写OBS的操作指导。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 参数说明 flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key 是 无 String USER_BUCKET_NAME为用户的桶名,需要进行替换为用户的使用的OBS桶名。 参数的值为用户定义在CSMS通用凭证中的键key, 其Key对应的value为用户的AK(Access Key Id),需要具备访问OBS对应桶的权限。 flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key 是 无 String USER_BUCKET_NAME为用户的桶名,需要进行替换为用户的使用的OBS桶名。 参数的值为用户定义在CSMS通用凭证中的键key, 其Key对应的value为用户的SK(Secret Access Key),需要具备访问OBS对应桶的权限。 flink.hadoop.fs.obs.security.provider 是 无 String OBS AKSK认证机制,使用DEW服务中的CSMS凭证管理,获取OBS的AK、SK。 默认取值为com.dli.provider.UserObsBasicCredentialProvider flink.hadoop.fs.dew.endpoint 是 无 String 指定要使用的DEW服务所在的endpoint信息。 获取地区和终端节点。 配置示例:flink.hadoop.fs.dew.endpoint=kms.cn-xxxx.myhuaweicloud.com flink.hadoop.fs.dew.projectId 否 有 String DEW所在的项目ID, 默认是Flink作业所在的项目ID。 获取项目ID flink.hadoop.fs.dew.csms.secretName 是 无 String 在DEW服务的凭据管理中新建的通用凭据的名称。 配置示例:flink.hadoop.fs.dew.csms.secretName=secretInfo flink.hadoop.fs.dew.csms.version 否 最新的version String 在DEW服务的凭据管理中新建的通用凭据的版本号(凭据的版本标识符)。 若不指定,则默认获取该通用凭证的最新版本号。 配置示例:flink.hadoop.fs.dew.csms.version=v1 flink.hadoop.fs.dew.csms.cache.time.second 否 3600 Long Flink作业访问获取CSMS通用凭证后,缓存的时间。 单位为秒。默认值为3600秒。 flink.dli.job.agency.name 是 - String 自定义委托名称。
  • 步骤4:编写代码 编写DliCatalogTest程序创建数据库、DLI表和OBS表。 完整的样例请参考Java样例代码,样例代码分段说明如下: 导入依赖的包。 import org.apache.spark.sql.SparkSession; 创建SparkSession会话。 创建SparkSession会话时需要指定Spark参数:"spark.sql.session.state.builder"、"spark.sql.catalog.class"和"spark.sql.extensions",按照样例配置即可。 Spark2.3.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); Spark2.4.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .config("spark.sql.hive.implementation","org.apache.spark.sql.hive.client.DliHiveClientImpl") .appName("java_spark_demo") .getOrCreate(); Spark3.1.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); Spark3.3.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.DliLakeHouseBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.DliLakeHouseCatalog") .appName("java_spark_demo") .getOrCreate(); 创建数据库。 如下样例代码演示,创建名为test_sparkapp的数据库。 spark.sql("create database if not exists test_sparkapp").collect(); 创建DLI表并插入测试数据。 spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); 创建OBS表。如下示例中的OBS路径需要根据步骤2:OBS桶文件配置中的实际数据路径修改。 spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); 关闭SparkSession会话spark。 spark.stop();
  • 步骤5:调试、编译代码并导出Jar包 双击IntelliJ IDEA工具右侧的“Maven”,参考下图分别双击“clean”、“compile”对代码进行编译。 编译成功后,双击“package”对代码进行打包。 图9 编译打包 打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\DLITest\SparkJarMetadata\target”下名为“SparkJarMetadata-1.0-SNAPSHOT.jar”。 图10 导出jar包
  • 步骤8:查看作业运行结果 在Spark作业管理界面显示已提交的作业运行状态。初始状态显示为“启动中”。 如果作业运行成功则作业状态显示为“已成功”,通过以下操作查看创建的数据库和表。 可以在DLI控制台,左侧导航栏,单击“SQL编辑器”。在“数据库”中已显示创建的数据库“test_sparkapp”。 图14 查看创建的数据库 双击数据库名,可以在数据库下查看已创建成功的DLI和OBS表。 图15 查看表 双击DLI表名dli_testtable,单击“执行”查询DLI表数据。 图16 查询DLI表数据 注释掉DLI表查询语句,双击OBS表名dli_testobstable,单击“执行”查询OBS表数据。 图17 查询OBS表数据 如果作业运行失败则作业状态显示为“已失败”,单击“操作”列“更多”下的“Driver日志”,显示当前作业运行的日志,分析报错原因。 图18 查看Driver日志 原因定位解决后,可以在作业“操作”列,单击“编辑”,修改作业相关参数后,单击“执行”重新运行该作业即可。
  • Java样例代码 本示例操作步骤采用Java进行编码,具体完整的样例代码参考如下: package com.huawei.dli.demo; import org.apache.spark.sql.SparkSession; public class DliCatalogTest { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); spark.sql("create database if not exists test_sparkapp").collect(); spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); spark.stop(); } }
  • scala样例代码 object DliCatalogTest { def main(args:Array[String]): Unit = { val sql = args(0) val runDdl = Try(args(1).toBoolean).getOrElse(true) System.out.println(s"sql is $sql runDdl is $runDdl") val sparkConf = new SparkConf(true) sparkConf .set("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .set("spark.sql.catalog.class","org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") sparkConf.setAppName("dlicatalogtester") val spark = SparkSession.builder .config(sparkConf) .enableHiveSupport() .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("SparkTest") .getOrCreate() System.out.println("catalog is " + spark.sessionState.catalog.toString) if (runDdl) { val df = spark.sql(sql).collect() } else { spark.sql(sql).show() } spark.close() } }
  • Python样例代码 #!/usr/bin/python # -*- coding: UTF-8 -*- from __future__ import print_function import sys from pyspark.sql import SparkSession if __name__ == "__main__": url = sys.argv[1] creatTbl = "CREATE TABLE test_sparkapp.dli_rds USING JDBC OPTIONS ('url'='jdbc:mysql://%s'," \ "'driver'='com.mysql.jdbc.Driver','dbtable'='test.test'," \ " 'passwdauth' = 'DatasourceRDSTest_pwd','encryption' = 'true')" % url spark = SparkSession \ .builder \ .enableHiveSupport() \ .config("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") \ .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") \ .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") \ .appName("python Spark test catalog") \ .getOrCreate() spark.sql("CREATE database if not exists test_sparkapp").collect() spark.sql("drop table if exists test_sparkapp.dli_rds").collect() spark.sql(creatTbl).collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert into table test_sparkapp.dli_rds select 12,'aaa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert overwrite table test_sparkapp.dli_rds select 1111,'asasasa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("drop table test_sparkapp.dli_rds").collect() spark.stop()
  • 步骤2:OBS桶文件配置 如果需要创建OBS表,则需要先上传数据到OBS桶目录下。 本次演示的样例代码创建了OBS表,测试数据内容参考如下示例,创建名为的testdata.csv文件。 12,Michael 27,Andy 30,Justin 进入OBS管理控制台,在“桶列表”下,单击已创建的OBS桶名称,本示例桶名为“dli-test-obs01”。 单击“上传对象”,将testdata.csv文件上传到OBS桶根目录下。 在OBS桶根目录下,单击“新建文件夹”,创建名为“warehousepath”的文件夹。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。
  • 开发流程 DLI进行Spark作业访问DLI元数据开发流程参考如下: 图1 Spark作业访问DLI元数据开发流程 表2 开发流程说明 序号 阶段 操作界面 说明 1 创建DLI通用队列 DLI控制台 创建作业运行的DLI队列。 2 OBS桶文件配置 OBS控制台 如果是创建OBS表,则需要上传文件数据到OBS桶下。 配置Spark创建表的元数据信息的存储路径。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。 3 新建Maven工程,配置pom文件 IntelliJ IDEA 参考样例代码说明,编写程序代码创建DLI表或OBS表。 4 编写程序代码 5 调试,编译代码并导出Jar包 6 上传Jar包到OBS和DLI OBS控制台 DLI控制台 将生成的Spark Jar包文件上传到OBS目录下和DLI程序包中。 7 创建Spark Jar作业 DLI控制台 在DLI控制台创建Spark Jar作业并提交运行作业。 8 查看作业运行结果 DLI控制台 查看作业运行状态和作业运行日志。
  • 约束限制 如果使用Spark 3.1访问元数据,则必须新建队列。 不支持的场景: 在SQL作业中创建了数据库(database),编写程序代码指定在该数据库下创建表。 例如在DLI的SQL编辑器中的某SQL队列下,创建了数据库testdb。后续通过编写程序代码在testdb下创建表testTable,编译打包后提交的Spark Jar作业则会运行失败。 支持的场景 在SQL作业中创建数据库(database),表(table) , 通过SQL或Spark程序作业读取插入数据。 在Spark程序作业中创建数据库(database),表(table), 通过SQL或Spark程序作业读取插入数据。
  • 环境准备 在进行Spark 作业访问DLI元数据开发前,请准备以下开发环境。 表1 Spark Jar作业开发环境 准备项 说明 操作系统 Windows系统,支持Windows7以上版本。 安装JDK JDK使用1.8版本。 安装和配置IntelliJ IDEA IntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。
  • 样例代码 本章节JAVA样例代码演示将DataGen数据处理后写入到OBS,具体参数配置请根据实际环境修改。 创建DLI访问DEW的委托并完成委托授权。 详细步骤请参考自定义DLI委托权限。 在DEW创建通用凭证。详细操作请参考创建通用凭据。 登录DEW管理控制台 选择“凭据管理”,进入“凭据管理”页面。 单击“创建凭据”。配置凭据基本信息 DLI Spark jar作业编辑界面设置作业参数。 Spark参数: spark.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key= USER_AK_CSMS_KEY spark.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key= USER_SK_CSMS_KEY spark.hadoop.fs.obs.security.provider=com.dli.provider.UserObsBasicCredentialProvider spark.hadoop.fs.dew.csms.secretName=obsAkSk spark.hadoop.fs.dew.endpoint=kmsendpoint spark.hadoop.fs.dew.csms.version=v3 spark.dli.job.agency.name=agency 示例代码 示例代码请参考使用Spark Jar作业读取和查询OBS数据。
共100000条