华为云用户手册

  • 示例 该示例是从kafka数据源中读取数据,并以insert模式写入DWS结果表中,其具体步骤如下: 参考增强型跨源连接,在 DLI 上根据DWS和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置DWS和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据DWS和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 连接DWS数据库,在DWS中创建相应的表,表名为dws_order,SQL语句参考如下: create table public.dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka作业数据源,将DWS作为结果表。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE dwsSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'gaussdb', 'url' = 'jdbc:postgresql://DWSAddress:DWSPort/DWSdbName', 'table-name' = 'dws_order', 'driver' = 'org.postgresql.Driver', 'username' = 'DWSUserName', 'password' = 'DWSPassword', 'write.mode' = 'insert' ); insert into dwsSink select * from kafkaSource; 连接Kafka集群,向Kafka中输入以下测试数据。 {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 从DWS中使用如下SQL语句查看数据结果。 select * from dws_order 数据结果参考如下: 202103241000000001 webShop 2021-03-24 10:00:00 100.0 100.0 2021-03-24 10:02:03 0001 Alice 330106
  • 语法格式 DWS结果表中不允许指定所有属性为PRIMARY KEY。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'gaussdb', 'url' = '', 'table-name' = '', 'driver' = '', 'username' = '', 'password' = '' );
  • 功能描述 DLI将Flink作业的输出数据输出到 数据仓库 服务(DWS)中。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
  • 前提条件 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 注意事项 若需要使用upsert模式,则必须在DWS结果表和该结果表连接的DWS表都定义主键。 若DWS在不同的schema中存在相同名称的表,则在flink opensource sql中需要指定相应的schema。 提交Flink作业前,建议勾选“保存作业日志”参数,在OBS桶选项中选择日志保存的位置,方便后续作业提交失败或运行异常时,查看日志并分析问题原因。 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。该驱动为默认,创建表时可以不填该驱动参数。 例如,使用gsjdbc4驱动连接、upsert模式写入数据到DWS中。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector' = 'gaussdb', 'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDatabase', 'table-name' = 'car_info', 'username' = 'DwsUserName', 'password' = 'DwsPasswrod', 'write.mode' = 'upsert' ); 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例创建DWS结果表。 create table dwsSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector' = 'gaussdb', 'table-name' = 'ads_game_sdk_base\".\"test', 'driver' = 'com.huawei.gauss200.jdbc.Driver', 'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDatabase', 'username' = 'DwsUserName', 'password' = 'DwsPasswrod', 'write.mode' = 'upsert' );
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 指定要使用的连接器,这里是'gaussdb' url 是 无 String jdbc连接地址 。 使用gsjdbc4驱动连接时,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 使用gsjdbc200驱动连接时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。 table-name 是 无 String 操作的表名。如果该DWS表在某schema下,则格式为:'schema\".\"具体表名',具体可以参考常见问题说明。 driver 否 org.postgresql.Driver String jdbc连接驱动,默认为: org.postgresql.Driver。 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 username 否 无 String DWS数据库认证用户名,需要和'password'一起配置 password 否 无 String DWS数据库认证密码,需要和'username'一起配置 write.mode 否 无 String 数据写入模式,支持: copy, insert以及upsert三种。默认值为upsert。 该参数与'primary key'配合使用。 未配置'primary key'时,支持copy及insert两种模式追加写入。 配置'primary key',支持copy、upsert以及insert三种模式更新写入。 注意:由于dws不支持更新分布列,因而配置的更新主键必须包含dws表中定义的所有分布列。 sink.buffer-flush.max-rows 否 100 Integer 每次写入请求缓存的最大行数。 它能提升写入数据的性能,但是也可能增加延迟。 设置为 "0" 关闭此选项。 sink.buffer-flush.interval 否 1s Duration 刷新缓存的间隔,在这段时间内以异步线程刷新数据。 它能提升写入数据库的性能,但是也可能增加延迟。 设置为 "0" 关闭此选项。 注意:"sink.buffer-flush.max-size" 和 "sink.buffer-flush.max-rows" 同时设置为 "0",并设置刷新缓存的间隔,则以完整的异步处理方式刷新缓存。 格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 sink.max-retries 否 3 Integer 写入最大重试次数。 write.escape-string-value 否 false Boolean 是否对string类型值进行转义。该参数仅用于write.mode为copy模式下。 pwd_auth_name 否 无 String DLI侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置置账号和密码。 key-by-before-sink 否 false Boolean 在sink算子前是否按指定的主键进行分区。 该参数旨在解决多并发写入的场景下且write.mode为upsert时,如果多个子任务中写入sink的一批数据具有不止一条相同的主键,并且主键相同的这些数据先后顺序不一致,就会导致两个子任务在向DWS根据主键获取行锁时发生互锁的问题。
  • 功能描述 DLI将边缘作业分析处理过的数据,写入到EdgeHub中,便于后续进行处理。 适用于物联网IOT场景,将实时流计算能力从云端延伸到边缘,在边缘快速实现对流数据实时、快速、准确地分析处理,增加数据处理计算的速度和效率。同时将数据在边缘预处理,可以有效减少无效的数据上云,减少资源消耗,提升分析效率。边缘作业依赖于智能边缘平台(Intelligent EdgeFabric, IEF),IEF通过纳管用户的边缘节点,提供将云上应用延伸到边缘的能力,联动边缘和云端的数据,同时,在云端提供统一的设备/应用监控、日志采集等运维能力,为企业提供完整的边缘计算解决方案。IEF的更多信息,请参见《智能边缘平台用户指南》。 仅Flink 1.7版本适配边缘作业场景,且Flink 1.7 EOS。DLI后续版本不再提供边缘作业场景的语法参考。
  • 示例 将数据以csv格式写入到edgeHub主题abcd中。 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM excellent_students( name string, score int) WITH ( type = "edgehub", topic = "abcd", encode = "csv", field_delimiter = "," );
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "edgehub", topic = "", encode = "", json_config = "", ENABLE_OUTPUT_NULL = "", field_delimiter = '' );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“edgehub”表示数据源为智能边缘平台的edgehub。 topic 是 主题,需要消费数据的edgehub中的主题名称。 encode 是 数据编码格式,可选为“csv”和“json”。 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需配置“json_config”属性。 field_delimiter 否 属性分隔符。当“encode”为“csv”时,用于指定csv字段分隔符,默认为“,"。 当“encode”为“json”时,不需要设置属性之间的分隔符。 json_config 否 当“encode”为“json”时,可以通过该参数指定json字段和流定义字段的映射关系,格式为: "field1=data_json.field1;field2=data_json.field2;field3=$" 其中"field3=$"表示field3的内容为整个json串。 enable_output_null 否 当“encode”为“json”时,可以使用该参数来配置是否输出空字段。 “true”表示输出空字段(值为null)。 “false”表示不输出空字段。
  • 表达式GROUP BY 功能描述 按表达式对流进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 groupItem:可以是单字段,多字段,也可以是字符串函数等调用,不能是聚合函数。 注意事项 无 示例 先利用substring函数取字段name的子字符串,并按照该子字符串进行分组,返回每个子字符串及对应的记录数。 1 2 insert into temp SELECT substring(name,6),count(name) FROM student GROUP BY substring(name,6);
  • Grouping sets, Rollup, Cube 功能描述 GROUPING SETS 的 GROUP BY 子句可以生成一个等效于由多个简单 GROUP BY 子句的 UNION ALL 生成的结果集,并且其效率比 GROUP BY 要高。 ROLLUP与CUBE按一定的规则产生多种分组,然后按各种分组统计数据。 CUBE生成的结果集显示了所选列中值的所有组合的聚合。 Rollup生成的结果集显示了所选列中值的某一层次结构的聚合。 语法格式 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY groupingItem] 语法说明 groupingItem:是Grouping sets(columnName [, columnName]*)、Rollup(columnName [, columnName]*)、Cube(columnName [, columnName]*) 注意事项 无 示例 分别产生基于user和product的结果 INSERT INTO temp SELECT SUM(amount) FROM Orders GROUP BY GROUPING SETS ((user), (product));
  • 按列GROUP BY 功能描述 按列进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 GROUP BY:按列可分为单列GROUP BY与多列GROUP BY。 单列GROUP BY:指GROUP BY子句中仅包含一列。 多列GROUP BY:指GROUP BY子句中不止一列,查询语句将按照GROUP BY的所有字段分组,所有字段都相同的记录将被放在同一组中。 注意事项 GroupBy在流处理表中会产生更新结果 示例 根据score及name两个字段对表student进行分组,并返回分组结果。 1 2 insert into temp SELECT name,score, max(score) FROM student GROUP BY name,score;
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 create table kafkaSink( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'format' = '' );
  • 前提条件 确保已创建kafka集群。 该场景作业需要运行在DLI的独享队列上,因此要与Kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 示例(适用于Kafka集群未开启SASL_SSL场景) 该示例是从Kafka的一个topic中读取数据,并使用Kafka结果表将数据写入到kafka的另一个topic中。 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE kafkaSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', "format" = "json" ); insert into kafkaSink select * from kafkaSource; 连接Kafka集群,向Kafka的source topic中插入如下测试数据: {"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106"} {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106"} 连接Kafka集群,在Kafka的sink topic读取数据,参考如下: {"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106"} {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106"}
  • 语法格式 将SELECT查询结果插入到表中: 1 2 3 4 5 6 7 INSERT INTO DLI_TABLE SELECT field1,field2... [FROM DLI_TEST] [WHERE where_condition] [LIMIT num] [GROUP BY field] [ORDER BY field] ...; 将某条数据插入到表中: 1 2 INSERT INTO DLI_TABLE VALUES values_row [, values_row ...];
  • 注意事项 DLI表必须已经存在。 在“创建表关联HBase”章节创建的表中,OPTIONS里的Cols指定的列族如果不存在,insert into执行时会报错。 如果插入的(rowkey, 列族, 列)已存在,则执行插入操作时,会覆盖hbase中相同的(rowkey, 列族, 列)。 不建议对同一张表并发插入数据,因为有一定概率发生并发冲突,导致插入失败。 不支持INSERT OVERWRITE语法。
  • 参数说明 表1 参数描述 参数 描述 DLI_TABLE 已创建跨源连接的DLI表名称。 DLI_TEST 为包含待查询数据的表。 field1,field2...,field 表“DLI_TEST”中的列值,需要匹配表“DLI_TABLE”的列值和类型。 where_condition 查询过滤条件。 num 对查询结果进行限制,num参数仅支持INT类型。 values_row 想要插入到表中的值,列与列之间用逗号分隔。
  • 示例 从CloudTable的HBase中读取对象为car_infos的表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "cloudtable", region = "xxx", cluster_id = "209ab1b6-de25-4c48-8e1e-29e09d02de28", table_name = "carinfo", table_columns = "rowKey,info:owner,info:age,car:speed,car:miles" );
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "cloudtable", region = "", cluster_id = "", table_name = "", table_columns = "" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“CloudTable”表示数据源为 表格存储服务 。 region 是 表格存储 服务所在区域。 cluster_id 是 待读取数据表所属集群id。 如何查看CloudTable的集群id,请参见《表格存储服务用户指南》中“查看集群基本信息”章节。 table_name 是 待读取数据的表名,如需指定namespace,可表示为:namespace_name:table_name 。 table_columns 是 待读取的列,具体形式如:"rowKey,f1:c1,f1:c2,f2:c1",并且保证与source相同的列数。
  • 功能描述 创建source流从表格存储服务CloudTable的HBase中获取数据,作为作业的输入数据。HBase是一个稳定可靠,性能卓越、可伸缩、面向列的分布式 云存储 系统,适用于海量数据存储以及分布式计算的场景,用户可以利用HBase搭建起TB至PB级数据规模的存储系统,对数据轻松进行过滤分析,毫秒级得到响应,快速发现数据价值。DLI可以从HBase中读取数据,用于过滤分析、数据转储等场景。 表格存储服务(CloudTable),是基于Apache HBase提供的分布式、可伸缩、全托管的KeyValue数据存储服务,为DLI提供了高性能的随机读写能力,适用于海量结构化数据、半结构化数据以及时序数据的存储和查询应用,适用于物联网IOT应用和通用海量KeyValue数据存储与查询等场景。CloudTable的更多信息,请参见《表格存储服务用户指南》。
  • OVER WINDOW Over Window与Group Window区别在于Over window每一行都会输出一条记录。 语法格式 1 2 3 4 OVER ( [PARTITION BY partition_name] ORDER BY proctime|rowtime(ROWS number PRECEDING) |(RANGE (BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW | UNBOUNDED preceding)) ) 语法说明 表3 参数说明 参数 参数说明 PARTITION BY 指定分组的主键,每个分组各自进行计算。 ORDER BY 指定数据按processing time或event time作为时间戳。 ROWS 个数窗口。 RANGE 时间窗口。 注意事项 同一select里所有聚合函数定义的窗口都必须保持一致。 当前Over窗口只支持前向计算(preceding),不支持following计算。 必须指定ORDER BY 按processing time或event time。 不支持对常量做聚合操作,如sum(2)。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // 计算从规则启动到目前为止的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 FROM Orders; // 计算最近四条记录的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt2 FROM Orders; // 计算最近60s的计数及总和(in eventtime),基于事件时间处理,事件时间为Orders中的timeattr字段。 insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt2 FROM Orders;
  • GROUP WINDOW 语法说明 Group Window定义在GROUP BY里,每个分组只输出一条记录,包括以下几种: time_attr可以设置processing-time或者event-time。 time_attr设置为event-time时参数类型为bigint或者timestamp类型。 time_attr设置为processing-time时无需指定类型。 interval设置窗口周期。 分组函数 表1 分组函数表 函数名 说明 TUMBLE(time_attr, interval) 跳跃窗口。 HOP(time_attr, interval, interval) 拓展的跳跃窗口(等价于datastream的滑动窗口),可以分别设置输出触发周期和窗口周期。 SESSION(time_attr, interval) 会话窗口,interval表示多长时间没有记录则关闭窗口。 窗口函数 表2 窗口函数表 函数名 说明 TUMBLE_START(time_attr, interval) 返回跳跃窗口开始时间。为UTC时区。 TUMBLE_END(time_attr, interval) 返回跳跃窗口结束时间。为UTC时区。 HOP_START(time_attr, interval, interval) 返回拓展的跳跃窗口开始时间。为UTC时区。 HOP_END(time_attr, interval, interval) 返回拓展的跳跃窗口结束时间。为UTC时区。 SESSION_START(time_attr, interval) 返回会话窗口开始时间。为UTC时区。 SESSION_END(time_attr, interval) 返回会话窗口结束时间。为UTC时区。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 // 每天计算SUM(金额)(事件时间)。 insert into temp SELECT name, TUMBLE_START(ts, INTERVAL '1' DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(ts, INTERVAL '1' DAY), name; // 每天计算SUM(金额)(处理时间)。 insert into temp SELECT name, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), name; // 每个小时计算事件时间中最近24小时的SUM(数量)。 insert into temp SELECT product, SUM(amount) FROM Orders GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '1' DAY), product; // 计算每个会话的SUM(数量),间隔12小时的不活动间隙(事件时间)。 insert into temp SELECT name, SESSION_START(ts, INTERVAL '12' HOUR) AS sStart, SESSION_END(ts, INTERVAL '12' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(ts, INTERVAL '12' HOUR), name;
  • 参数说明 表1 参数描述 参数 描述 DLI_TABLE 已创建跨源连接的DLI表名称。 DLI_TEST 为包含待查询数据的表。 field1,field2...,field 表“DLI_TEST”中的列值,需要匹配表“DLI_TABLE”的列值和类型。 where_condition 查询过滤条件。 num 对查询结果进行限制,num参数仅支持INT类型。 values_row 想要插入到表中的值,列与列之间用逗号分隔。
  • 语法格式 将SELECT查询结果插入到表中: 1 2 3 4 5 6 7 INSERT INTO DLI_TABLE SELECT field1,field2... [FROM DLI_TEST] [WHERE where_condition] [LIMIT num] [GROUP BY field] [ORDER BY field] ...; 将某条数据插入到表中: 1 2 INSERT INTO DLI_TABLE VALUES values_row [, values_row ...];
  • 示例6:创建textfile格式的非分区表,并设置ROW FORMAT 示例说明:本例创建名为table4的textfile类型的非分区表,并设置ROW FORMAT相关格式(ROW FORMAT功能只支持textfile类型的表)。 字段(Fields)是表格中的列,每个字段有一个名称和数据类型,表中字段之间以'/'分隔。 集合项(COLLECTION ITEMS)指的是一组数据中的元素,可以是数组、列表或集合等,table4中集合项以'$'分隔。 映射键(MAP KEYS)是一种键值对的数据结构,用于存储一组相关联的数据,表中Map键以'#'分隔。 行(Rows)表格中的行,每一行包含一组字段值,表中行以'\n'结束(注意,只支持用'\n'作为行分隔符)。 NULL表示缺少值或未知值的特殊值。在表格中,NULL表示该字段没有值或该值未知。如果数据中存在null值,则用字符串“null”表示。 1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE IF NOT EXISTS table4 ( col_1 STRING, col_2 INT ) STORED AS TEXTFILE ROW FORMAT DELIMITED FIELDS TERMINATED BY '/' COLLECTION ITEMS TERMINATED BY '$' MAP KEYS TERMINATED BY '#' LINES TERMINATED BY '\n' NULL DEFINED AS 'NULL';
  • 示例1:创建DLI非分区表 示例说明:创建名为table1的DLI非分区表,并用STORED AS关键字指定该表的存储格式为orc格式。 在您的实际使用中,可以将DLI表存储为textfile, avro, orc, sequencefile, rcfile, parquet等类型。 1 2 3 4 5 CREATE TABLE IF NOT EXISTS table1 ( col_1 STRING, col_2 INT ) STORED AS orc;
  • 示例5:创建DLI分区表,自定义表的TBLPROPERTIES参数 示例说明:本例创建名为table3并以col_3为分区依据的DLI分区表。在TBLPROPERTIES中配置dli.multi.version.enable、comment、orc.compress和auto.purge。 dli.multi.version.enable:本例配置为true,即代表开启DLI数据多版本功能,用于表数据的备份与恢复。 comment:表描述信息,TBLPROPERTIES内的描述信息支持后续修改。 orc.compress:指定orc存储的压缩方式,本例定义为ZLIB。 auto.purge:本例配置为true,即删除或者覆盖的数据会不经过回收站,直接被删除。 1 2 3 4 5 6 7 8 9 10 11 12 CREATE TABLE IF NOT EXISTs table3 ( col_1 STRING, col_2 STRING ) PARTITIONED BY (col_3 DATE) STORED AS rcfile TBLPROPERTIES ( dli.multi.version.enable = true, comment = 'Created by dli', orc.compress = 'ZLIB', auto.purge = true );
共100000条