云服务器内容精选

  • Grouping 语法简介: 当Group by语句带with rollup/cube选项时,Grouping才有意义。 CUBE生成的结果集显示了所选列中值的所有组合的聚合。 ROLLUP生成的结果集显示了所选列中值的某一层次结构的聚合。 Grouping:当用CUBE或ROLLUP运算符添加行时,附加的列输出值为1;当所添加的行不是由CUBE或ROLLUP产生时,附加列值为0。 例如,Hive中有一张表“table_test”,表结构如下所示: +----------------+-------------------+--+ | table_test.id | table_test.value | +----------------+-------------------+--+ | 1 | 10 | | 1 | 15 | | 2 | 20 | | 2 | 5 | | 2 | 13 | +----------------+-------------------+--+ 执行如下语句: select id,grouping(id),sum(value) from table_test group by id with rollup; 得到如下结果: +-------+-----------------+------+--+ | id | groupingresult | sum | +-------+-----------------+------+--+ | 1 | 0 | 25 | | NULL | 1 | 63 | | 2 | 0 | 38 | +-------+-----------------+------+--+
  • EXCEPT、INTERSECT 语法简介 EXCEPT返回两个结果集的差(即从左查询中返回右查询没有找到的所有非重复值)。 INTERSECT返回两个结果集的交集(即两个查询都返回的所有非重复值)。 例如,Hive中有两张表“test_table1”、“test_table2”。 “test_table1”表结构如下所示: +-----------------+--+ | test_table1.id | +-----------------+--+ | 1 | | 2 | | 3 | | 4 | +-----------------+--+ “test_table2”表结构如下所示: +-----------------+--+ | test_table2.id | +-----------------+--+ | 2 | | 3 | | 4 | | 5 | +-----------------+--+ 执行如下的EXCEPT语句: select id from test_table1 except select id from test_table2; 显示如下结果: +--------------+--+ | _alias_0.id | +--------------+--+ | 1 | +--------------+--+ 执行INTERSECT语句: select id from test_table1 intersect select id from test_table2; 显示如下结果: +--------------+--+ | _alias_0.id | +--------------+--+ | 2 | | 3 | | 4 | +--------------+--+
  • Grouping 语法简介: 当group by语句带with rollup/cube选项时,Grouping才有意义。 CUBE生成的结果集显示了所选列中值的所有组合的聚合。 ROLLUP生成的结果集显示了所选列中值的某一层次结构的聚合。 Grouping:当用CUBE或ROLLUP运算符添加行时,附加的列输出值为1;当所添加的行不是由CUBE或ROLLUP产生时,附加列值为0。 例如,Hive中有一张表“table_test”,表结构如下所示: +----------------+-------------------+--+ | table_test.id | table_test.value | +----------------+-------------------+--+ | 1 | 10 | | 1 | 15 | | 2 | 20 | | 2 | 5 | | 2 | 13 | +----------------+-------------------+--+ 执行如下语句: select id,grouping(id),sum(value) from table_test group by id with rollup; 得到如下结果: +-------+-----------------+------+--+ | id | groupingresult | sum | +-------+-----------------+------+--+ | 1 | 0 | 25 | | NULL | 1 | 63 | | 2 | 0 | 38 | +-------+-----------------+------+--+
  • EXCEPT、INTERSECT EXCEPT返回两个结果集的差(即从左查询中返回右查询没有找到的所有非重复值)。 INTERSECT返回两个结果集的交集(即两个查询都返回的所有非重复值)。 例如,Hive中有两张表“test_table1”、“test_table2”。 “test_table1”表结构如下所示: +-----------------+--+ | test_table1.id | +-----------------+--+ | 1 | | 2 | | 3 | | 4 | +-----------------+--+ “test_table2”表结构如下所示: +-----------------+--+ | test_table2.id | +-----------------+--+ | 2 | | 3 | | 4 | | 5 | +-----------------+--+ 执行如下的EXCEPT语句: select id from test_table1 except select id from test_table2; 显示如下结果: +--------------+--+ | _alias_0.id | +--------------+--+ | 1 | +--------------+--+ 执行INTERSECT语句: select id from test_table1 intersect select id from test_table2; 显示如下结果: +--------------+--+ | _alias_0.id | +--------------+--+ | 2 | | 3 | | 4 | +--------------+--+
  • 基本语法 DROP [TEMPORARY] TABLE [IF EXISTS] [database_name.]name [ON CLUSTER cluster] [SYNC] 示例: 删除表t1。 drop table t1 SYNC; 在删除复制表时,因为复制表需要在Zookeeper上建立一个路径,存放相关数据。ClickHouse默认的库引擎是原子数据库引擎,删除Atomic数据库中的表后,它不会立即删除,而是会在24小时后删除。在删除表时,加上SYNC字段,即可解决该问题,例如:drop table t1 SYNC; 删除本地表和分布式表,则不会出现该问题,可不带SYNC字段,例如:drop table t1; 如果建表语句中包含了“ON CLUSTER ClickHouse集群名”,删除表命令: drop table 表名 ON CLUSTER default_cluster; 如果建表语句不包含“ON CLUSTER ClickHouse集群名”,删除表命令: drop table 表名; 删除数据表前,需确认此数据表是否应用中,以免引起不必要的麻烦。删除数据表后可在24小时内恢复,超过24小时无法恢复。恢复命令如下: set allow_experimental_undrop_table_query = 1; UNDROP TABLE 数据表名;
  • 约束 支持在Hudi客户端执行Spark SQL操作Hudi。 支持在Spark2x的JD BCS erver中执行Spark SQL操作Hudi。 不支持在Spark2x的客户端执行Spark SQL操作Hudi,支持在Spark3.1.1及之后版本的客户端执行Spark SQL操作Hudi。 不支持在Hive、Hetu引擎中写hudi表,以及修改hudi表结构,仅支持读。 由于SQL的KeyGenerator默认是org.apache.hudi.keygen.ComplexKeyGenerator,要求DataSource方式写入时KeyGenerator与SQL设置的一致。
  • OVER WINDOW Over Window与Group Window区别在于Over window每一行都会输出一条记录。 语法格式 1 2 3 4 OVER ( [PARTITION BY partition_name] ORDER BY proctime|rowtime(ROWS number PRECEDING) |(RANGE (BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW | UNBOUNDED preceding)) ) 语法说明 表3 参数说明 参数 参数说明 PARTITION BY 指定分组的主键,每个分组各自进行计算。 ORDER BY 指定数据按processing time或event time作为时间戳。 ROWS 个数窗口。 RANGE 时间窗口。 注意事项 同一select里所有聚合函数定义的窗口都必须保持一致。 当前Over窗口只支持前向计算(preceding),不支持following计算。 必须指定ORDER BY 按processing time或event time。 不支持对常量做聚合操作,如sum(2)。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // 计算从规则启动到目前为止的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 FROM Orders; // 计算最近四条记录的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt2 FROM Orders; // 计算最近60s的计数及总和(in eventtime),基于事件时间处理,事件时间为Orders中的timeattr字段。 insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt2 FROM Orders;
  • GROUP WINDOW 语法说明 Group Window定义在GROUP BY里,每个分组只输出一条记录,包括以下几种: time_attr可以设置processing-time或者event-time。 time_attr设置为event-time时参数类型为bigint或者timestamp类型。 time_attr设置为processing-time时无需指定类型。 interval设置窗口周期。 分组函数 表1 分组函数表 函数名 说明 TUMBLE(time_attr, interval) 跳跃窗口。 HOP(time_attr, interval, interval) 拓展的跳跃窗口(等价于datastream的滑动窗口),可以分别设置输出触发周期和窗口周期。 SESSION(time_attr, interval) 会话窗口,interval表示多长时间没有记录则关闭窗口。 窗口函数 表2 窗口函数表 函数名 说明 TUMBLE_START(time_attr, interval) 返回跳跃窗口开始时间。为UTC时区。 TUMBLE_END(time_attr, interval) 返回跳跃窗口结束时间。为UTC时区。 HOP_START(time_attr, interval, interval) 返回拓展的跳跃窗口开始时间。为UTC时区。 HOP_END(time_attr, interval, interval) 返回拓展的跳跃窗口结束时间。为UTC时区。 SESSION_START(time_attr, interval) 返回会话窗口开始时间。为UTC时区。 SESSION_END(time_attr, interval) 返回会话窗口结束时间。为UTC时区。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 // 每天计算SUM(金额)(事件时间)。 insert into temp SELECT name, TUMBLE_START(ts, INTERVAL '1' DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(ts, INTERVAL '1' DAY), name; // 每天计算SUM(金额)(处理时间)。 insert into temp SELECT name, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), name; // 每个小时计算事件时间中最近24小时的SUM(数量)。 insert into temp SELECT product, SUM(amount) FROM Orders GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '1' DAY), product; // 计算每个会话的SUM(数量),间隔12小时的不活动间隙(事件时间)。 insert into temp SELECT name, SESSION_START(ts, INTERVAL '12' HOUR) AS sStart, SESSION_END(ts, INTERVAL '12' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(ts, INTERVAL '12' HOUR), name;
  • 批作业SQL常用配置项说明 本章节为您介绍 DLI 批作业SQL语法的常用配置项。 表1 常用配置项 名称 默认值 描述 spark.sql.files.maxRecordsPerFile 0 要写入单个文件的最大记录数。如果该值为零或为负,则没有限制。 spark.sql.shuffle.partitions 200 为连接或聚合过滤数据时使用的默认分区数。 spark.sql.dynamicPartitionOverwrite.enabled false 当前配置设置为“false”时,DLI在覆盖写之前,会删除所有符合条件的分区。例如,分区表中有一个“2021-01”的分区,当使用INSERT OVERWRITE语句向表中写入“2021-02”这个分区的数据时,会把“2021-01”的分区数据也覆盖掉。 当前配置设置为“true”时,DLI不会提前删除分区,而是在运行时覆盖那些有数据写入的分区。 spark.sql.files.maxPartitionBytes 134217728 读取文件时要打包到单个分区中的最大字节数。 spark.sql.badRecordsPath - Bad Records的路径。 spark.sql.legacy.correlated.scalar.query.enabled false 该参数设置为true: 当子查询中数据不重复的情况下,执行关联子查询,不需要对子查询的结果去重。 当子查询中数据重复的情况下,执行关联子查询,会提示异常,必须对子查询的结果做去重处理,比如max(),min()。 该参数设置为false: 不管子查询中数据重复与否,执行关联子查询时,都需要对子查询的结果去重,比如max(),min(),否则提示异常。 spark.sql.keep.distinct.expandThreshold - 参数说明: 对于包含count(distinct)的多维分析(with cube)的查询场景,spark典型的执行计划是将cube使用expand算子来实现,但该操作会导致查询膨胀,为了避免出现查询膨胀,建议执行如下配置: spark.sql.keep.distinct.expandThreshold: 默认值:-1,即使用Spark默认的expand算子。 设置具体数值:即代表定义了查询膨胀的阈值(例如512),超过该阈值count(distinct) 使用distinct聚合算子来执行,不再使用expand算子。 spark.sql.distinct.aggregator.enabled:强制使用distinct聚合算子的开关。配置为true时不再根据spark.sql.keep.distinct.expandThreshold来判断。 适用场景:包含count(distinct)的多维分析(with cube)的查询场景,可能包含多个count(distinct),且包含cube/roll up 典型场景示例: SELECT a1, a2, count(distinct b), count(distinct c) FROM test_distinct group by a1, a2 with cube spark.sql.distinct.aggregator.enabled false spark.sql.optimizer.dynamicPartitionPruning.enabled true 该配置项用于启用或禁用动态分区修剪。在执行SQL查询时,动态分区修剪可以帮助减少需要扫描的数据量,提高查询性能。 配置为true时,代表启用动态分区修剪,SQL会在查询中自动检测并删除那些不满足WHERE子句条件的分区,适用于在处理具有大量分区的表时。 如果SQL查询中包含大量的嵌套left join操作,并且表有大量的动态分区时,这可能会导致在数据解析时消耗大量的内存资源,导致Driver节点的内存不足,并触发频繁的Full GC。 在这种情况下,可以配置该参数为false即禁用动态分区修剪优化,有助于减少内存使用,避免内存溢出和频繁的Full GC。 但禁用此优化可能会降低查询性能,禁用后Spark将不会自动修剪掉那些不满足条件的分区。 父主题: Spark SQL语法参考(即将下线)
  • 配置Event Time Event Time是指事件产生的时间,即数据产生时自带时间戳。 语法格式 1 2 3 CREATE SOURCE STREAM stream_name(...) WITH (...) TIMESTAMP BY {attr_name}.rowtime SET WATERMARK (RANGE {time_interval} | ROWS {literal}, {time_interval}); 语法说明 设置Event Time需要选定流中的某一个属性来作为时间戳,同时需要设置Watermark策略。 由于网络等原因,有时会导致乱序的产生;对于迟来的数据,需要Watermark来保证一个特定的时间后去触发Window进行计算。Watermark主要是用来处理乱序数据,流处理从事件产生,到发送到DLI服务,中间有一个过程。 Watermark有两种设置策略: 按时间周期 1 SET WATERMARK(range interval {time_unit}, interval {time_unit}) 按事件个数 1 SET WATERMARK(rows literal, interval {time_unit}) 一个逗号表示一个参数,第一个参数表示Watermark发送周期,第二个参数表示允许最大延迟时间。 注意事项 无。 示例 time2事件产生时间开始,每10s发送一次watermark,事件最大允许延迟时间为20s。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT, /* 成绩 */ time2 TIMESTAMP ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," ) TIMESTAMP BY time2.rowtime SET WATERMARK (RANGE interval 10 second, interval 20 second); INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) FROM student_scores; 每收到10个数据发送一次watermark,事件最大允许延迟时间为20s。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT, /* 成绩 */ time2 TIMESTAMP ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," ) TIMESTAMP BY time2.rowtime SET WATERMARK (ROWS 10, interval 20 second); INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) FROM student_scores;
  • 配置Processing Time Processing Time是指系统时间,与数据本身的时间戳无关,即在Flink算子内计算完成的时间。 语法格式 1 2 3 4 CREATE SOURCE STREAM stream_name(...) WITH (...) TIMESTAMP BY proctime.proctime; CREATE TEMP STREAM stream_name(...) TIMESTAMP BY proctime.proctime; 语法说明 设置Processing Time只需在timestamp by后配置proctime.proctime即可,后续可以直接使用proctime字段。 注意事项 无。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT /* 成绩 */ ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," )TIMESTAMP BY proctime.proctime; INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by proctime RANGE UNBOUNDED PRECEDING) FROM student_scores;