华为云用户手册

  • IP地理函数 当前仅支持IPV4的IP地址。 表6 IP地理函数表 函数 返回值 说明 IP_TO_COUNTRY STRING 获取IP地址所在的国家名称。 IP_TO_PROVINCE STRING 获取IP地址所在的省份。 用法说明: IP_TO_PROVINCE(STRING ip):返回IP地址所在的省份。 IP_TO_PROVINCE(STRING ip, STRING lang):以指定语言返回IP地址所在的省份。 说明: 当IP无法被解析到省份时,返回该IP所属的国家。当IP无法被解析时,返回“未知”。 函数返回的省份名称均为简称。 中文参考如下链接:http://www.gov.cn/guoqing/2005-09/13/content_5043917.htm IP_TO_CITY STRING 获取IP地址所在的城市名称。 说明: 当IP无法被解析到城市时,返回该IP所属的省份或者国家。当IP无法被解析时,返回“未知”。 IP_TO_CITY_GEO STRING 获取IP地址所在城市的经纬度,格式为“纬度,经度”。 用法说明: IP_TO_CITY_GEO(STRING ip):返回IP所在城市的经纬度。
  • 示例 偏航检测样例: 1 2 3 4 INSERT INTO yaw_warning SELECT "The car is yawing" FROM driver_behavior WHERE NOT ST_WITHIN(ST_POINT(cast(Longitude as DOUBLE), cast(Latitude as DOUBLE)), ST_BUFFER(ST_LINE(ARRAY[ST_POINT(34.585555,105.725221),ST_POINT(34.586729,105.735974),ST_POINT(34.586492,105.740538),ST_POINT(34.586388,105.741651),ST_POINT(34.586135,105.748712),ST_POINT(34.588691,105.74997)]),0.001));
  • 函数说明 基本地理空间几何元素介绍说明如表1所示。 表1 基本地理空间几何元素表 地理空间几何元素(统称geometry) 说明 举例 ST_POINT(latitude, longitude) 地理点,包含经度和维度两个信息。 ST_POINT(1.12012, 1.23401) ST_LINE(array[point1...pointN]) 地理线,由多个地理点(ST_POINT)按顺序连接成的折线或直线。 ST_LINE(ARRAY[ST_POINT(1.12, 2.23), ST_POINT(1.13, 2.44), ST_POINT(1.13, 2.44)]) ST_POLYGON(array[point1...point1]) 地理多边形,由首尾相同的多个地理点(ST_POINT)按顺序连线围成的封闭多边形区域。 ST_POLYGON(ARRAY[ST_POINT(1.0, 1.0), ST_POINT(2.0, 1.0), ST_POINT(2.0, 2.0), ST_POINT(1.0, 1.0)]) ST_CIRCLE(point, radius) 地理圆形,由圆心地理点(ST_POINT)和半径构成的地理圆形区域。 ST_CIRCLE(ST_POINT(1.0, 1.0), 1.234) 用户可以以基本地理空间几何元素为基础,构造复杂的地理空间几何元素,具体的变换方法见表2。 表2 基于基本地理空间几何元素构造复杂几何元素的变换表 变换方法 说明 举例 ST_BUFFER(geometry, distance) 创建一个环绕包含给定地理空间几何元素的多边形,并以给定距离作为环绕距离,通常使用该函数构造一定宽度的公路范围用于偏航检测。 ST_BUFFER(ST_LINE(ARRAY[ST_POINT(1.12, 2.23), ST_POINT(1.13, 2.44), ST_POINT(1.13, 2.44)]),1.0) ST_INTERSECTION(geometry, geometry) 创建一个多边形,其范围为给定的两个地理空间几何元素的交叠区域。 ST_INTERSECTION(ST_CIRCLE(ST_POINT(1.0, 1.0), 2.0), ST_CIRCLE(ST_POINT(3.0, 1.0), 1.234)) ST_ENVELOPE(geometry) 创建一个包含给定的地理空间几何元素的最小矩形。 ST_ENVELOPE(ST_CIRCLE(ST_POINT(1.0, 1.0), 2.0)) DLI 提供丰富的对地理空间几何元素的操作和位置判断函数,具体的SQL标量函数介绍说明见表3。 表3 SQL标量函数表 函数 返回值 说明 ST_DISTANCE(point_1, point_2) DOUBLE 计算两个地理点之间的欧几里得距离。 示例如下: Select ST_DISTANCE(ST_POINT(x1, y1), ST_POINT(x2, y2)) FROM input ST_GEODESIC_DISTANCE(point_1, point_2) DOUBLE 计算两个地理点之间的测地距离,即两个地理点之间地表最短路径距离。 示例如下: Select ST_GEODESIC_DISTANCE(ST_POINT(x1, y1), ST_POINT(x2, y2)) FROM input ST_PERIMETER(polygon) DOUBLE 计算多边形的周长。 示例如下: Select ST_PERIMETER(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]) FROM input ST_AREA(polygon) DOUBLE 计算多边形区域的面积。 示例如下: Select ST_AREA(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]) FROM input ST_OVERLAPS(polygon_1, polygon_2) BOOLEAN 判断一个多边形是否与另一个多边形有重叠区域。 示例如下: SELECT ST_OVERLAPS(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]), ST_POLYGON(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input ST_INTERSECT(line1, line2) BOOLEAN 检查两条线段是否相互交叉,而非线条所在的直线是否交叉。 示例如下: SELECT ST_INTERSECT(ST_LINE(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12)]), ST_LINE(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23)])) FROM input ST_WITHIN(point, polygon) BOOLEAN 一个点是否包含在几何体(多边形或圆形)内。 示例如下: SELECT ST_WITHIN(ST_POINT(x11, y11), ST_POLYGON(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input ST_CONTAINS(polygon_1, polygon_2) BOOLEAN 判断第一个几何体是否包含第二个几何体。 示例如下: SELECT ST_CONTAINS(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]), ST_POLYGON(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input ST_COVERS(polygon_1, polygon_2) BOOLEAN 第一个几何体是否覆盖第二个几何体。与ST_CONTAINS相似,但在边界重叠情况下ST_COVER判断为TRUE,ST_CONTAINS判断为FALSE。 示例如下: SELECT ST_COVERS(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]), ST_POLYGON([ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input ST_DISJOINT(polygon_1, polygon_2) BOOLEAN 判断一个多边形是否与另一个多边形不相交(不重叠)。 示例如下: SELECT ST_DISJOINT(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]), ST_POLYGON(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input 地理函数的基准坐标系标准为全球通用的GPS坐标系标准WGS84,GPS坐标不能直接在百度地图(BD09标准)或者google地图(GCJ02标准)上使用,会有偏移现象,为了在不同地理坐标系之间切换,DLI提供了坐标系转换的一系列函数,并且还提供地理距离与米之间的转换函数。详见表4。 表4 地理坐标系转换函数与距离单位转换函数表 函数 返回值 说明 WGS84_TO_BD09(geometry) 对应的百度地图坐标系地理空间几何元素 将GPS坐标系下的地理空间几何元素转换成百度地图坐标系下对应的地理空间几何元素。示例如下: WGS84_TO_BD09(ST_CIRCLE(ST_POINT(x, y), r)) WGS84_TO_CJ02(geometry) 对应的Google地图坐标系地理空间几何元素 将GPS坐标系下的地理空间几何元素转换成Google地图坐标系下对应的地理空间几何元素。示例如下: WGS84_TO_CJ02(ST_CIRCLE(ST_POINT(x, y), r)) BD09_TO_WGS84(geometry) 对应的GPS坐标系地理空间几何元素 将百度地图坐标系下的地理空间几何元素转换成GPS坐标系下对应的地理空间几何元素。示例如下: BD09_TO_WGS84(ST_CIRCLE(ST_POINT(x, y), r)) BD09_TO_CJ02(geometry) 对应的Google地图坐标系地理空间几何元素 将百度地图坐标系下的地理空间几何元素转换成Google地图坐标系下对应的地理空间几何元素。示例如下: BD09_TO_CJ02(ST_CIRCLE(ST_POINT(x, y), r)) CJ02_TO_WGS84(geometry) 对应的GPS坐标系地理空间几何元素 将Google地图坐标系下的地理空间几何元素转换成GPS坐标系下对应的地理空间几何元素。示例如下: CJ02_TO_WGS84(ST_CIRCLE(ST_POINT(x, y), r)) CJ02_TO_BD09(geometry) 对应的百度地图坐标系地理空间几何元素 将Google地图坐标系下的地理空间几何元素转换成百度地图坐标系下对应的地理空间几何元素。示例如下: CJ02_TO_BD09(ST_CIRCLE(ST_POINT(x, y), r)) DEGREE_TO_METER(distance) DOUBLE 将地理函数的距离数值转换成以“米”为单位的数值。示例如下(以米为单位计算地理三边形周长): DEGREE_TO_METER(ST_PERIMETER(ST_POLYGON(ARRAY[ST_POINT(x1,y1), ST_POINT(x2,y2), ST_POINT(x3,y3), ST_POINT(x1,y1)]))) METER_TO_DEGREE(numerical_value) DOUBLE 将以“米”为单位的数值转换成地理函数可计算的距离单位数值。示例如下(画出以指定地理点为圆心,半径1公里的圆): ST_CIRCLE(ST_POINT(x,y), METER_TO_DEGREE(1000)) DLI还提供了基于窗口的SQL地理聚合函数用于SQL逻辑涉及窗口和聚合的场景。详见表5的介绍说明。 表5 时间相关SQL地理聚合函数表 函数 说明 举例 AGG_DISTANCE(point) 距离聚合函数,用于计算窗口内所有相邻地理点的距离总和。 SELECT AGG_DISTANCE(ST_POINT(x,y)) FROM input GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY) AVG_SPEED(point) 平均速度聚合函数,用于计算窗口内所有地理点组成的移动轨迹的平均速度,单位为“米/秒”。 SELECT AVG_SPEED(ST_POINT(x,y)) FROM input GROUP BY TUMBLE(proctime, INTERVAL '1' DAY)
  • 语法格式 1 2 3 4 5 6 7 create table printSink ( attr_name attr_type (',' attr_name attr_type) * (',' PRIMARY KEY (attr_name,...) NOT ENFORCED) ) with ( 'connector' = 'print', 'print-identifier' = '', 'standard-error' = '' );
  • 参数说明 表1 禁止或恢复生命周期参数说明 参数名称 是否必选 参数说明 table_name 是 待禁止或恢复生命周期的表的名称。 pt_spec 否 待禁止或恢复生命周期的表的分区信息。格式为partition_col1=col1_value1, partition_col2=col2_value1...。对于有多级分区的表,必须指明全部的分区值。 enable 否 恢复表或指定分区的生命周期功能 表及其分区重新参与生命周期回收,默认使用当前表及分区上的生命周期配置。 开启表生命周期前可以修改表及分区的生命周期配置,防止开启表生命周期后因使用之前的配置导致数据被误回收。 disable 否 禁止表或指定分区的生命周期功能。 禁止表本身及其所有分区被生命周期回收,优先级高于恢复表分区生命周期。即当使用禁止表或指定分区的生命周期功能时,设置待禁止或恢复生命周期的表的分区信息是无效的。 禁止表的生命周期功能后,表的生命周期配置及其分区的enable和disable标记会被保留。 禁止表的生命周期功能后,仍然可以修改表及分区表的生命周期配置。
  • 示例 示例1:禁止表test_lifecycle的生命周期功能。 1 alter table test_lifecycle SET TBLPROPERTIES("dli.table.lifecycle.status"='disable'); 示例2:禁止表test_lifecycle中时间为20230520分区的生命周期功能。 1 alter table test_lifecycle partition (dt='20230520') LIFECYCLE 'disable'; 当设置禁止分区表的生命周期功能后,该表的所有分区的生命周期功能都会被禁止。
  • 示例 右外连接和左外连接相似,但是会将右边表(这里的course_info)中的所有记录返回,没有匹配值的左表记录将返回NULL。 1 2 SELECT student_info.name, course_info.courseName FROM student_info RIGHT OUTER JOIN course_info ON (student_info.courseId = course_info.courseId);
  • 功能描述 DLI提供多版本功能,用于数据的备份与恢复。开启多版本功能后,在进行删除或修改表数据时(insert overwrite或者truncate操作),系统会自动备份历史数据并保留一定时间,后续您可以对保留周期内的数据进行快速恢复,避免因误操作丢失数据。其他多版本SQL语法请参考多版本备份恢复数据。 DLI数据多版本功能当前仅支持通过Hive语法创建的OBS表,具体建表语法可以参考使用Hive语法创建OBS表。
  • 示例 修改表test_table,开启多版本功能。 1 2 ALTER TABLE test_table SET TBLPROPERTIES ("dli.multi.version.enable"="true"); 修改表test_table,关闭多版本功能。 1 2 ALTER TABLE test_table UNSET TBLPROPERTIES ("dli.multi.version.enable"); 回退多版本路径。 RESTORE TABLE test_table TO initial layout;
  • 语法格式 开启多版本功能 ALTER TABLE [db_name.]table_name SET TBLPROPERTIES ("dli.multi.version.enable"="true"); 关闭多版本功能 1 2 ALTER TABLE [db_name.]table_name UNSET TBLPROPERTIES ("dli.multi.version.enable"); 开启多版本功能后,在执行insert overwrite或者truncate操作时会自动在OBS存储路径下存储多版本数据。关闭多版本功能后,需要通过如下命令把多版本数据目录回收。 RESTORE TABLE [db_name.]table_name TO initial layout;
  • 示例 从Kafka名称为test的topic中读取数据。 1 2 3 4 5 6 7 8 9 10 11 CREATE SOURCE STREAM kafka_source ( name STRING, age int ) WITH ( type = "kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1", kafka_topic = "test", encode = "json" ); 从Kafka读取对象为test的topic,使用json_config将json数据和表字段对应。 数据编码格式为json且不含嵌套,例如: {"attr1": "lilei", "attr2": 18} 建表语句参考如下: 1 2 3 4 5 6 7 8 9 CREATE SOURCE STREAM kafka_source (name STRING, age int) WITH ( type = "kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1", kafka_topic = "test", encode = "json", json_config = "name=attr1;age=attr2" );
  • 前提条件 Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP 域名 映射,请参见《 数据湖探索 用户指南》中修改主机信息章节。 Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "kafka", kafka_bootstrap_servers = "", kafka_group_id = "", kafka_topic = "", encode = "json" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“Kafka”表示数据源。 kafka_bootstrap_servers 是 Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。 kafka_group_id 否 group id。 kafka_topic 是 读取的Kafka的topic。目前只支持读取单个topic。 encode 是 数据编码格式,可选为“csv”、“json”、“blob”和“user_defined”。 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需配置“json_config”属性。 当编码格式为"blob"时,表示不对接收的数据进行解析,流属性仅能有一个且为Array[TINYINT]类型。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。 krb_auth 否 创建跨源认证的认证名。开启kerberos认证时,需配置该参数。 说明: 请确保在DLI队列host文件中添加 MRS 集群master节点的“/etc/hosts”信息。 json_config 否 当encode为json时,用户可以通过该参数指定json字段和流属性字段的映射关系。 格式:"field1=json_field1;field2=json_field2" 格式说明:field1、field2为创建的表字段名称。json_field1、json_field2为kafka输入数据json串的key字段名称。 具体使用方法可以参考示例说明。 field_delimiter 否 当encode为csv时,用于指定csv字段分隔符,默认为逗号。 quote 否 可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。 当引用符号为单引号时,则设置quote = "'"。 说明: 目前仅适用于 CS V格式。 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。 start_time 否 kafka数据读取起始时间。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。start_time要不大于当前时间,若大于当前时间,则不会有数据读取出。 kafka_properties 否 可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2" kafka_certificate_name 否 跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。 说明: 指定该配置项时,服务仅加载该认证下指定的文件和密码,系统将自动设置到“kafka_properties”属性中。 Kafka SSL认证需要的其他配置信息,需要用户手动在“kafka_properties”属性中配置。
  • 语法格式 1 2 3 4 5 6 7 8 9 create table printSink ( attr_name attr_type (',' attr_name attr_type) * (',' PRIMARY KEY (attr_name,...) NOT ENFORCED) ) with ( 'connector' = 'print', 'print-identifier' = '', 'standard-error' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 默认参数 数据类型 说明 connector 是 无 String 固定为:print。 print-identifier 否 无 String 配置一个标识符作为输出数据的前缀。 standard-error 否 false Boolean 该值只能为true或false,默认为false。 若为true,则表示输出数据到taskmanager的error文件中。 若为false,则表示输出数据到taskmanager的out中。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression) ) with ( 'connector.type' = 'kafka', 'connector.version' = '', 'connector.topic' = '', 'connector.properties.bootstrap.servers' = '', 'connector.properties.group.id' = '', 'connector.startup-mode' = '', 'format.type' = '' );
  • 示例 从Kafka中读取编码格式为csv,对象为kafkaSource的表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test-group', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'csv' ); 从Kafka中读取编码格式为不含嵌套的json数据,对象为kafkaSource的表。 例如不含嵌套的json数据格式为: {"car_id": 312, "car_owner": "wang", "car_brand": "tang"} {"car_id": 313, "car_owner": "li", "car_brand": "lin"} {"car_id": 314, "car_owner": "zhao", "car_brand": "han"} 则创建表语句为: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( car_id STRING, car_owner STRING, car_brand STRING ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test-group', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' ); 从Kafka中读取编码格式包含嵌套的json数据,对象为kafkaSource的表。 例如包含嵌套的json数据格式为: { "id":"1", "type":"online", "data":{ "patient_id":1234, "name":"bob1234", "age":"Bob", "gmt_create":"Bob", "gmt_modify":"Bob" } } 则创建表语句为: CREATE table kafkaSource( id STRING, type STRING, data ROW( patient_id STRING, name STRING, age STRING, gmt_create STRING, gmt_modify STRING) ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test-group', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于kafka,需配置为'kafka'。 connector.version 是 Kafka版本,支持:'0.10'、 '0.11'。0.10或0.11版本号对应kafka版本号2.11-2.4.0及其他历史版本。 format.type 是 数据反序列化格式,支持:'csv', 'json'及'avro'等。 format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 connector.topic 是 kafka topic名。该参数和“connector.topic-pattern”两个参数只能使用其中一个。 connector.topic-pattern 否 匹配读取kafka topic名称的正则表达式。该参数和“connector.topic”两个参数只能使用其中一个。 例如: 'topic.*' '(topic-c|topic-d)' '(topic-a|topic-b|topic-\\d*)' '(topic-a|topic-b|topic-[0-9]*)' connector.properties.bootstrap.servers 是 kafka brokers地址,以逗号分隔。 connector.properties.group.id 否 消费组名称 connector.startup-mode 否 consumer启动模式,支持:'earliest-offset', 'latest-offset', 'group-offsets', 'specific-offsets'及'timestamp'。默认值为'group-offsets'。 connector.specific-offsets 否 指定消费offset,'startup-mode'为'specific-offsets'时需配置,格式为: 'partition:0,offset:42;partition:1,offset:300'。 connector.startup-timestamp-millis 否 指定起始消费时间戳,'startup-mode'为'timestamp'时需配置。 connector.properties.* 否 配置kafka任意原生属性。
  • 示例 CSV编码格式:数据输出到DIS通道,使用csv编码,并且以逗号为分隔符,多个分区用car_owner做为key进行分发。数据输出示例:"ZJA710XC", "lilei", "BMW", 700000。 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "xxx", channel = "dlioutput", encode = "csv", field_delimiter = "," ); JSON编码格式:数据输出到DIS通道,使用json编码,多个分区用car_owner,car_brand 做为key进行分发,“enableOutputNull”为“true”表示输出空字段(值为null),若为“false”表示不输出空字段。数据示例:"car_id ":"ZJA710XC", "car_owner ":"lilei", "car_brand ":"BMW", "car_price ":700000。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", channel = "dlioutput", region = "xxx", partition_key = "car_owner,car_brand", encode = "json", enable_output_null = "false" );
  • 功能描述 DLI将Flink作业的输出数据写入 数据接入服务 (DIS)中。适用于将数据过滤后导入DIS通道,进行后续处理的场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dis表示输出到数据接入服务。 region 是 数据所在的DIS所在区域。 ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 channel 是 DIS通道。 partition_key 否 数据输出分组主键,多个主键用逗号分隔。当该参数没有配置的时候则随机派发。 encode 是 数据编码格式,可选为“csv”、“json”和“user_defined”。 说明: 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需使用“enable_output_null”来配置是否输出空字段,具体见示例。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 field_delimiter 是 属性分隔符。 当编码格式为csv时,需要设置属性分隔符,用户可以自定义,如:“,”。 当编码格式为json时,则不需要设置属性之间的分隔符。 json_config 否 当编码格式为json时,用户可以通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1; field2=data_json.field2”。 enable_output_null 否 当编码格式为json时,需使用该参数来配置是否输出空字段。 当该参数为“true”表示输出空字段(值为null),若为“false”表示不输出空字段。默认值为“true”。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dis", region = "", channel = "", partition_key = "", encode= "", field_delimiter= "" );
  • 示例 将流disSink的数据输出到DIS中。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table disSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disOutput', 'connector.partition-key' = 'car_id,car_owner', 'format.type' = 'csv' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,“dis”表示数据源为数据接入服务,必须为dis。 connector.region 是 数据所在的DIS区域。 connector.ak 否 访问密钥ID(Access Key ID),需与sk同时设置 connector.sk 否 Secret Access Key,需与ak同时设置 connector.channel 是 数据所在的DIS通道名称。 format.type 是 数据编码格式,可选为“csv”、“json” format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 connector.partition-key 否 数据输出分组主键,多个主键用逗号分隔。当该参数没有配置的时候则随机派发。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 create table disSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'dis', 'connector.region' = '', 'connector.channel' = '', 'format.type' = '' );
  • 功能描述 DLI将Flink作业的输出数据写入数据接入服务(DIS)中。适用于将数据过滤后导入DIS通道,进行后续处理的场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • 示例 给用户user_name1授予数据库db1的删除数据库权限。 1 GRANT DROP_DATABASE ON databases.db1 TO USER user_name1; 给用户user_name1授予数据库db1的表tb1的SELECT权限。 1 GRANT SELECT ON databases.db1.tables.tb1 TO USER user_name1; 给角色role_name授予数据库db1的表tb1的SELECT权限。 1 GRANT SELECT ON databases.db1.tables.tb1 TO ROLE role_name;
  • 注意事项 privilege必须是可授权限中的一种。且如果赋权对象在resource或上一级resource上已经有对应权限时,则会赋权失败。Privilege支持的权限类型可参见数据权限列表。 resource可以是queue、database、table、view、column,格式分别为: queue的格式为:queues.queue_name queue支持的Privilege权限类型可以参考下表: 操作 说明 DROP_QUEUE 删除队列 SUBMIT_JOB 提交作业 CANCEL_JOB 终止作业 RESTART 重启队列 SCALE_QUEUE 扩缩容队列 GRANT_PRIVILEGE 队列的赋权 REVOKE_PRIVILEGE 队列权限的回收 SHOW_PRIVILEGES 查看其他用户具备的队列权限 database的格式为:databases.db_name database支持的Privilege权限类型可参见数据权限列表。 table的格式为:databases.db_name.tables.table_name table支持的Privilege权限类型可参见数据权限列表。 view的格式为:databases.db_name.tables.view_name view支持的Privilege权限类型和table一样,具体可以参考数据权限列表中table的权限列表描述。 column的格式为:databases.db_name.tables.table_name.columns.column_name column支持的Privilege权限类型仅为:SELECT
  • 关键字 ROLLUP:为GROUP BY的扩展,例如:SELECT a, b, c, SUM(expression) FROM table GROUP BY a, b, c WITH ROLLUP;将转换成以下四条查询: (a, b, c)组合小计 1 2 SELECT a, b, c, sum(expression) FROM table GROUP BY a, b, c; (a, b)组合小计 1 2 SELECT a, b, NULL, sum(expression) FROM table GROUP BY a, b; (a)组合小计 1 2 SELECT a, NULL, NULL, sum(expression) FROM table GROUP BY a; 总计 1 SELECT NULL, NULL, NULL, sum(expression) FROM table;
共100000条