华为云用户手册

  • 功能描述 Debezium是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。 Flink 支持将 Debezium JSON 和 Avro 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如: 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史 Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 或 Avro 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。 更多具体使用可参考开源社区文档:Debezium Format。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定要使用的格式,这里应该是 'csv'。 csv.field-delimiter 否 , String 字段分隔符 (默认','),必须为单字符。您可以使用反斜杠字符指定一些特殊字符,例如 '\t' 代表制表符。 您也可以通过 unicode 编码在纯 SQL 文本中指定一些特殊字符,例如'csv.field-delimiter' = U&'\0001' 代表 0x01 字符。 csv.disable-quote-character 否 false Boolean 是否禁止对引用的值使用引号 (默认是 false). 如果禁止,选项 'csv.quote-character' 不能设置。 csv.quote-character 否 ‘’ String 用于围住字段值的引号字符 (默认"). csv.allow-comments 否 false Boolean 是否允许忽略注释行(默认不允许),注释行以 '#' 作为起始字符。 如果允许注释行,请确保 csv.ignore-parse-errors 也开启了从而允许空行。 csv.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 csv.array-element-delimiter 否 ; String 分隔数组和行元素的字符串(默认';'). csv.escape-character 否 (none) String 转义字符(默认关闭). csv.null-literal 否 (none) String 是否将 "null" 字符串转化为 null 值。
  • 数据类型映射 目前 CS V 的 schema 都是从 table schema 推断而来的。显式地定义 CSV schema 暂不支持。 Flink 的 CSV Format 数据使用 jackson databind API 去解析 CSV 字符串。 表2 数据类型映射 Flink SQL 类型 CSV 类型 CHAR / VARCHAR / STRING string BOOLEAN boolean BINARY / VARBINARY string with encoding: base64 DECIMAL number TINYINT number SMALLINT number INT number BIGINT number FLOAT number DOUBLE number DATE string with format: date TIME string with format: time TIMESTAMP string with format: date-time INTERVAL number ARRAY array ROW object
  • 功能描述 Avro Schema Registry (avro-confluent) 格式能让您读取被 io.confluent.kafka.serializers.KafkaAvroSerializer 序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer 反序列化的记录。 当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema 版本 id 从配置的 Confluent Schema Registry 中获取 Avro writer schema ,而从 table schema 中推断出 reader schema。 当以这种格式写入(序列化)记录时,Avro schema 是从 table schema 中推断出来的,并会用来检索要与数据一起编码的 schema id。我们会在配置的 Confluent Schema Registry 中配置的 subject 下,检索 schema id。subject 通过 avro-confluent.subject 参数来指定。
  • 数据类型映射 目前 Apache Flink 都是从 table schema 去推断反序列化期间的 Avro reader schema 和序列化期间的 Avro writer schema。显式地定义 Avro schema 暂不支持。 Avro Format中描述了 Flink 数据类型和 Avro 类型的对应关系。 除了此处列出的类型之外,Flink 还支持读取/写入可为空(nullable)的类型。 Flink 将可为空的类型映射到 Avro union(something, null), 其中 something 是从 Flink 类型转换的 Avro 类型。
  • 功能描述 Canal是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。Canal 为变更日志提供了统一的数据格式,并支持使用 JSON 或 protobuf序列化消息(Canal 默认使用 protobuf)。 Flink 支持将 Canal 的 JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史,等等。 Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Canal 格式的 JSON 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Canal 消息。 更多具体使用可参考开源社区文档:Canal Format。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定要使用的格式,此处应为 'canal-json'. canal-json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 canal-json.timestamp-format.standard 否 'SQL' String 指定输入和输出时间戳格式。当前支持的值是:'SQL'和'ISO-8601'。 选项 'SQL' 将解析 "yyyy-MM-dd HH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30 12:13:14.123',并以相同格式输出时间戳。 选项 'ISO-8601' 将解析 "yyyy-MM-ddTHH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30T12:13:14.123',并以相同的格式输出时间戳。 canal-json.map-null-key.mode 否 'FALL' String 指定处理 Map 中 key 值为空的方法. 当前支持的值有'FAIL', 'DROP'和 'LITERAL'。 Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'canal-json.map-null-key.literal' 定义。 canal-json.map-null-key.literal 否 'null' String 当 'canal-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 canal-json.encode.decimal-as-plain-number 否 false Boolean 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。 canal-json.database.include 否 (none) String 一个可选的正则表达式,通过正则匹配 Canal 记录中的 "database" 元字段,仅读取指定数据库的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。 canal-json.table.include 否 (none) String 一个可选的正则表达式,通过正则匹配 Canal 记录中的 "table" 元字段,仅读取指定表的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。
  • 数据类型映射 目前,Avro schema 通常是从 table schema 中推导而来。尚不支持显式定义 Avro schema。因此,下表列出了从 Flink 类型到 Avro 类型的类型映射。 除了下面列出的类型,Flink 支持读取/写入 nullable 的类型。Flink 将 nullable 的类型映射到 Avro union(something, null),其中 something 是从 Flink 类型转换的 Avro 类型。 表2 数据类型映射 Flink SQL类型 Avro类型 Avro逻辑类型 CHAR / VARCHAR / STRING string - BOOLEAN boolean - BINARY / VARBINARY bytes - DECIMAL fixed decimal TINYINT int - SMALLINT int - INT int - BIGINT long - FLOAT float - DOUBLE double - DATE int date TIME int time-millis TIMESTAMP long timestamp-millis ARRAY array - MAP(key 必须是 string/char/varchar 类型) map - MULTISET(元素必须是 string/char/varchar 类型) map - ROW record -
  • Format概述 Flink 提供了一套与表连接器(table connector)一起使用的表格式(table format)。 表格式是一种存储格式,定义了如何把二进制数据映射到表的列上。 表1 Flink支持格式 Formats 支持的Connectors CSV Kafka, Upsert Kafka, FileSystem JSON Kafka, Upsert Kafka, FileSystem, Elasticsearch Avro Kafka, Upsert Kafka, FileSystem Confluent Avro Kafka, Upsert Kafka Debezium Kafka, FileSystem Canal Kafka, FileSystem Maxwell Kafka, FileSystem Ogg Kafka, FileSystem Orc FileSystem Parquet FileSystem Raw Kafka, Upsert Kafka, FileSystem 父主题: Format
  • 创建表相关语法 表1 创建表相关语法 语法分类 功能描述 Format Avro Canal Confluent Avro CSV Debezium JSON Maxwell Ogg Orc Parquet Raw Connectors BlackHole ClickHouse DataGen Doris DWS Elasticsearch FileSystem Hbase Hive JDBC Kafka Print Redis Upsert Kafka
  • 语法定义 INSERT INTO table_name [PARTITION part_spec] query part_spec: (part_col_name1=val1 [, part_col_name2=val2, ...]) query: values | { select | selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ] [ OFFSET start { ROW | ROWS } ] [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY] orderItem: expression [ ASC | DESC ] select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ] selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } projectItem: expression [ [ AS ] columnAlias ] | tableAlias . * tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ] joinCondition: ON booleanExpression | USING '(' column [, column ]* ')' tableReference: tablePrimary [ matchRecognize ] [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ] tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' | UNNEST '(' expression ')' values: VALUES expression [, expression ]* groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')' windowRef: windowName | windowSpec windowSpec: [ windowName ] '(' [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ] ')' matchRecognize: MATCH_RECOGNIZE '(' [ PARTITION BY expression [, expression ]* ] [ ORDER BY orderItem [, orderItem ]* ] [ MEASURES measureColumn [, measureColumn ]* ] [ ONE ROW PER MATCH ] [ AFTER MATCH ( SKIP TO NEXT ROW | SKIP PAST LAST ROW | SKIP TO FIRST variable | SKIP TO LAST variable | SKIP TO variable ) ] PATTERN '(' pattern ')' [ WITHIN intervalLiteral ] DEFINE variable AS condition [, variable AS condition ]* ')' measureColumn: expression AS alias pattern: patternTerm [ '|' patternTerm ]* patternTerm: patternFactor [ patternFactor ]* patternFactor: variable [ patternQuantifier ] patternQuantifier: '*' | '*?' | '+' | '+?' | '?' | '??' | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?'] | '{' repeat '}'
  • 约束限制 Flink SQL 对于标识符(表、属性、函数名)有类似于 Java 的词法约定: 不管是否引用标识符,都保留标识符的大小写。 且标识符需区分大小写。 与 Java 不一样的地方在于,通过反引号,可以允许标识符带有非字母的字符(如:"SELECT a AS `my field` FROM t")。 字符串文本常量需要被单引号包起来(如 SELECT 'Hello World' )。两个单引号表示转义(如 SELECT 'It''s me.')。字符串文本常量支持 Unicode 字符,如需明确使用 Unicode 编码,请使用以下语法: 使用反斜杠(\)作为转义字符(默认):SELECT U&'\263A' 使用自定义的转义字符: SELECT U&'#263A' UESCAPE '#'
  • 功能描述 创建一个有 catalog 和数据库命名空间的 catalog function ,需要指定一个 identifier ,可指定 language tag 。 若catalog 中,已经有同名的函数注册了,则无法注册。如果 language tag 是 JAVA 或者 SCALA ,则 identifier 是 UDF 实现类的全限定名。 如果您需要了解创建自定义函数的步骤请参考自定义函数。
  • 语法说明 TEMPORARY 创建一个有 catalog 和数据库命名空间的临时 catalog function ,并覆盖原有的 catalog function 。 TEMPORARY SYSTEM 创建一个没有数据库命名空间的临时系统 catalog function ,并覆盖系统内置的函数。 IF NOT EXISTS 如果该函数已经存在,则不会进行任何操作。 LANGUAGE JAVA|SCALA Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA, SCALA,且函数的默认语言为 JAVA。
  • 语法说明 COMPUTED COLUMN 计算列是一个使用 “column_name AS computed_column_expression” 语法生成的虚拟列。它由使用同一表中其他列的非查询表达式生成,并且不会在表中进行物理存储。例如,一个计算列可以使用 cost AS price * quantity 进行定义,这个表达式可以包含物理列、常量、函数或变量的任意组合,但这个表达式不能存在任何子查询。 在 Flink 中计算列一般用于为 CREATE TABLE 语句定义 时间属性。 处理时间属性 可以简单地通过使用了系统函数 PROCTIME() 的 proc AS PROCTIME() 语句进行定义。 另一方面,由于事件时间列可能需要从现有的字段中获得,因此计算列可用于获得事件时间列。例如,原始字段的类型不是 TIMESTAMP(3) 或嵌套在 JSON 字符串中。 注意: 定义在一个数据源表( source table )上的计算列会在从数据源读取数据后被计算,它们可以在 SELECT 查询语句中使用。 计算列不可以作为 INSERT 语句的目标,在 INSERT 语句中,SELECT 语句的 schema 需要与目标表不带有计算列的 schema 一致。
  • 保留关键字 一些字符串的组合已经被预留为关键字以备未来使用。 如果使用以下字符串作为字段名,请在使用时使用反引号将该字段名包起来,例如 `value`, `count` 。 A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, BYTES, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATA LOG , CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERIS TICS , CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMNS, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MODULES, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PA RAM ETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, RAW, READ, READS, REAL, RECURSIVE, REF, REFEREN CES , REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRING, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFO RMS , TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, W IDT H_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE 父主题: SQL语法约束与定义
  • 语法支持类型 DLI 支持以下数据类型: CHAR, VARCHAR, STRING, BOOLEAN, BINARY, VARBINARY, BYTES, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, INTERVAL, ARRAY, MULTISET, MAP, ROW, RAW 在SQL语法中这些类型用于定义表中列的数据类型。 父主题: SQL语法约束与定义
  • 操作场景 如果您的自定义函数需要在多个作业中使用,但对于不同作业某些参数值不同,直接在UDF中修改较为复杂。您可以在Flink OpenSource SQL编辑页面,自定义配置中配置参数pipeline.global-job-parameters,在UDF代码中获取该参数并使用。如需修改参数值,直接在FlinkOpenSource SQL编辑页面,自定义配置中修改该参数值,即可达到快速修改UDF参数值的目的。
  • 操作步骤 自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项通过此对象来传递。自定义函数的参数传递操作步骤如下: 在Flink OpenSource SQL编辑页面右侧自定义配置中添加参数pipeline.global-job-parameters,格式如下: pipeline.global-job-parameters=k1:v1,"k2:v1,v2",k3:"str:ing","k4:str""ing" 该配置定义了如表1的map。 表1 pipeline.global-job-parameters示例 key value k1 v1 k2 v1,v2 k3 str:ing k4 str""ing FunctionContext#getJobParameter只能获取pipeline.global-job-parameters这一配置项的值。因此需要将UDF用到的所有配置项全部写入到pipeline.global-job-parameters中。 key和value之间通过冒号(:)分隔,所有key-value用逗号(,)连接。 如果key或value中含有逗号(,),则需要用双引号(")将key:value整个包围起来。参考k2。 如果key或value中含有半角冒号(:),则需要用双引号(")将key或value包围起来。参考k3。 如果key或value中含有双引号("),则需要通过连写两个双引号("")进行转义,也需要用双引号(")将key:value整个包围起来。参考k4。 在自定义函数代码中,通过FunctionContext#getJobParameter获取map的各项内容,代码示例如下: context.getJobParameter("url","jdbc:mysql://xx.xx.xx.xx:3306/table"); context.getJobParameter("driver","com.mysql.jdbc.Driver"); context.getJobParameter("user","user"); context.getJobParameter("password","password");
  • 操作场景 类型推导包含了验证输入值、派生参数和返回值数据类型。从逻辑角度看,Planner需要知道数据类型、精度和小数位数;从 JVM 角度来看,Planner 在调用自定义函数时需要知道如何将内部数据结构表示为JVM对象。 Flink 自定义函数实现了自动的类型推导提取,通过反射从函数的类及其求值方法中派生数据类型。然而以反射方式提取数据类型并不总是成功的,比如UDTF中常见的Row类型。 由于 Flink 1.11 起引入了新的自定义函数注册接口,使用了新的自定义函数类型推断机制,因此原先1.10 重载 getResultType 声明返回字段类型的方式将不再可用。继续使用会抛出如下异常: Caused by: org.apache.flink.table.api.ValidationException: Cannot extract a data type from a pure 'org.apache.flink.types.Row' class. Please use annotations to define field names and field types. 目前 Flink 1.12 可以通过使用DataTypeHint 和FunctionHint 注解相关参数、类或方法来支持提取过程。
  • 使用示例 UDTF支持CROSS JOIN和LEFT JOIN,在使用UDTF时需要带上 LATERAL 和TABLE 两个关键字。 CROSS JOIN:对于左表的每一行数据,假设UDTF不产生输出,则这一行不进行输出。 LEFT JOIN:对于左表的每一行数据,假设UDTF不产生输出,这一行仍会输出,UDTF相关字段用null填充。 CREATE FUNCTION udtf_test AS 'com.huaweicompany.udf.TableFunction';-- CROSS JOIN INSERT INTO sink_stream select subValue, length FROM source_stream, LATERAL TABLE(udtf_test(attr, ',')) as T(subValue, length);-- LEFT JOIN INSERT INTO sink_stream select subValue, length FROM source_stream LEFT JOIN LATERAL TABLE(udtf_test(attr, ',')) as T(subValue, length) ON TRUE;
  • 功能描述 FileSystem sink用于将数据输出到分布式文件系统HDFS或者 对象存储服务 OBS等文件系统。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。 考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的Part文件。完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。即桶中将包含一个小时间隔内接收到的记录。 桶目录中的数据被拆分成多个Part文件。对于相应的接收数据的桶的Sink的每个Subtask,每个桶将至少包含一个Part文件。将根据配置的滚动策略来创建其他Part文件。对于Row Formats默认的策略是根据Part文件大小进行滚动,需要指定文件打开状态最长时间的超时以及文件关闭后的非活动状态的超时时间。对于Bulk Formats在每次创建Checkpoint时进行滚动,并且用户也可以添加基于大小或者时间等的其他条件。 在STREAMING模式下使用FileSink需要开启Checkpoint功能。Part文件只在Checkpoint成功时生成。如果没有开启Checkpoint功能,文件将永远停留在in-progress或者pending的状态,并且下游系统将不能安全读取该文件数据。 sink end算子的接受记录数为checkpoint的个数,非实际的发送数据,实际发送数据量请参考streaming-writer或StreamingFileWriter算子的记录数。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE sink_table ( name string, num INT, p_day string, p_hour string ) partitioned by (p_day, p_hour) WITH ( 'connector' = 'filesystem', 'path' = 'obs://*** ', 'format' = 'parquet', 'auto-compaction' = 'true' );
  • 示例二 使用datagen随机生成数据写入obs的bucketName桶下的fileName目录中。文件生成时间与checkpoint有关,达到checkpoint间隔或达到100MB时,生成新文件。 create table orders( name string, num INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.name.kind' = 'random', 'fields.name.length' = '5' ); CREATE TABLE sink_table ( name string, num INT ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://bucketName/fileName', 'format' = 'csv', 'sink.rolling-policy.file-size'='128m', 'sink.rolling-policy.rollover-interval'='30 min', 'auto-compaction'='true', 'compaction.file-size'='100m' ); INSERT into sink_table SELECT * from orders;
  • 示例一 使用datagen随机生成数据写入obs的bucketName桶下的fileName目录中。文件生成时间与checkpoint无关,达到30min或128MB时,生成新文件。 create table orders( name string, num INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.name.kind' = 'random', 'fields.name.length' = '5' ); CREATE TABLE sink_table ( name string, num INT ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://bucketName/fileName', 'format' = 'csv', 'sink.rolling-policy.file-size'='128m', 'sink.rolling-policy.rollover-interval'='30 min' ); INSERT into sink_table SELECT * from orders;
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 固定位filesystem。 path 是 无 String OBS路径。 format 是 无 String 文件格式。 支持csv、parquet格式。 sink.rolling-policy.file-size 否 128MB MemorySize 单个part文件最大大小,超过该数值会滚动产生新文件。 说明: RollingPolicy 定义了何时关闭给定的In-progress Part文件,并将其转换为Pending状态,然后再转换为Finished状态。 Finished状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。 在STREAMING模式下,滚动策略结合Checkpoint间隔(到下一个Checkpoint成功时,文件的Pending状态才转换为Finished状态)共同控制Part文件对下游readers是否可见以及这些文件的大小和数量。 sink.rolling-policy.rollover-interval 否 30 min Duration 单个Part文件处于打开状态的最长时间,超过该时间会滚动产生新文件(默认值30分钟,以避免产生大量小文件)。检查频率是通过sink.rolling-policy.check-interval参数控制的。 说明: 该参数数字与单位之间必须要有空格。 支持的时间单位包括: d,h,min,s,ms等。 对于bulk格式的文件(parquet、orc、avro),checkpoint的时间间隔也会控制单个part文件打开的最长时间。 sink.rolling-policy.check-interval 否 1 min Duration 基于时间的滚动策略的检查间隔。 该属性控制了基于sink.rolling-policy.rollover-interval属性检查文件是否该被滚动的检查频率。 auto-compaction 否 false Boolean 在流式 sink 中是否开启自动合并功能。数据首先会被写入临时文件。当checkpoint完成后,该checkpoint产生的临时文件会被合并。 compaction.file-size 否 `sink.rolling-policy.file-size`的大小 MemorySize 合并目标文件大小,默认值为滚动文件大小。 说明: 只有在同个checkpoint内的文件会被合并,因此最终文件的数量至少等于checkpoint的数量。 如果合并时间较长,可能会引起反压,延长checkpoint所需时间。 开启该功能后,checkpoint时会产生最终文件,并打开新的文件接收下个checkpoint产生的数据。
  • 语法说明 string_split(target, separator) 表1 string_split参数说明 参数 数据类型 说明 target STRING 待处理的目标字符串。 说明: 如果target为NULL,则返回一个空行。 如果target包含两个或多个连续出现的分隔符时,则返回长度为零的空子字符串。 如果target未包含指定分隔符,则返回目标字符串。 separator VARCHAR 指定的分隔符,当前仅支持单字符分隔。
  • 聚合函数 聚合函数是从一组输入值计算一个结果。例如使用COUNT函数计算SQL查询语句返回的记录行数。聚合函数如表1所示。 表1 聚合函数表 函数 返回值类型 描述 COUNT([ ALL ] expression | DISTINCT expression1 [, expression2]*) BIGINT 返回表达式不为NULL的输入行数。对每个值的一个唯一实例使用DISTINCT。 COUNT(*) COUNT(1) BIGINT 返回元组个数 AVG([ ALL | DISTINCT ] expression) DOUBLE 返回所有值的平均值。 对每个值的一个唯一实例使用DISTINCT。 SUM([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值的数值之和 对每个值的一个唯一实例使用DISTINCT MAX([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值的最大值 MIN([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值的最小值 STDDEV_POP([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的总体标准偏差 STDDEV_SAMP([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的样本标准偏差 VAR_POP([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的总体方差 VAR_SAMP([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的样本方差 COLLECT([ ALL | DISTINCT ] expression) MULTISET 返回所有输入值的MULTISET VARIANCE([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的样本方差 FIRST_VALUE(expression) 数据实际类型 返回有序数据中的第一个数据 LAST_VALUE(expression) 数据实际类型 返回有序数据中的最后一个数据 父主题: 内置函数
  • 函数说明 表1 Hash函数说明 Hash函数 函数说明 MD5(string) 返回以32个十六进制数所表示的字符串的MD5哈希值 若字符串是null,则返回null SHA1(string) 返回以40个十六进制所表示的字符串的SHA-1哈希值 若字符串是null,则返回null SHA224(string) 返回以56个十六进制数所表示的字符串的SHA-224哈希值 若字符串是null,则返回null SHA256(string) 返回以64个十六进制数所表示的字符串的SHA-256哈希值 若字符串是null,则返回null SHA384(string) 返回以96个十六进制数所表示的字符串的SHA-384哈希值 若字符串是null,则返回null SHA512(string) 返回以128个十六进制数所表示的字符串的SHA-512哈希值 若字符串是null,则返回null SHA2(string, hashLength) 返回使用SHA-2哈希函数族(SHA-224, SHA-256, SHA-384, or SHA-512)得到的哈希值 第一个参数string表示被哈希的字符串,第二个参数hashLength表示哈希值的长度(224、256、384、512) 若任意参数为null,则返回null
  • 函数说明 表1 属性访问函数说明 值接入函数 函数说明 tableName.compositeType.field 选择单个字段,通过名称访问Apache Flink复合类型(如Tuple,POJO等)的字段并返回其值。 tableName.compositeType.* 选择所有字段,将Apache Flink复合类型(如Tuple,POJO等)和其所有直接子类型转换为简单表示,其中每个子类型都是单独的字段。
共100000条