华为云用户手册

  • 参数说明 表1 参数描述 参数 描述 db_name Database名称,由字母、数字和下划线(_)组成。不能是纯数字,且不能以下划线开头。 table_name Database中的表名,由字母、数字和下划线(_)组成。不能是纯数字,且不能以下划线开头。 匹配规则为:^(?!_)(?![0-9]+$)[A-Za-z0-9_$]*$,如果特殊字符需要使用单引号('')包围起来。 partition_specs 分区信息,key=value形式,key为分区字段,value为分区值。若分区字段为多个字段,可以不包含所有的字段,会显示匹配上的所有分区信息。
  • Join表函数(UDTF) 功能描述 将表与表函数的结果进行 join 操作。左表(outer)中的每一行将会与调用表函数所产生的所有结果中相关联行进行 join 。 注意事项 针对横向表的左外部联接当前仅支持文本常量 TRUE 作为谓词。 示例 若表函数返回了空结果,左表(outer)的行将会被删除 SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag; 若表函数返回了空结果,将会保留相对应的外部行并用空值填充 SELECT users, tag FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE;
  • Join Temporal Table Function 功能描述 注意事项 目前仅支持在 Temporal Tables 上的 inner join 示例 假如Rates是一个 Temporal Table Function, join 可以使用 SQL 进行如下的表达: SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency;
  • 示例 将流jdbcSink的数据输出到MySQL数据库中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table jdbcSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/xx', 'connector.table' = 'jdbc_table_name', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'xxx', 'connector.password' = 'xxxxxx' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,‘jdbc’表示使用JDBC connector,必须为jdbc connector.url 是 数据库的URL connector.table 是 读取数据库中的数据所在的表名 connector.driver 否 连接数据库所需要的驱动。若未配置,则会自动通过URL提取 connector.username 否 访问数据库所需要的账号 connector.password 否 访问数据库所需要的密码 connector.write.flush.max-rows 否 写数据时,刷新数据的最大行数。默认值为5000 connector.write.flush.interval 否 刷新数据的时间间隔,单位可以为ms、milli、millisecond/s、sec、second/min、minute等。不填写则默认不根据时间刷新 connector.write.max-retries 否 写数据失败时的最大尝试次数。默认值为3 connector.write.exclude-update-columns 否 默认值为空(默认忽略primary key字段),表示更新主键值相同的数据时,忽略指定字段的更新
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 create table jdbcSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'jdbc', 'connector.url' = '', 'connector.table' = '', 'connector.driver' = '', 'connector.username' = '', 'connector.password' = '' );
  • 注意事项 导入OBS表时,创建OBS表时指定的路径必须是文件夹,若建表路径是文件将导致导入数据失败。 仅支持导入位于OBS路径上的原始数据。 不建议对同一张表并发导入数据,因为有一定概率发生并发冲突,导致导入失败。 导入数据时只能指定一个路径,路径中不能包含逗号。 当OBS桶目录下有文件夹和文件同名时,导入数据会优先指向该路径下的文件而非文件夹。 导入PARQUET、ORC及JSON类型数据时,必须指定DATA_TYPE这一OPTIONS,否则会以默认的“ CS V”格式进行解析,从而导致导入的数据格式不正确。 导入CSV及JSON类型数据时,如果包含日期及时间列,需要指定DATEFORMAT及TIMESTAMPFORMAT选项,否则将以默认的日期及时间戳格式进行解析。
  • 示例 导入数据前已参考创建OBS表或者创建 DLI 表中的示例描述创建对应的表。 可使用下列语句将CSV文件导入到DLI表,“t”为表名。 1 2 LOAD DATA INPATH 'obs://dli/data.csv' INTO TABLE t OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','COMMENTCHAR'='#','HEADER'='false'); 可使用下列语句将JSON文件导入到DLI表,“jsontb”为表名。 1 2 LOAD DATA INPATH 'obs://dli/alltype.json' into table jsontb OPTIONS('DATA_TYPE'='json','DATEFORMAT'='yyyy/MM/dd','TIMESTAMPFORMAT'='yyyy/MM/dd HH:mm:ss');
  • 参数说明 表1 参数描述 参数 描述 folder_path 原始数据文件夹或者文件的OBS路径。 db_name 数据库名称。若未指定,则使用当前数据库。 table_name 需要导入数据的DLI表的名称。 以下是可以在导入数据时使用的配置选项: DATA_TYPE: 指定导入的数据类型,当前支持CSV、Parquet、ORC、JSON、Avro类型,默认值为“CSV”。 配置项为OPTIONS('DATA_TYPE'='CSV') 导入CSV和JSON文件时,有三种模式可以选择: PERMISSIVE:选择PERMISSIVE模式时,如果某一列数据类型与目标表列数据类型不匹配,则该行数据将被设置为null。 DROPMALFORMED:选择DROPMALFORMED模式时,如果某一列数据类型与目标表列数据类型不匹配,则不导入该行数据。 FAILFAST:选择FAILFAST模式时,如果某一列类型不匹配,则会抛出异常,导入失败。 模式设置可通过在OPTIONS中添加 OPTIONS('MODE'='PERMISSIVE')进行设置。 DELIMITER:可以在导入命令中指定分隔符,默认值为“,”。 配置项为OPTIONS('DELIMITER'=',')。 对于CSV数据,支持如下所述分隔符: 制表符tab,例如:'DELIMITER'='\t'。 任意的二进制字符,例如:'DELIMITER'='\u0001(^A)'。 单引号('),单引号必须在双引号(" ")内。例如:'DELIMITER'= "'"。 DLI表还支持\001(^A)和\017(^Q),例如:'DELIMITER'='\001(^A)','DELIMITER'='\017(^Q)'。 QUOTECHAR:可以在导入命令中指定引号字符。默认值为"。 配置项为OPTIONS('QUOTECHAR'='"') COMMENTCHAR:可以在导入命令中指定注释字符。在导入操作期间,如果在行的开头遇到注释字符,那么该行将被视为注释,并且不会被导入。默认值为#。 配置项为OPTIONS('COMMENTCHAR'='#') HEADER:用来表示源文件是否有表头。取值范围为“true”和“false”。“true”表示有表头,“false”表示无表头。默认值为“false”。如果没有表头,可以在导入命令中指定FILEHEADER参数提供表头。 配置项为OPTIONS('HEADER'='true') FILEHEADER:如果源文件中没有表头,可在LOAD DATA命令中提供表头。 OPTIONS('FILEHEADER'='column1,column2') ESCAPECHAR:如果用户想在CSV上对Escape字符进行严格验证,可以提供Escape字符。默认值为“\\”。 配置项为OPTIONS('ESCAPECHAR'='\\') 如果在CSV数据中输入ESCAPECHAR,该ESCAPECHAR必须在双引号(" ")内。例如:"a\b"。 MAXCOLUMNS:该可选参数指定了在一行中,CSV解析器解析的最大列数。 配置项为OPTIONS('MAXCOLUMNS'='400') 表2 MAXCOLUMNS 可选参数名称 默认值 最大值 MAXCOLUMNS 2000 20000 设置MAXCOLUMNS Option的值后,导入数据会对executor的内存有要求,所以导入数据可能会由于executor内存不足而失败。 DATEFORMAT:指定列的日期格式。 OPTIONS('DATEFORMAT'='dateFormat') 默认值为:yyyy-MM-dd。 日期格式由Java的日期模式字符串指定。在Java的日期和时间模式字符串中,未加单引号(')的字符'A' 到'Z' 和'a' 到'z' 被解释为模式字符,用来表示日期或时间字符串元素。若模式字符使用单引号 (') 引起来,则在解析时只进行文本匹配,而不进行解析。Java模式字符定义请参见表3。 表3 日期及时间模式字符定义 模式字符 日期或时间元素 示例 G 纪元标识符 AD y 年份 1996; 96 M 月份 July; Jul; 07 w 年中的周数 27(该年的第27周) W 月中的周数 2(该月的第2周) D 年中的天数 189(该年的第189天) d 月中的天数 10(该月的第10天) u 星期中的天数 1 = 星期一, ..., 7 = 星期日 a am/pm 标记 pm(下午时) H 24小时数(0-23) 2 h 12小时数(1-12) 12 m 分钟数 30 s 秒数 55 S 毫秒数 978 z 时区 Pacific Standard Time; PST; GMT-08:00 TIMESTAMPFORMAT:指定列的时间戳格式。 OPTIONS('TIMESTAMPFORMAT'='timestampFormat') 默认值为:yyyy-MM-dd HH:mm:ss。 时间戳格式由Java的时间模式字符串指定。Java时间模式字符串定义详见表3 日期及时间模式字符定义。
  • 参数说明 表1 参数描述 参数 描述 DLI_TABLE 已创建跨源连接的DLI表名称。 DLI_TEST 为包含待查询数据的表。 field1,field2...,field 表“DLI_TEST”中的列值,需要匹配表“DLI_TABLE”的列值和类型。 where_condition 查询过滤条件。 num 对查询结果进行限制,num参数仅支持INT类型。 values_row 想要插入到表中的值,列与列之间用逗号分隔。
  • 语法格式 将SELECT查询结果插入到表中: 1 2 3 4 5 6 7 INSERT INTO DLI_TABLE SELECT field1,field2... [FROM DLI_TEST] [WHERE where_condition] [LIMIT num] [GROUP BY field] [ORDER BY field] ...; 将某条数据插入到表中: 1 2 INSERT INTO DLI_TABLE VALUES values_row [, values_row ...]; 覆盖插入数据 1 2 3 4 5 6 7 INSERT OVERWRITE TABLE DLI_TABLE SELECT field1,field2... [FROM DLI_TEST] [WHERE where_condition] [LIMIT num] [GROUP BY field] [ORDER BY field] ...;
  • 关键字 PARTITION BY:可以用一个或多个键分区。和GROUP BY子句类似,PARTITION BY将表按分区键分区,每个分区是一个窗口,窗口函数作用于各个分区。单表分区数最多允许7000个。 ORDER BY:决定窗口函数求值的顺序。可以用一个或多个键排序。通过ASC或DESC决定升序或降序。窗口由WINDOW子句指定。如果不指定,默认窗口等同于ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,即窗口从表或分区(如果OVER子句中用PARTITION BY分区)的初始处到当前行。 WINDOW:通过指定一个行区间来定义窗口。 CURRENT ROW:表示当前行。 num PRECEDING:定义窗口的下限,即窗口从当前行向前数num行处开始。 UNBOUNDED PRECEDING:表示窗口没有下限。 num FOLLOWING:定义窗口的上限,即窗口从当前行向后数num行处结束。 UNBOUNDED FOLLOWING:表示窗口没有上限。 ROWS BETWEEN…和RANGE BETWEEN…的区别: ROW为物理窗口,即根据ORDER BY子句排序后,取前N行及后N行的数据计算(与当前行的值无关,只与排序后的行号相关)。 RANGE为逻辑窗口,即指定当前行对应值的范围取值,列数不固定,只要行值在范围内,对应列都包含在内。 窗口有以下多种场景,如 窗口只包含当前行。 1 ROWS BETWEEN CURRENT ROW AND CURRENT ROW 窗口从当前行向前数3行开始,到当前行向后数5行结束。 1 ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING 窗口从表或分区的开头开始,到当前行结束。 1 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW 窗口从当前行开始,到表或分区的结尾结束。 1 ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING 窗口从表或分区的开头开始,到表或分区的结尾结束。 1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
  • 语法格式 1 2 3 4 5 SELECT window_func(args) OVER ([PARTITION BY col_name, col_name, ...] [ORDER BY col_name, col_name, ...] [ROWS | RANGE BETWEEN (CURRENT ROW | (UNBOUNDED |[num]) PRECEDING) AND (CURRENT ROW | ( UNBOUNDED | [num]) FOLLOWING)]);
  • 示例代码 计算所有商品库存(items)的样本偏差。命令示例如下: select stddev_samp(items) from warehouse; 返回结果如下: +------------+ | _c0 | +------------+ | 1.342355 | +------------+ 与group by配合使用,对所有商品按照仓库(warehourseId)进行分组,并计算同组商品库存(items)的样本偏差。命令示例如下: select warehourseId, stddev_samp(items) from warehourse group by warehourseId; 返回结果如下: +------------+------------+ | warehouseId| _c1 | +------------+------------+ | city1 | 1.23124 | | city2 | 1.23344 | | city3 | 1.43425 | +------------+------------+
  • 语法格式 将SELECT查询结果插入到表中: 1 2 3 4 5 6 7 INSERT INTO DLI_TABLE SELECT field1,field2... [FROM DLI_TEST] [WHERE where_condition] [LIMIT num] [GROUP BY field] [ORDER BY field] ...; 将某条数据插入到表中: 1 2 INSERT INTO DLI_TABLE VALUES values_row [, values_row ...]; 覆盖插入数据 1 2 3 4 5 6 7 INSERT OVERWRITE TABLE DLI_TABLE SELECT field1,field2... [FROM DLI_TEST] [WHERE where_condition] [LIMIT num] [GROUP BY field] [ORDER BY field] ...;
  • 参数说明 表1 参数描述 参数 描述 DLI_TABLE 已创建跨源连接的DLI表名称。 DLI_TEST 为包含待查询数据的表。 field1,field2...,field 表“DLI_TEST”中的列值,需要匹配表“DLI_TABLE”的列值和类型。 where_condition 查询过滤条件。 num 对查询结果进行限制,num参数仅支持INT类型。 values_row 想要插入到表中的值,列与列之间用逗号分隔。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,"kafka"表示输出到Kafka中。 kafka_bootstrap_servers 是 Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。 kafka_topic 是 写入的topic encode 是 数据编码格式,可选为“csv”、“json”和“user_defined”。 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 filed_delimiter 否 当encode为csv时,用于指定各字段分隔符,默认为逗号。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。 kafka_properties 否 可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2" kafka_certificate_name 否 跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。 说明: 指定该配置项时,服务仅加载该认证下指定的文件和密码,系统将自动设置到“kafka_properties”属性中。 Kafka SSL认证需要的其他配置信息,需要用户手动在“kafka_properties”属性中配置。
  • 前提条件 Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP 域名 映射,请参见《 数据湖探索 用户指南》中修改主机信息章节。 Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH( type = "kafka", kafka_bootstrap_servers = "", kafka_topic = "", encode = "json" )
  • 示例 将流kafka_sink的数据输出到Kafka中。 1 2 3 4 5 6 7 CREATE SINK STREAM kafka_sink (name STRING) WITH ( type="kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_topic = "testsink", encode = "json" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,rds表示输出到关系型数据库中。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 数据库连接地址,格式为:"{database_type}://ip:port/database" 目前支持两种数据库连接:MySQL和PostgreSQL MySQL: 'mysql://ip:port/database' PostgreSQL: 'postgresql://ip:port/database' 说明: 将数据库连接地址设置为DWS数据库地址,即可创建DWS维表。DWS数据库版本大于8.1.0后,无法用开源的postgresql驱动连接,需要用gaussdb驱动进行连接。 table_name 是 用于查询数据的数据库表名。 db_columns 否 流属性和数据库表的字段对应关系。当sink流中流属性字段和数据库表中的流属性字段不完全匹配时,该参数必配。格式为“dbtable_attr1,dbtable_attr2,dbtable_attr3“。 cache_max_num 否 表示最大缓存的查询结果数,默认值为32768。 cache_time 否 表示数据库查询结果在内存中缓存的最大时间。单位为毫秒,默认值为10000,当值为0时表示不缓存。
  • 示例 RDS表用于与输入流连接。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "", channel = "dliinput", encode = "csv", field_delimiter = "," ); CREATE TABLE db_info ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "root", password = "******", db_url = "postgresql://192.168.0.0:2000/test1", table_name = "car" ); CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "", channel = "dlioutput", partition_key = "car_owner", encode = "csv", field_delimiter = "," ); INSERT INTO audi_cheaper_than_30w SELECT a.car_id, b.car_owner, b.car_brand, b.car_price FROM car_infos as a join db_info as b on a.car_id = b.car_id; 将数据库连接地址设置为DWS数据库地址,即可创建DWS维表。DWS数据库版本大于8.1.0后,无法用开源的postgresql驱动连接,需要用gaussdb驱动进行连接。
  • 前提条件 请务必确保您的账户下已在关系型数据库(RDS)里创建了PostgreSQL或MySQL类型的RDS实例。 如何创建RDS实例,请参见《关系型数据库快速入门》中“购买实例”章节。 该场景作业需要运行在DLI的独享队列上,因此要与RDS实例建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE TABLE table_id ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "", password = "", db_url = "", table_name = "" );
  • 关键字 表1 CREATE TABLE关键字说明 参数 描述 url Oracle的连接地址。 Oracle url支持以下格式: 格式一:jdbc:oracle:thin:@host:port:SID,其中SID是oracle数据库的唯一标识符。 格式二:jdbc:oracle:thin:@//host:port/service_name;这种方式是Oracle推荐的,对于集群来说,每个节点的SID可能不一致,但ServiceName是一致的,包含所有节点。 driver Oracle驱动类名: oracle.jdbc.driver.OracleDriver dbtable 指定在Oracle关联的表名,或者"用户名.表名",例如:public.table_name。 user Oracle用户名。 password Oracle用户名密码。 resource Oracle驱动包的OBS路径。 例如:obs://rest-authinfo/tools/oracle/driver/ojdbc6.jar resource中定义的driver jar包如果被更新,需要重启队列,才会生效。
  • 示例 创建Oracle跨源表 1 2 3 4 5 6 7 8 9 CREATE TABLE IF NOT EXISTS oracleTest USING ORACLE OPTIONS ( 'url'='jdbc:oracle:thin:@//192.168.168.40:1521/helowin', 'driver'='oracle.jdbc.driver.OracleDriver', 'dbtable'='test.Student', 'user' = 'test', 'password' = 'test', 'resource' = 'obs://rest-authinfo/tools/oracle/driver/ojdbc6.jar' );
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE TABLE [IF NOT EXISTS] TABLE_NAME USING ORACLE OPTIONS ( 'url'='xx', 'driver'='DRIVER_NAME', 'dbtable'='db_in_oracle.table_in_oracle', 'user' = 'xxx', 'password' = 'xxx', 'resource' = 'obs://rest-authinfo/tools/oracle/driver/ojdbc6.jar' );
  • 注意事项 DLI表必须已经存在。 DLI表在创建时需要指定Schema信息。 如果在建表时指定“key.column”,则在Redis中会以指定字段的值作为Redis Key名称的一部分。例如: 1 2 3 4 5 6 7 8 create table test_redis(name string, age int) using redis options( 'host' = '192.168.4.199', 'port' = '6379', 'passwdauth' = '******', 'table' = 'test_with_key_column', 'key.column' = 'name' ); insert into test_redis values("James", 35), ("Michael", 22); 在redis中将会有2个名为test_with_key_column:James和test_with_key_column:Michael的表: 如果在建表时没有指定“key.column”,则在Redis中的key name将会使用uuid。例如: 1 2 3 4 5 6 7 create table test_redis(name string, age int) using redis options( 'host' = '192.168.7.238', 'port' = '6379', 'passwdauth' = '******', 'table' = 'test_without_key_column' ); insert into test_redis values("James", 35), ("Michael", 22); 在redis中将会有2个以“test_without_key_column:uuid”命名的表:
  • 参数说明 表1 参数描述 参数 描述 DLI_TABLE 已创建跨源连接的DLI表名称。 DLI_TEST 为包含待查询数据的表。 field1,field2...,field 表“DLI_TEST”中的列值,需要匹配表“DLI_TABLE”的列值和类型。 where_condition 查询过滤条件。 num 对查询结果进行限制,num参数仅支持INT类型。 values_row 想要插入到表中的值,列与列之间用逗号分隔。
  • 语法格式 将SELECT查询结果插入到表中: 1 2 3 4 5 6 7 INSERT INTO DLI_TABLE SELECT field1,field2... [FROM DLI_TEST] [WHERE where_condition] [LIMIT num] [GROUP BY field] [ORDER BY field] ...; 将某条数据插入到表中: 1 2 INSERT INTO DLI_TABLE VALUES values_row [, values_row ...];
共100000条