华为云用户手册

  • 功能描述 创建一个有 catalog 和数据库命名空间的 catalog function ,需要指定一个 identifier ,可指定 language tag 。 若catalog 中,已经有同名的函数注册了,则无法注册。如果 language tag 是 JAVA 或者 SCALA ,则 identifier 是 UDF 实现类的全限定名。 如果您需要了解创建自定义函数的步骤请参考自定义函数。
  • 语法说明 TEMPORARY 创建一个有 catalog 和数据库命名空间的临时 catalog function ,并覆盖原有的 catalog function 。 TEMPORARY SYSTEM 创建一个没有数据库命名空间的临时系统 catalog function ,并覆盖系统内置的函数。 IF NOT EXISTS 如果该函数已经存在,则不会进行任何操作。 LANGUAGE JAVA|SCALA Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA, SCALA,且函数的默认语言为 JAVA。
  • 算术函数 表1 算术函数 运算符 描述 + numeric 返回 numeric。 - numeric 返回 numeric 的相反数。 numeric1 + numeric2 返回 numeric1 加 numeric2 numeric1 - numeric2 返回 numeric1 减 numeric2。 numeric1 * numberic2 返回 numeric1 乘以 numeric2。 numeric1 / numeric2 返回 numeric1 除以 numeric2。 numeric1 % numeric2 返回 numeric1 除以 numeric2 的余数(模数)。仅当 numeric1 为负时,结果才为负。 POWER(numeric1, numeric2) 返回 numeric1 的 numeric2 次方。 ABS(numeric) 返回 numeric 的绝对值。 SQRT(numeric) 返回 numeric 的平方根。 LN(numeric) 返回 numeric 的自然对数(以 e 为底)。 LOG 10(numeric) 返回以 10 为底的 numeric 的对数。 LOG2(numeric) 返回以 2 为底的 numeric 的对数。 LOG(numeric2) LOG(numeric1, numeric2) 当用一个参数调用时,返回 numeric2 的自然对数。当使用两个参数调用时,此函数返回 numeric2 以 numeric1 为底的对数。numeric2 必须大于 0,numeric1 必须大于 1。 EXP(numeric) 返回 e 的 numeric 次幂。 CEIL(numeric) CEILING(numeric) 向上取整,并返回大于或等于 numeric 的最小整数。 FLOOR(numeric) 向下取整,并返回小于或等于 numeric 的最大整数。 SIN(numeric) 返回 numeric 的正弦值。 SINH(numeric) 返回 numeric 的双曲正弦值。返回类型为 DOUBLE。 COS(numeric) 返回 numeric 的正切值。 TAN(numeric) 计算给定A的正切值。 TANH(numeric) 返回 numeric 的双曲正切值。返回类型为 DOUBLE。 COT(numeric) 返回 numeric 的余切值。 ASIN(numeric) 返回 numeric 的反正弦值。 ACOS(numeric) 返回 numeric 的反余弦值。 ATAN(numeric) 返回 numeric 的反正切值。 ATAN2(numeric1, numeric2) 返回坐标 (numeric1, numeric2) 的反正切。 COSH(numeric) 返回 numeric 的双曲余弦值。返回值类型为 DOUBLE。 DEGREES(numeric) 返回弧度 numeric 的度数表示。 RADIANS(numeric) 返回度数 numeric 的弧度表示。 SIGN(numeric) 返回 numeric 的符号。 ROUND(numeric, INT) 返回 numeric 四舍五入保留 INT 小数位的值。 PI() 返回无比接近 pi 的值。 E() 返回无比接近 e 的值。 RAND() 返回 [0.0, 1.0) 范围内的伪随机双精度值。 RAND(INT) 返回范围为 [0.0, 1.0) 的伪随机双精度值,初始种子为 INT。 如果两个 RAND 函数具有相同的初始种子,它们将返回相同的数字序列。 RAND_INTEGER(INT) 返回 [0, INT) 范围内的伪随机整数。 RAND_INTEGER(INT1, INT2) 返回范围为 [0, INT2) 的伪随机整数,初始种子为 INT1。 如果两个 RAND_INTGER 函数具有相同的初始种子和边界,它们将返回相同的数字序列。 UUID() 根据 RFC 4122 类型 4(伪随机生成)UUID,返回 UUID(通用唯一标识符)字符串。 例如“3d3c68f7-f608-473f-b60c-b0c44ad4cc4e”,UUID 是使用加密强的伪随机数生成器生成的。 BIN(INT) 以二进制格式返回 INTEGER 的字符串表示形式。如果 INTEGER 为 NULL,则返回 NULL。 例如 4.bin() 返回“100”,12.bin() 返回“1100”。 HEX(numeric) HEX(string) 以十六进制格式返回整数 numeric 值或 STRING 的字符串表示形式。如果参数为 NULL,则返回 NULL。 例如数字 20 返回“14”,数字 100 返回“64”,字符串“hello,world” 返回“68656C6C6F2C776F726C64”。 TRUNCATE(numeric1, integer2) 返回截取 integer2 位小数的数字。如果 numeric1 或 integer2 为 NULL,则返回 NULL。 如果 integer2 为 0,则结果没有小数点或小数部分。integer2 可以为负数,使值的小数点左边的 integer2 位变为零。 此函数也可以传入只有一个 numeric1 参数且不设置 Integer2 以使用。 如果未设置 Integer2 则 Integer2 为 0。 例如 42.324.truncate(2) 为 42.32,42.324.truncate() 为 42.0。 父主题: 内置函数
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'redis'。 host 是 无 String redis连接地址。 port 否 6379 Integer redis连接端口。 password 否 无 String redis认证密码。 namespace 否 无 String redis key的namespace。 例如设置该值为"person",假设key为"jack"则redis中会是"person:jack"。 delimiter 否 : String redis的key和namespace之间的分隔符。 data-type 否 hash String redis的数据类型,有下列选项,与redis的数据类型相对应: hash list set sorted-set string data-type取值约束详见data-type取值约束说明。 schema-syntax 否 fields String redis的schema语义,包含以下值: fields:适用于所有数据类型。fields类型是指可以设置多个字段,写入时会取每个字段的值。 fields-scores:适用于sorted set数据类型,表示对每个字段都设置一个字段作为其独立的score。 array:适用于list、set、sorted set数据类型 array-scores:适用于sorted set数据类型 map:适用于hash、sorted set数据类型。 schema-syntax取值约束详见schema-syntax取值约束说明。 deploy-mode 否 standalone String redis集群的部署模式,支持standalone、master-replica、cluster,默认standalone。 该值可参考redis集群的实例类型介绍。 retry-count 否 5 Integer 连接redis集群的尝试次数。 connection-timeout-millis 否 10000 Integer 尝试连接redis集群时的最大超时时间。 commands-timeout-millis 否 2000 Integer 等待操作完成响应的最大时间。 rebalancing-timeout-millis 否 15000 Integer redis集群失败时的休眠时间。 default-score 否 0 Double 当data-type设置为“sorted-set”数据类型的默认score。 ignore-retraction 否 false Boolean 是否忽略retract消息。 skip-null-values 否 true Boolean 是否跳过null。若为false,则设置为字符串"null"。 pwd_auth_name 否 无 String DLI 侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置账号和密码。 key-ttl-mode 否 no-ttl String key-ttl-mode是开启Redis sink TTL的功能参数,key-ttl-mode的限制为:no-ttl、expire-msec、expire-at-date、expire-at-timestamp。 no-ttl:不设置过期时间。 expire-msec:设置key多久过期,参数为long类型字符串,单位为毫秒。 expire-at-date:设置key到某个时间点过期,参数为UTC时间。 expire-at-timestamp:设置key到某个时间点过期,参数为时间戳。 key-ttl 否 无 String key-ttl是key-ttl-mode的补充参数,有以下几种参数值: 当key-ttl-mode取值为no-ttl时,不需要配置此参数。 当key-ttl-mode取值为expire-msec时,需要配置为可以解析成Long型的字符串。例如5000,表示5000ms后key过期。 当key-ttl-mode取值为expire-at-date时,需要配置为Date类型字符串,例如2011-12-03T10:15:30,表示到期时间为北京时间2011-12-03 18:15:30。 当key-ttl-mode取值为expire-at-timestamp时,需要配置为timestamp类型字符串,单位为毫秒。例如1679385600000,表示到期时间为2023-03-21 16:00:00。
  • 聚合函数 聚合函数是从一组输入值计算一个结果。例如使用COUNT函数计算SQL查询语句返回的记录行数。聚合函数如表1所示。 表1 聚合函数表 函数 返回值类型 描述 COUNT([ ALL ] expression | DISTINCT expression1 [, expression2]*) BIGINT 返回表达式不为NULL的输入行数。对每个值的一个唯一实例使用DISTINCT。 COUNT(*) COUNT(1) BIGINT 返回元组个数 AVG([ ALL | DISTINCT ] expression) DOUBLE 返回所有值的平均值。 对每个值的一个唯一实例使用DISTINCT。 SUM([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值的数值之和 对每个值的一个唯一实例使用DISTINCT MAX([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值的最大值 MIN([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值的最小值 STDDEV_POP([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的总体标准偏差 STDDEV_SAMP([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的样本标准偏差 VAR_POP([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的总体方差 VAR_SAMP([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的样本方差 COLLECT([ ALL | DISTINCT ] expression) MULTISET 返回所有输入值的MULTISET VARIANCE([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的样本方差 FIRST_VALUE(expression) 数据实际类型 返回有序数据中的第一个数据 LAST_VALUE(expression) 数据实际类型 返回有序数据中的最后一个数据 父主题: 内置函数
  • 函数说明 表1 Hash函数说明 Hash函数 函数说明 MD5(string) 返回以32个十六进制数所表示的字符串的MD5哈希值 若字符串是null,则返回null SHA1(string) 返回以40个十六进制所表示的字符串的SHA-1哈希值 若字符串是null,则返回null SHA224(string) 返回以56个十六进制数所表示的字符串的SHA-224哈希值 若字符串是null,则返回null SHA256(string) 返回以64个十六进制数所表示的字符串的SHA-256哈希值 若字符串是null,则返回null SHA384(string) 返回以96个十六进制数所表示的字符串的SHA-384哈希值 若字符串是null,则返回null SHA512(string) 返回以128个十六进制数所表示的字符串的SHA-512哈希值 若字符串是null,则返回null SHA2(string, hashLength) 返回使用SHA-2哈希函数族(SHA-224, SHA-256, SHA-384, or SHA-512)得到的哈希值 第一个参数string表示被哈希的字符串,第二个参数hashLength表示哈希值的长度(224、256、384、512) 若任意参数为null,则返回null
  • 语法格式 1234567 create table printSink ( attr_name attr_type (',' attr_name attr_type) * (',' PRIMARY KEY (attr_name,...) NOT ENFORCED)) with ( 'connector' = 'print', 'print-identifier' = '', 'standard-error' = '');
  • 注意事项 当前只支持 CSS 集群7.X及以上版本,推荐使用7.6.2版本。 若未开启安全模式,无需使用任何跨源认证,即无需配置pwd_auth_name、es_auth_name、user_name、password、certificate,且语法中hosts字段值以http开头。 若开启安全模式,未开启https: 方法1:推荐使用password类型跨源认证,并配置pwd_auth_name为跨源认证的名称,且语法中hosts字段值以http开头。 方法2:不使用跨源认证,但需要配置用户名username、密码password,且语法中hosts字段值以http开头。 若开启安全模式,开启https: 方法1:推荐使用 CS S类型跨源认证名称,并配置es_auth_name为跨源认证的名称。请注意该场景hosts字段值以https开头。 方法2:不使用跨源认证,但需要配置用户名username、密码password、证书位置certificate。请注意该场景hosts字段值以https开头。 CSS集群安全组入向规则必须开启ICMP。 数据类型的使用,请参考Format章节。 提交Flink作业前,建议勾选“保存作业日志”参数,在OBS桶选项中选择日志保存的位置,方便后续作业提交失败或运行异常时,查看日志并分析问题原因。 Elasticsearch结果表根据是否定义了主键确定是在upsert模式还是在append模式下工作。 如果定义了主键,Elasticsearch Sink将在upsert模式下工作,该模式可以消费包含UPDATE和DELETE的消息。 如果未定义主键,Elasticsearch Sink将以append模式工作,该模式只能消费INSERT消息。 在Elasticsearch结果表中,主键用于计算Elasticsearch的文档ID。文档ID为最多512个字节不包含空格的字符串。Elasticsearch结果表通过使用“document-id.key-delimiter”参数指定的键分隔符按照DDL中定义的顺序连接所有主键字段,从而为每一行生成一个文档ID字符串。某些类型(例如BYTES、ROW、ARRAY和MAP等)由于没有对应的字符串表示形式,所以不允许其作为主键字段。如果未指定主键,Elasticsearch将自动生成随机的文档ID。 Elasticsearch结果表同时支持静态索引和动态索引。 如果使用静态索引,则索引选项值应为纯字符串,例如myusers,所有记录都将被写入myusers索引。 如果使用动态索引,可以使用{field_name}引用记录中的字段值以动态生成目标索引。您还可以使用 {field_name|date_format_string}将TIMESTAMP、DATE和TIME类型的字段值转换为date_format_string指定的格式。date_format_string与Java的DateTimeFormatter兼容。例如,如果设置为myusers-{log_ts|yyyy-MM-dd},则log_ts字段值为2020-03-27 12:25:55的记录将被写入myusers-2020-03-27索引。
  • 功能描述 DLI将Flink作业的输出数据输出到 云搜索服务 CSS的Elasticsearch中。Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于 日志分析 、站内搜索等场景。 云搜索 服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。 云搜索服务的更多信息,请参见《云搜索服务用户指南》
  • 示例 该示例是从Kafka数据源中读取数据,并写入到Elasticsearch结果表中,其具体步骤如下: 参考增强型跨源连接,在DLI上根据Elasticsearch和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置Elasticsearch和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据Elasticsearch和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 登录Elasticsearch集群的Kibana,并选择Dev Tools,输入下列语句并执行,以创建值为orders的index: PUT /orders{ "settings": { "number_of_shards": 1 },"mappings": { "properties": { "order_id": { "type": "text" }, "order_channel": { "type": "text" }, "order_time": { "type": "text" }, "pay_amount": { "type": "double" }, "real_pay": { "type": "double" }, "pay_time": { "type": "text" }, "user_id": { "type": "text" }, "user_name": { "type": "text" }, "area_id": { "type": "text" } }}} 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', "format" = "json");CREATE TABLE elasticsearchSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'ElasticsearchAddress:ElasticsearchPort', 'index' = 'orders');insert into elasticsearchSink select * from kafkaSource; 连接Kafka集群,向kafka中插入如下测试数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 在Elasticsearch集群的Kibana中输入下述语句并查看相应结果: GET orders/_search { "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "orders", "_type" : "_doc", "_id" : "ae7wpH4B1dV9conjpXeB", "_score" : 1.0, "_source" : { "order_id" : "202103241000000001", "order_channel" : "webShop", "order_time" : "2021-03-24 10:00:00", "pay_amount" : 100.0, "real_pay" : 100.0, "pay_time" : "2021-03-24 10:02:03", "user_id" : "0001", "user_name" : "Alice", "area_id" : "330106" } }, { "_index" : "orders", "_type" : "_doc", "_id" : "au7xpH4B1dV9conjn3er", "_score" : 1.0, "_source" : { "order_id" : "202103241606060001", "order_channel" : "appShop", "order_time" : "2021-03-24 16:06:06", "pay_amount" : 200.0, "real_pay" : 180.0, "pay_time" : "2021-03-24 16:10:06", "user_id" : "0001", "user_name" : "Alice", "area_id" : "330106" } } ] }}
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 指定要使用的连接器,固定为:elasticsearch-7。表示连接到 Elasticsearch 7.x 及更高版本集群。 hosts 是 无 String Elasticsearch所在集群的主机名,多个以';'间隔。 index 是 无 String 每条记录的 Elasticsearch 索引。可以是静态索引(例如'myIndex')或动态索引(例如'index-{log_ts|yyyy-MM-dd}')。 username 否 无 String Elasticsearch所在集群的账号。该账号参数需和密码“password”参数同时配置。 password 否 无 String Elasticsearch所在集群的密码。该密码参数需和“username”参数同时配置。 certificate 否 无 String Elasticsearch集群的证书在obs中的位置。 例如:obs://bucket/path/CloudSearchService.cer 仅在开启安全模式,且开启https,且未使用其他跨源认证的场景下下需要配置该参数。 document-id.key-delimiter 否 _ String 连接复合主键的拼接符,默认为_。 failure-handler 否 fail String 对Elasticsearch请求失败时的故障处理策略。有效的策略是: fail: 如果请求失败并因此导致作业失败,则抛出异常。 ignore: 忽略失败并丢弃请求。 retry-rejected:重新添加由于队列容量饱和而失败的请求。 自定义类名:用于使用ActionRequestFailureHandler子类进行故障处理。 sink.flush-on-checkpoint 否 true Boolean 是否在检查点刷新。 如果配置为false,在Elasticsearch进行Checkpoint时,connector将不等待确认所有pending请求已完成。因此,connector不会为请求提供at-least-once保证。 sink.bulk-flush.max-actions 否 1000 Interger 每个批量请求的最大缓冲操作数。可以设置'0'为禁用它。 sink.bulk-flush.max-size 否 2mb MemorySize 每个批量请求的缓冲操作的内存中的最大大小。必须是MB粒度。可以设置'0'为禁用它。 sink.bulk-flush.interval 否 1s Duration 刷新缓冲操作的间隔。可以设置'0'为禁用它。 请注意: 'sink.bulk-flush.max-size'和'sink.bulk-flush.max-actions' 都可以设置为'0'刷新间隔,从而允许对缓冲操作进行完整的异步处理。 sink.bulk-flush.backoff.strategy 否 DISABLED String 指定在任何刷新操作由于临时请求错误而失败时如何执行重试。有效的策略是: DISABLED:未执行重试,即在第一个请求错误后失败。 CONSTANT:等待重试之间的退避延迟。 EXPONENTIAL:最初等待退避延迟并在重试之间呈指数增加。 sink.bulk-flush.backoff.max-retries 否 8 Integer 最大退避重试次数。 sink.bulk-flush.backoff.delay 否 50ms Duration 每次退避尝试之间的延迟。 对于CONSTANT退避,这只是每次重试之间的延迟。 对于EXPONENTIAL退避,这是初始基本延迟。 connection.max-retry-timeout 否 无 Duration 重试之间的最大超时时间。 connection.path-prefix 否 无 String 要添加到每个REST通信的前缀字符串,例如, '/v1'。 format 否 json String Elasticsearch连接器支持指定格式。该格式必须生成有效的 json 文档。默认情况下使用内置'json'格式。 请参考Format页面以获取更多详细信息和格式参数。 pwd_auth_name 否 无 String Password类型的跨源认证名称。 仅在使用CSS类型的跨源认证时配置该参数。 es_auth_name和pwd_auth_name只能配置一个。 es_auth_name 否 无 String CSS类型的跨源认证的名称。 仅在使用CSS类型的跨源认证时配置该参数。 es_auth_name和pwd_auth_name只能配置一个。
  • 前提条件 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 请务必确保您的账户下已在云搜索服务里创建了集群。如何创建集群请参考《云搜索服务用户指南》中创建集群章节。 该场景作业需要运行在DLI的独享队列上,因此要与云搜索服务建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 函数说明 表1 属性访问函数说明 值接入函数 函数说明 tableName.compositeType.field 选择单个字段,通过名称访问Apache Flink复合类型(如Tuple,POJO等)的字段并返回其值。 tableName.compositeType.* 选择所有字段,将Apache Flink复合类型(如Tuple,POJO等)和其所有直接子类型转换为简单表示,其中每个子类型都是单独的字段。
  • 前提条件 确保已创建Kafka集群。 该场景作业需要运行在DLI的独享队列上,因此要与kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 语法格式 1 2 3 4 5 6 7 8 91011121314 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression))with ( 'connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'scan.startup.mode' = '', 'format' = '');
  • 常见问题 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 跨源未绑定或未绑定成功,或是Kafka集群安全组未配置放通DLI队列的网段地址。参考增强型跨源连接重新配置跨源,或者Kafka集群安全组放通DLI队列的网段地址。 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword. sink表中定义了metadata类型,但是Print connector并不支持把sink表中的matadata去掉即可。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 参数说明 connector 是 无 String 指定要使用的连接器,这里是'datagen'。 rows-per-second 否 10000 Long 每秒生成的行数,用以控制数据发出速率。 fields.#.kind 否 random String 指定 '#' 字段的生成器。 '#' 字段必须是DataGen表中的字段,实际使用时需要将'#'替换为相应字段名。其他各参数的'#'号意义相同,不再重复描述。 参数值可以是 'sequence' 或 'random',具体含义如下: random是默认的生成器,您可以通过“fields.#.max”和“fields.#.min”参数指定随机生成的最大和最小值。 当指定的字段类型为char、varchar、string时,可以同时通过“fields.#.length”字段指定长度。random是无界的生成器。 sequence生成器,您可以通过“fields.#.start”和“fields.#.end”指定序列的起始和结束值。sequence是有界的生成器,当序列数字达到结束值,读取结束。 fields.#.min 否 '#'号指定的字段类型的最小值 '#'号指定的字段类型 当“fields.#.kind”字段为:random时有效。 表示随机生成器的最小值,'#' 指定的字段仅适用于数字类型。 fields.#.max 否 '#'号指定的字段类型的最大值 '#'号指定的字段类型 当“fields.#.kind”字段为:random时有效。 随机生成器的最大值,'#' 指定的字段仅适用于数字类型。 fields.#.length 否 100 Integer 当“fields.#.kind”字段为:random时有效。 随机生成器生成字符的长度,#' 指定的字段仅适用于char、varchar、string。 fields.#.start 否 无 '#'号指定的字段类型 当“fields.#.kind”字段为:sequence时有效。 序列生成器的起始值。 fields.#.end 否 无 '#'号指定的字段类型 当“fields.#.kind”字段为:sequence时有效。 序列生成器的结束值。
  • 前提条件 MySQL CDC要求MySQL版本为5.7或8.0.x。 该场景作业需要DLI与MySQL建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。 MySQL已开启了Binlog,并且binlog_row_image设置为FULL。 已创建MySQL用户,并授予了SELECT、 SHOW DATABASES 、REPLICATION SLAVE和REPLICATION CLIENT权限。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'mysql-cdc'。 hostname 是 无 String MySQL数据库的IP地址或者Hostname。 username 是 无 String MySQL数据库的用户名。 password 是 无 String MySQL数据库的密码。 database-name 是 无 String 访问的数据库名称。 数据库名称支持正则表达式以读取多个数据库的数据,例如flink(.)*表示以flink开头的数据库名。 table-name 是 无 String 访问的表名。 表名支持正则表达式以读取多个表的数据,例如cdc_order(.)*表示以cdc_order开头的表名。 port 否 3306 Integer MySQL数据库的端口号。 server-id 否 5400~6000随机值 String 数据库客户端的一个数字ID,该ID必须是MySQL集群中全局唯一的。建议针对同一个数据库的每个作业都设置一个不同的ID。 默认会随机生成一个5400~6400的值。 scan.startup.mode 否 initial String 消费数据时的启动模式。 initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。 latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该Connector启动以后的最新变更。 server-time-zone 否 无 String 数据库在使用的会话时区。 例如:Asia/Shanghai。 pwd_auth_name 否 无 String DLI侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置置账号和密码。
  • 常见问题 Q:MySQL CDC源表不支持定义Watermark,怎么进行窗口聚合? A:可以采用非窗口聚合的方式,即将时间字段转换成窗口值,然后根据窗口值进行GROUP BY聚合。 例如:基于上述示例,统计每分钟的订单数,脚本如下(其中order_time为string类型,表示订单的时间)。 insert into printSink select DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm'), count(*) from mysqlCdcSource group by DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm');
  • 语法格式 create table mySqlCdcSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED))with ( 'connector' = 'mysql-cdc', 'hostname' = 'mysqlHostname', 'username' = 'mysqlUsername', 'password' = 'mysqlPassword', 'database-name' = 'mysqlDatabaseName', 'table-name' = 'mysqlTableName');
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 同步数据库数据的客户端,都会有一个唯一ID,即Server ID。同一个数据库下,建议每个MySQL CDC作业配置不同的Server ID。 主要原因如下: MySQL SERVER会根据该ID来维护网络连接以及Binlog位点。因此如果有大量相同的Server ID的客户端一起连接MySQL SERVER,可能导致MySQL SERVER的CPU陡增,影响线上业务稳定性。 此外,多个作业共享相同的Server ID,会导致Binlog位点错乱,多读或少读数据,因此建议每个CDC作业都配置不同的Server ID。 MySQL CDC源表暂不支持定义Watermark。如果您需要进行窗口聚合,请参考常见问题描述。 若连接DWS、MySQL等支持upsert的sink源,需要在sink表的创建语句中定义主键,请参考示例中printSink建表语句。 当使用MySQ CDM 源表时,请不要在源表参数里手动关闭debezium.connect.keep.alive,确保debezium.connect.keep.alive=true(默认值为true)。 如果手动关闭了debezium.connect.keep.alive,一旦发生拉取Binlog线程与MySQL服务器的连接连接异常,拉取Binlog线程不会尝试自动重连,这可能导致无法正常从源端拉取binlog日志。
  • 函数说明 表1 集合函数说明 集合函数 函数说明 CARDINALITY(array) 返回数组中元素个数 array ‘[’ integer ‘]’ 返回数组索引为integer的元素。索引从1开始 ELEMENT(array) 返回数组中的唯一元素。 若数组为空,则返回null 若数组中元素个数大于1,则抛出异常 CARDINALITY(map) 返回map中键值对的条数 map ‘[’ key ‘]’ 返回map中key所对应的值
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 固定为filesystem。 connector.file-path 是 数据输出目录,格式为: schema://file.path。 说明: 当前schame只支持obs和hdfs。 当schema为obs时,表示输出到 对象存储服务 OBS。注意,OBS必须是并行文件系统,不能是OBS桶。 示例:obs://bucketName/fileName,表示数据输出到obs的bucketName桶下的fileName目录中。 当schema为hdfs时,表示输出到HDFS。 示例:hdfs://node-master1sYAx:9820/user/car_infos,其中node-master1sYAx:9820为 MRS 集群NameNode所在节点信息。 format.type 是 输出数据编码格式,当前支持“parquet”格式和“csv”格式。 当schema为obs时,输出数据编码格式仅支持“parquet”格式。 当schema为hdfs时,输出数据编码格式支持“parquet”格式和“csv”格式。 format.field-delimiter 否 属性分隔符。 当编码格式为“csv”时,需要设置属性分隔符,用户可以自定义,默认为“,”。 connector.ak 否 用于访问obs的accessKey 当写入obs时必须填写该字段。 connector.sk 否 用于访问obs的secretKey 当写入obs时必须填写该字段。 connector.partitioned-by 否 分区字段,多个字段以“,”分隔
  • 常见问题 Q:若Flink作业日志中有如下报错信息,应该怎么解决? java.io.IOException: unable to open JDBC writer...Caused by: org.postgresql.util.PSQLException: The connection attempt failed....Caused by: java.net.SocketTimeoutException: connect timed out A:应考虑是跨源没有绑定,或者跨源没有绑定成功。 参考增强型跨源连接章节,重新配置跨源。参考DLI跨源连接DWS失败进行问题排查。 Q:如果该DWS表在某schema下,则应该如何配置? A:如下示例是使用schema为dbuser2下的表area_info: --创建地址维表create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'gaussdb', 'driver' = 'org.postgresql.Driver', 'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDbname', 'table-name' = 'dbuser2.area_info', 'username' = 'DwsUserName', 'password' = 'DwsPassword', 'lookup.cache.max-rows' = '10000', 'lookup.cache.ttl' = '2h');
  • 示例 从Kafka源表中读取数据,将DWS表作为维表,并将二者生成的宽表信息写入Kafka结果表中,其具体步骤如下: 参考增强型跨源连接,在DLI上根据DWS和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置DWS和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据DWS和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 连接DWS数据库实例,在DWS中创建相应的表,作为维表,表名为area_info,SQL语句如下: create table public.area_info( area_id VARCHAR, area_province_name VARCHAR, area_city_name VARCHAR, area_county_name VARCHAR, area_street_name VARCHAR, region_name VARCHAR); 连接DWS数据库实例,向DWS维表area_info中插入测试数据,其语句如下: insert into area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka作为数据源,DWS作为维表,数据输出到Kafka结果表中。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime()) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'dws-order', 'scan.startup.mode' = 'latest-offset', 'format' = 'json');--创建地址维表create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'gaussdb', 'driver' = 'org.postgresql.Driver', 'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName', 'table-name' = 'area_info', 'username' = 'DwsUserName', 'password' = 'DwsPassword', 'lookup.cache.max-rows' = '10000', 'lookup.cache.ttl' = '2h');--根据地址维表生成详细的包含地址的订单信息宽表create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string) with ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json');insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; 连接Kafka集群,向kafka中source topic中插入如下测试数据: {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}{"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} 连接Kafka集群,读取kafka中sink topic中数据,结果参考如下: {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"}{"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"}{"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
  • 语法格式 1 2 3 4 5 6 7 8 91011 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* )with ( 'connector' = 'gaussdb', 'url' = '', 'table-name' = '', 'username' = '', 'password' = '');
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'gaussdb'。 url 是 无 String jdbc连接地址。 使用gsjdbc4驱动连接时,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 使用gsjdbc200驱动连接时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。 table-name 是 无 String 读取数据库中的数据所在的表名。 driver 否 无 String jdbc连接驱动,默认为: org.postgresql.Driver。 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 username 否 无 String 数据库认证用户名,需要和'password'一起配置。 password 否 无 String 数据库认证密码,需要和'username'一起配置。 scan.partition.column 否 无 String 用于对输入进行分区的列名。 与scan.partition.lower-bound、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.lower-bound 否 无 Integer 第一个分区的最小值。 与scan.partition.column、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.upper-bound 否 无 Integer 最后一个分区的最大值。 与scan.partition.column、scan.partition.lower-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.num 否 无 Integer 分区的个数。 与scan.partition.column、scan.partition.upper-bound、scan.partition.upper-bound必须同时存在或者同时不存在。 scan.fetch-size 否 0 Integer 每次从数据库拉取数据的行数。默认值为0,表示不限制。 scan.auto-commit 否 true Boolean 设置自动提交标志。 它决定每一个statement是否以事务的方式自动提交。 lookup.cache.max-rows 否 无 Integer 维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。 默认表示不使用该配置。 lookup.cache.ttl 否 无 Duration 维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 默认表示不使用该配置。 lookup.max-retries 否 3 Integer 维表配置,数据拉取最大重试次数。 pwd_auth_name 否 无 String DLI侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置账号和密码。
  • 前提条件 请务必确保您的账户下已在 数据仓库 服务(DWS)里创建了DWS集群。如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 数据类型映射 HBase以字节数组存储所有数据。在读和写过程中要序列化和反序列化数据。 Flink的HBase连接器利用HBase(Hadoop) 的工具类 org.apache.hadoop.hbase.util.Bytes 进行字节数组和 Flink 数据类型转换。 Flink的HBase连接器将所有数据类型(除字符串外)null 值编码成空字节。对于字符串类型,null 值的字面值由null-string-literal选项值决定。 表2 数据类型映射表 Flink 数据类型 HBase 转换 CHAR / VARCHAR / STRING byte[] toBytes(String s) String toString(byte[] b) BOOLEAN byte[] toBytes(boolean b) boolean toBoolean(byte[] b) BINARY / VARBINARY 返回 byte[]。 DECIMAL byte[] toBytes(BigDecimal v) BigDecimal toBigDecimal(byte[] b) TINYINT new byte[] { val } bytes[0] // returns first and only byte from bytes SMALLINT byte[] toBytes(short val) short toShort(byte[] bytes) INT byte[] toBytes(int val) int toInt(byte[] bytes) BIGINT byte[] toBytes(long val) long toLong(byte[] bytes) FLOAT byte[] toBytes(float val) float toFloat(byte[] bytes) DOUBLE byte[] toBytes(double val) double toDouble(byte[] bytes) DATE 从 1970-01-01 00:00:00 UTC 开始的天数,int 值。 TIME 从 1970-01-01 00:00:00 UTC 开始天的毫秒数,int 值。 TIMESTAMP 从 1970-01-01 00:00:00 UTC 开始的毫秒数,long 值。 ARRAY 不支持 MAP / MULTISET 不支持 ROW 不支持
共99354条