华为云用户手册

  • IS_DIGITS 功能描述 判断字符串是否只包含数字。 语法 BOOLEAN IS_DIGITS(VARCHAR content) 参数说明 content:输入字符串。 示例 测试语句 SELECT IS_DIGITS(content) AS case_result FROM T1; 测试数据和结果 表6 测试数据和结果 测试数据(content) 测试结果(case_result) 78 true 78.0 false 78a false null false "" (空字符串) false
  • IS_URL 功能描述 判断字符串是否是合法的URL地址。 语法 BOOLEAN IS_URL(VARCHAR content) 参数说明 content:输入字符串。 示例 测试语句 SELECT IS_URL(content) AS case_result FROM T1; 测试数据和结果 表8 测试数据和结果 测试数据(content) 测试结果(case_result) https://www.testweb.com true https://www.testweb.com:443 true www.testweb.com:443 false null false "" (空字符串) false
  • KEY_VALUE 功能描述 获取键值对字符串中某一个key对应的值。 语法 VARCHAR KEY_VALUE(VARCHAR content, VARCHAR split1, VARCHAR split2, VARCHAR key_name) 参数说明 content:输入字符串。 split1:多个键值对分隔符。 split2:key/value分隔符。 key_name:要获取的键名称。 示例 测试语句 SELECT KEY_VALUE(content, split1, split2, key_name) AS case_result FROM T1; 测试数据和结果 表10 测试数据和结果 测试数据(content, split1, split2, key_name) 测试结果(case_result) k1=v1;k2=v2 ; = k1 v1 null ; = k1 null k1=v1;k2=v2 null = k1 null
  • LPAD 功能描述 将pad字符串拼接到str字符串的的左端,直到新的字符串达到指定长度len为止。 语法 VARCHAR LPAD(VARCHAR str, INT len, VARCHAR pad) 参数说明 str:拼接前的字符串。 len:拼接后的字符串的长度。 pad:被拼接的字符串。 任意参数为null时返回null。 len为负数时返回为null。 len不大于str长度时,返回str裁剪为len长度的字符串。 示例 测试语句 SELECT LPAD("adc", 2, "hello"), LPAD("adc", -1, "hello"), LPAD("adc", 10, "hello"); 测试结果 "ad",,"helloheadc"
  • OVERLAY 功能描述 用y替换x的子串。从start_position开始,替换length+1个字符。 语法 VARCHAR OVERLAY ( (VARCHAR x PLACING VARCHAR y FROM INT start_position [ FOR INT length ]) ) 参数说明 x:字符串。 y:字符串。 start_position:起始位置。 length(可选):字符长度。 示例 测试语句: OVERLAY('abcdefg' PLACING 'xyz' FROM 2 FOR 2) AS result FROM T1; 测试结果: 表12 测试结果 result axyzdefg
  • CONCAT 功能描述 拼接两个或多个字符串值从而组成一个新的字符串。如果任一参数为NULL时,则跳过该参数。 语法 VARCHAR CONCAT(VARCHAR var1, VARCHAR var2, ...) 参数说明 var1:字符串 var2:字符串 示例 测试语句 SELECT CONCAT("abc", "def", "ghi", "jkl"); 测试结果 "abcdefghijkl"
  • 常用聚合函数 表1 常用聚合函数表 函数 返回值类型 描述 COUNT(*) BIGINT 返回元组个数。 COUNT([ ALL ] expression... BIGINT 返回表达式不为NULL的输入行数。对每个值的一个唯一实例使用DISTINCT。 AVG(numeric) DOUBLE 返回所有输入值的数字的平均值(算术平均值)。 SUM(numeric) DOUBLE 返回所有输入值之间的数值之和。 MAX(value) DOUBLE 返回所有输入值的值的最大值。 MIN(value) DOUBLE 返回所有输入值的值的最小值。 STDDEV_POP(value) DOUBLE 返回所有输入值之间的数字字段的总体标准偏差。 STDDEV_SAMP(value) DOUBLE 返回所有输入值之间的数字字段的样本标准偏差。 VAR_POP(value) DOUBLE 返回所有输入值之间的数字字段的总体方差(总体标准偏差的平方)。 VAR_SAMP(value) DOUBLE 返回所有输入值之间的数字字段的样本方差(样本标准偏差的平方)。
  • 示例 COUNT(*) 测试语句: SELECT COUNT(score) FROM T1; 测试数据和结果 表2 T1 测试数据(score) 测试结果 81 5 100 60 95 86 COUNT([ ALL ] expression | DISTINCT expression1 [, expression2]*) 测试语句: SELECT COUNT(DISTINCT content ) FROM T1; 测试数据和结果 表3 T1 content (STRING) 测试结果 "hello1 " 2 "hello2 " "hello2" null 86 AVG(numeric) 测试语句: SELECT AVG(score) FROM T1; 测试数据和结果 表4 T1 测试数据(score) 测试结果 81 84.0 100 60 95 86 SUM(numeric) 测试语句: SELECT SUM(score) FROM T1; 测试数据和结果 表5 T1 测试数据(score) 测试结果 81 422.0 100 60 95 86 MAX(value) 测试语句: SELECT MAX(score) FROM T1; 测试数据和结果 表6 T1 测试数据(score) 测试结果 81 100.0 100 60 95 86 MIN(value) 测试语句: SELECT MIN(score) FROM T1; 测试数据和结果 表7 T1 测试数据(score) 测试结果 81 60.0 100 60 95 86 STDDEV_POP(value) 测试语句: SELECT STDDEV_POP(score) FROM T1; 测试数据和结果 表8 T1 测试数据(score) 测试结果 81 13.0 100 60 95 86 STDDEV_SAMP(value) 测试语句: SELECT STDDEV_SAMP(score) FROM T1; 测试数据和结果 表9 T1 测试数据(score) 测试结果 81 15.0 100 60 95 86 VAR_POP(value) 测试语句: SELECT VAR_POP(score) FROM T1; 测试数据和结果 表10 T1 测试数据(score) 测试结果 81 193.0 100 60 95 86 VAR_SAMP(value) 测试语句: SELECT VAR_SAMP(score) FROM T1; 测试数据和结果 表11 T1 测试数据(score) 测试结果 81 241.0 100 60 95 86
  • 示例 create table jdbcSource ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT)with ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/xx', 'connector.table' = 'jdbc_table_name', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'xxx', 'connector.password' = 'xxxxxx');
  • 语法说明 string_split(target, separator) 表1 string_split参数说明 参数 数据类型 说明 target STRING 待处理的目标字符串。 说明: 如果target为NULL,则返回一个空行。 如果target包含两个或多个连续出现的分隔符时,则返回长度为零的空子字符串。 如果target未包含指定分隔符,则返回目标字符串。 separator VARCHAR 指定的分隔符,当前仅支持单字符分割。
  • 示例 准备测试输入数据 表2 测试源表disSource数据和分隔符 target(STRING) separator (VARCHAR) test-flink - flink - one-two-ww-three - 输入测试SQL语句 create table disSource( target STRING, separator VARCHAR) with ( "connector.type" = "dis", "connector.region" = "xxx", "connector.channel" = "ygj-dis-in", "format.type" = 'csv');create table disSink( target STRING, item STRING) with ( 'connector.type' = 'dis', 'connector.region' = 'xxx', 'connector.channel' = 'ygj-dis-out', 'format.type' = 'csv');insert into disSinkselect target, itemfrom disSource,lateral table(string_split(target, separator)) as T(item); 查看测试结果 表3 disSink结果表数据 target(STRING) item(STRING) test-flink test test-flink flink flink flink one-two-ww-three one one-two-ww-three two one-two-ww-three ww one-two-ww-three three
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,‘jdbc’表示使用JDBC connector,必须为jdbc connector.url 是 数据库的URL connector.table 是 读取数据库中的数据所在的表名 connector.driver 否 连接数据库所需要的驱动。若未配置,则会自动通过URL提取 connector.username 否 数据库认证用户名,需要和'connector.password'一起配置 connector.password 否 数据库认证密码,需要和'connector.username'一起配置 connector.read.partition.column 否 用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.lower-bound 否 第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.upper-bound 否 最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.num 否 分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 connector.read.fetch-size 否 每次从数据库拉取数据的行数。默认值为0,表示忽略该提示。
  • 语法格式 create table jbdcSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression))with ( 'connector.type' = 'jdbc', 'connector.url' = '', 'connector.table' = '', 'connector.username' = '', 'connector.password' = '');
  • 示例 insert into temp SELECT Date '2015-10-11' FROM OrderA;//返回日期insert into temp1 SELECT Time '12:14:50' FROM OrderA;//返回时间insert into temp2 SELECT Timestamp '2015-10-11 12:14:50' FROM OrderA;//返回时间戳
  • 数组函数 表1 数组函数表 函数 返回值类型 描述 CARDINALITY(ARRAY) INT 返回数组的元素个数。 ELEMENT(ARRAY) - 使用单个元素返回数组的唯一元素。 如果数组为空,则返回null。如果数组有多个元素,则抛出异常。 示例: 返回数组的元素个数为3。 insert into temp select CARDINALITY(ARRAY[TRUE, TRUE, FALSE]) from source_stream; 返回'HELLO WORLD'。 insert into temp select ELEMENT(ARRAY['HELLO WORLD']) from source_stream;
  • 属性访问函数 表2 属性访问函数表 函数 返回值类型 描述 tableName.compositeType.field - 选择单个字段,通过名称访问Apache Flink复合类型(如Tuple,POJO等)的字段并返回其值。 tableName.compositeType.* - 选择所有字段,将Apache Flink复合类型(如Tuple,POJO等)和其所有直接子类型转换为简单表示,其中每个子类型都是单独的字段。
  • 创建表相关语法 表1 创建表相关语法 语法分类 功能描述 创建源表 DataGen源表 DWS源表 Hbase源表 JDBC源表 Kafka源表 MySQL CDC源表 Postgres CDC源表 Redis源表 Upsert Kafka源表 创建结果表 BlackHole结果表 ClickHouse结果表 DWS结果表 Elasticsearch结果表 Hbase结果表 JDBC结果表 Kafka结果表 Print结果表 Redis结果表 Upsert Kafka结果表 创建维表 DWS维表 Hbase维表 JDBC维表 Redis维表 Format Avro Canal Confluent Avro CS V Debezium JSON Maxwell Raw
  • 功能描述 BlackHole Connector允许接收所有输入记录,常用于高性能测试和UDF 输出,其不是实质性Sink。Blackhole结果表是系统内置的Connector。 例如,如果您在注册其他类型的Connector结果表时报错,但您不确定是系统问题还是结果表WITH参数错误,您可以将WITH参数修改为'connector' = 'blackhole'后,单击运行。如果不再报错,则证明系统没有问题,您需要排查确认修改WITH参数是否正确。
  • 示例 通过DataGen源表产生数据,BlackHole结果表接收传来的数据。 create table datagenSource ( user_id string, user_name string, user_age int) with ( 'connector' = 'datagen', 'rows-per-second'='1');create table blackholeSink ( user_id string, user_name string, user_age int) with ( 'connector' = 'blackhole');insert into blackholeSink select * from datagenSource;
  • 数据类型映射 目前,Avro schema 通常是从 table schema 中推导而来。尚不支持显式定义 Avro schema。因此,下表列出了从 Flink 类型到 Avro 类型的类型映射。 除了下面列出的类型,Flink 支持读取/写入 nullable 的类型。Flink 将 nullable 的类型映射到 Avro union(something, null),其中 something 是从 Flink 类型转换的 Avro 类型。 您可以参考 Avro 规范 获取更多有关 Avro 类型的信息。 表2 数据类型映射 Flink SQL类型 Avro类型 Avro逻辑类型 CHAR / VARCHAR / STRING string - BOOLEAN boolean - BINARY / VARBINARY bytes - DECIMAL fixed decimal TINYINT int - SMALLINT int - INT int - BIGINT long - FLOAT float - DOUBLE double - DATE int date TIME int time-millis TIMESTAMP long timestamp-millis ARRAY array - MAP(key 必须是 string/char/varchar 类型) map - MULTISET(元素必须是 string/char/varchar 类型) map - ROW record -
  • 功能描述 Canal是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。Canal 为变更日志提供了统一的数据格式,并支持使用 JSON 或 protobuf序列化消息(Canal 默认使用 protobuf)。 Flink 支持将 Canal 的 JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史,等等。 Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Canal 格式的 JSON 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Canal 消息。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定要使用的格式,此处应为 'canal-json'. canal-json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 canal-json.timestamp-format.standard 否 'SQL' String 指定输入和输出时间戳格式。当前支持的值是:'SQL'和'ISO-8601'。 选项 'SQL' 将解析 "yyyy-MM-dd HH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30 12:13:14.123',并以相同格式输出时间戳。 选项 'ISO-8601' 将解析 "yyyy-MM-ddTHH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30T12:13:14.123',并以相同的格式输出时间戳。 canal-json.map-null-key.mode 否 'FALL' String 指定处理 Map 中 key 值为空的方法. 当前支持的值有'FAIL', 'DROP'和 'LITERAL'。 Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'canal-json.map-null-key.literal' 定义。 canal-json.map-null-key.literal 否 'null' String 当 'canal-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 canal-json.database.include 否 (none) String 仅读取指定数据库的 changelog 记录(通过对比 Canal 记录中的 "database" 元数据字段)。 canal-json.table.include 否 (none) String 仅读取指定表的 changelog 记录(通过对比 Canal 记录中的 "table" 元数据字段)。
  • 功能描述 Avro Schema Registry (avro-confluent) 格式能让你读取被 io.confluent.kafka.serializers.KafkaAvroSerializer序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer反序列化的记录。 当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema 版本 id 从配置的 Confluent Schema Registry 中获取 Avro writer schema ,而从 table schema 中推断出 reader schema。 当以这种格式写入(序列化)记录时,Avro schema 是从 table schema 中推断出来的,并会用来检索要与数据一起编码的 schema id。我们会在配置的 Confluent Schema Registry 中配置的 subject 下,检索 schema id。subject 通过 avro-confluent.schema-registry.subject 参数来制定。
  • 参数说明 表1 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定要使用的格式,这里应该是 'csv'。 csv.field-delimiter 否 , String 字段分隔符 (默认','),必须为单字符。你可以使用反斜杠字符指定一些特殊字符,例如 '\t' 代表制表符。 你也可以通过 unicode 编码在纯 SQL 文本中指定一些特殊字符,例如 'csv.field-delimiter' = '\u0001' 代表 0x01 字符。 csv.disable-quote-character 否 false Boolean 是否禁止对引用的值使用引号 (默认是 false). 如果禁止,选项 'csv.quote-character' 不能设置。 csv.quote-character 否 ‘’ String 用于围住字段值的引号字符 (默认"). csv.allow-comments 否 false Boolean 是否允许忽略注释行(默认不允许),注释行以 '#' 作为起始字符。 如果允许注释行,请确保 csv.ignore-parse-errors 也开启了从而允许空行。 csv.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 csv.array-element-delimiter 否 ; String 分隔数组和行元素的字符串(默认';'). csv.escape-character 否 (none) String 转义字符(默认关闭). csv.null-literal 否 (none) String 是否将 "null" 字符串转化为 null 值。
  • 功能描述 Raw format 允许读写原始(基于字节)值作为单个列。 注意: 这种格式将 null 值编码成 byte[] 类型的 null。这样在 upsert-kafka 中使用时可能会有限制,因为 upsert-kafka 将 null 值视为 墓碑消息(在键上删除)。因此,如果该字段可能具有 null 值,我们建议避免使用 upsert-kafka 连接器和 raw format 作为 value.format。 Raw format 连接器是内置的。
  • 参数说明 表1 参数 是否必选 默认值 类型 描述 format 是 (none) String 指定要使用的格式, 这里应该是 'raw'。 raw.charset 否 UTF-8 String 指定字符集来编码文本字符串。 raw.endianness 否 big-endian String 指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。 更多细节可查阅字节序。
  • 参数说明 表1 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定使用格式,这是应该使用'maxwell-json'。 maxwell-json.ignore-parse-errors 否 false Boolean 跳过解析错误而不是失败的字段和行。出现错误时,字段设置为空。 maxwell-json.timestamp-format.standard 否 'SQL' String 指定输入和输出时间戳格式。当前支持的值为“SQL”和“ISO-8601”:选项“SQL”将以“yyyy-MM-dd HH:mm:ss.s{precision}”格式解析输入时间戳,例如“2020-12-30 12” :13:14.123' 并以相同格式输出时间戳。选项'ISO-8601'将以“yyyy-MM-ddTHH:mm:ss.s{precision}”格式解析输入时间戳,例如'2020-12-30T12: 13:14.123' 并以相同格式输出时间戳。 maxwell-json.map-null-key.mode 否 'FAIL' String 在序列化地图数据的空键时指定处理模式。当前支持的值为“FAIL”、“DROP”和“LITERAL”:选项“FAIL”将在遇到带有空键的地图时抛出异常。选项“DROP”将删除地图数据的空键条目。选项“LITERAL”将替换空带字符串文字的键。字符串文字由 maxwell-json.map-null-key.literal 选项定义。 maxwell-json.map-null-key.literal 否 'null' String 当 'maxwell-json.map-null-key.mode' 为 LITERAL 时,指定字符串文字以替换空键。
  • 功能描述 Debezium是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON消息。 Flink 支持将 Debezium JSON解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史,等等。
  • 参数说明 表1 参数 是否必选 默认值 是否必选 描述 format 是 (none) String 指定要使用的格式,此处应为 'debezium-json'。 debezium-json.schema-include 否 false Boolean 设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 'value.converter.schemas.enable' 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。 debezium-json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 debezium-json.timestamp-format.standard 否 'SQL' String 声明输入和输出的时间戳格式。当前支持的格式为'SQL'和'ISO-8601'。 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。 debezium-json.map-null-key.mode 否 'FAIL' String 指定处理 Map 中 key 值为空的方法。 当前支持的值有FAIL、DROP和LITERAL。 Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'debezium-json.map-null-key.literal' 定义。 debezium-json.map-null-key.literal 否 'null' String 当 'debezium-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
  • 示例 根据order_id对数据进行去重,其中proctime为事件时间属性列 SELECT order_id, user, product, number FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num FROM Orders) WHERE row_num = 1;
共100000条