华为云用户手册

  • 注意事项 每个连接子任务都需要保留自己的Hive表缓存。请确保Hive表可以放入TM任务槽的内存中。 建议为streaming-source.monitor-interval(最新分区作为临时表)或 lookup.join.cache.ttl(所有分区作为临时表)设置一个相对较大的值。否则,作业容易出现性能问题,避免表更新和重新加载过于频繁。 缓存刷新需加载整个Hive表。无法区分新数据和旧数据。
  • 参数说明 在执行与最新的Hive表的时间关联时,Hive表将被缓存到Slot内存中,然后通过键将流中的每条记录与表进行关联,以确定是否找到匹配项。将最新的Hive表用作时间表不需要任何额外的配置。使用以下属性配置Hive表缓存的TTL。在缓存过期后,将重新扫描Hive表以加载最新的数据。 参数 默认值 类型 说明 lookup.join.cache.ttl 60 min Duration 查找连接中构建表的缓存 TTL(例如 10 分钟)。默认情况下,TTL 为 60 分钟。 该选项仅在查找有界的 hive 表源时有效,如果您使用流式 hive 源作为时态表,请使用 streaming-source.monitor-interval 配置数据更新间隔。
  • 功能描述 对于随时间变化的分区表,我们可以将其读取为无界流,如果每个分区包含某个版本的完整数据,则该分区可以被视为时间表的一个版本,时间表的版本保留了分区的数据。Flink支持在处理时间关联中自动跟踪时间表的最新分区(版本)。 最新分区(版本)由 'streaming-source.partition-order' 选项定义。 这是在Flink 流应用作业中将 Hive 表用作维度表的最常见用例。
  • 示例 下面的示例展示了一个经典的业务流水线,维度表来自 Hive,每天通过批处理流水线作业或 Flink 作业更新一次,kafka流来自实时在线业务数据或日志,需要与维度表联接以扩充流。 使用spark sql 创建 hive obs 外表,并插入数据。 CREATE TABLE if not exists dimension_hive_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP, update_user STRING ) STORED AS PARQUET LOCATION 'obs://demo/spark.db/dimension_hive_table' PARTITIONED BY ( create_time STRING ); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业模拟从kafka读取数据,并关联hive维表对数据进行打宽,并输出到print。 如下脚本中的加粗参数请根据实际环境修改。 CREATE CATA LOG myhive WITH ( 'type' = 'hive' , 'default-database' = 'demo', 'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; CREATE TABLE if not exists ordersSource ( product_id STRING, user_name string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'TOPIC', 'properties.bootstrap.servers' = 'KafkaIP:PROT,KafkaIP:PROT,KafkaIP:PROT', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table if not exists print ( product_id STRING, user_name string, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP, update_user STRING, create_time STRING ) with ( 'connector' = 'print' ); insert into print select orders.product_id, orders.user_name, dim.product_name, dim.unit_price, dim.pv_count, dim.like_count, dim.comment_count, dim.update_time, dim.update_user, dim.create_time from ordersSource orders left join dimension_hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '10 m') */ for system_time as of orders.proctime as dim on orders.product_id = dim.product_id; 连接Kafka集群,向Kafka的source topic中插入如下测试数据: {"product_id": "product_id_11", "user_name": "name11"} {"product_id": "product_id_12", "user_name": "name12"} 查看print结果表数据。 +I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1] +I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1] 模拟向hive 维表,插入新的分区数据 INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_21', 'product_name_21', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_22', 'product_name_22', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_23', 'product_name_23', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_24', 'product_name_24', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_25', 'product_name_25', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_26', 'product_name_26', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_27', 'product_name_27', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_28', 'product_name_28', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_29', 'product_name_29', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_20', 'product_name_20', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10'); 连接Kafka集群,向Kafka的source topic中插入如下测试数据。关联上一个分区create_time='create_time_1'数据: {"product_id": "product_id_13", "user_name": "name13"} 查看print结果表数据。可观察到hive维表中的前一个分区create_time='create_time_1'数据已经被清除 +I[product_id_13, name13, null, null, null, null, null, null, null, null] 连接Kafka集群,向Kafka的source topic中插入如下测试数据。关联最新分区create_time='create_time_2'数据: {"product_id": "product_id_21", "user_name": "name21"} 查看print结果表数据。可观察到hive维表中保存了最新分区create_time='create_time_2'的数据 +I[product_id_21, name21, product_name_21, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_2]
  • 功能描述 您可以将Hive表用作时态表,通过时态联接来关联Hive表。有关时态联接的详细信息,请参阅 temporal join。 Flink支持processing-time temporal join Hive Table,processing-time temporal join始终会加入最新版本的时态表。Flink支持分区表和 Hive非分区表的临时连接,对于分区表,Flink 支持自动跟踪Hive表的最新分区。详情可参考:Apache Flink Hive Read & Write
  • 注意事项 Flink目前不支持与Hive表进行基于事件时间event-time的时间关联。 Temporal Join The Latest Partition 特性,仅在 Flink STREAMING 模式下支持。 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 数据类型的使用,请参考Format章节。 Hive 方言支持的 DDL 语句,Flink 1.15 当前仅支持使用Hive语法创建OBS表和使用hive语法的 DLI Lakehouse表。 使用Hive语法创建OBS表 defalut方言: with 属性中需要设置hive.is-external为true。 使用hive 方言:建表语句需要使用EXTERNAL关键字。 使用hive语法的DLI Lakehouse表 使用hive 方言:表属性中需要添加'is_lakehouse'='true'。 创建Flink OpenSource SQL作业时,在作业编辑界面配置开启checkpoint功能。
  • 示例 根据order_id对数据进行去重,其中proctime为事件时间属性列 SELECT order_id, user, product, number FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num FROM Orders) WHERE row_num = 1;
  • 语法说明 ROW_NUMBER(): 从第一行开始,依次为每一行分配一个唯一且连续的号码。 PARTITION BY col1[, col2...]: 指定分区的列,例如去重的键。 ORDER BY time_attr [asc|desc]: 指定排序的列。所制定的列必须为时间属性。目前仅支持proctime。升序( ASC )排列指只保留第一行,而降序排列( DESC )则只保留最后一行。 WHERE rownum = 1: Flink 需要 rownum = 1 以确定该查询是否为去重查询。
  • 代码示例 以调用创建短信应用接口为例,以下代码示例向您展示使用Python SDK的主要步骤: 创建认证。 创建MsgsmsClient实例并初始化。 实例化请求对象。 调用创建短信应用接口。 from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkmsgsms.v2.region.msgsms_region import MsgsmsRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkmsgsms.v2 import * if __name__ == "__main__": # 认证用的ak和sk直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。 # 本示例以ak和sk保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量CLOUD_SDK_AK和CLOUD_SDK_SK。 ak = os.environ["CLOUD_SDK_AK"] sk = os.environ["CLOUD_SDK_SK"] # 创建认证 # 创建BasicCredentials实例并初始化 credentials = BasicCredentials(ak, sk) client = MsgsmsClient.new_builder() \ .with_credentials(credentials) \ .with_region(MsgsmsRegion.value_of("cn-north-4")) \ .build() try: # 实例化请求对象 request = CreateAppRequest() request.body = SmsAppAddReq( ) # 调用创建短信应用接口 response = client.create_app(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg) 参数 说明 ak 您的华为云账号访问密钥ID(Access Key ID)。 sk 您的华为云账号秘密访问密钥(Secret Access Key)。 MsgsmsRegion.valueOf("cn-north-4") 请替换为您要访问的MSG SMS 平台所在区域,当前MSGSMS支持访问的区域,在SDK代码msgsms_region.py中已经定义。 您可以在MSGSMS控制台页面左上角查看当前服务所在区 域名 称。 项目源码及更多详细的使用指导请参考华为云Python软件开发工具包(Python SDK)。 推荐您使用API在线调试工具 API Explorer ,API Explorer支持快速调试和检索,调试API的同时,可以根据您的参数实时生成各种开发语言的SDK示例代码,方便您直接根据示例代码使用SDK。
  • 安装SDK 执行如下命令安装华为云Python SDK核心库以及相关服务库。 使用SDK前,您需要安装“huaweicloudsdkcore”和“huaweicloudsdkmsgsms”,具体的SDK版本号请参见SDK开发中心。 使用pip安装 执行如下命令安装华为云Python SDK核心库以及相关服务库: # 安装核心库 pip install huaweicloudsdkcore # 安装MSGSMS服务库 pip install huaweicloudsdkmsgsms 使用源码安装 执行如下命令安装华为云Python SDK核心库以及相关服务库: # 安装核心库 cd huaweicloudsdkcore-${version} python setup.py install # 安装MSGSMS服务库 cd huaweicloudsdkmsgsms-${version} python setup.py install
  • 代码示例 以调用创建短信应用接口为例,以下代码示例向您展示使用Java SDK的主要步骤: 创建认证。 创建MsgsmsClient实例并初始化。 实例化请求对象。 调用创建短信应用接口。 package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.msgsms.v2.region.MsgsmsRegion; import com.huaweicloud.sdk.msgsms.v2.*; import com.huaweicloud.sdk.msgsms.v2.model.*; public class CreateAppSolution { public static void main(String[] args) { // 认证用的ak和sk直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。 // 本示例以ak和sk保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量CLOUD_SDK_AK和CLOUD_SDK_SK。 String ak = System.getenv("CLOUD_SDK_AK"); String sk = System.getenv("CLOUD_SDK_SK"); // 创建认证 ICredential auth = new BasicCredentials() .withAk(ak) .withSk(sk); // 创建MsgsmsClient实例并初始化 MsgsmsClient client = MsgsmsClient.newBuilder() .withCredential(auth) .withRegion(MsgsmsRegion.valueOf("cn-north-4")) .build(); // 实例化请求对象 CreateAppRequest request = new CreateAppRequest(); SmsAppAddReq body = new SmsAppAddReq(); request.withBody(body); try { // 调用创建短信应用接口 CreateAppResponse response = client.createApp(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } } 参数 说明 ak 您的华为云账号访问密钥ID(Access Key ID)。 sk 您的华为云账号秘密访问密钥(Secret Access Key)。 MsgsmsRegion.valueOf("cn-north-4") 请替换为您要访问的MSGSMS平台所在区域,当前MSGSMS支持访问的区域,在SDK代码MsgsmsRegion.java中已经定义。 您可以在MSGSMS控制台页面左上角查看当前服务所在区域名称。 项目源码及更多详细的使用指导请参考华为云Java软件开发工具包(Java SDK)。 推荐您使用API在线调试工具API Explorer ,API Explorer支持快速调试和检索,调试API的同时,可以根据您的参数实时生成各种开发语言的SDK示例代码,方便您直接根据示例代码使用SDK。
  • 代码示例 以调用创建短信应用接口为例,以下代码示例向您展示使用Go SDK的主要步骤: 创建认证。 创建MsgsmsClient实例并初始化。 实例化请求对象。 调用创建短信应用接口。 package main import ( "fmt" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" msgsms "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/msgsms/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/msgsms/v2/model" region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/msgsms/v2/region" ) func main() { // 认证用的ak和sk直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。 // 本示例以ak和sk保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量CLOUD_SDK_AK和CLOUD_SDK_SK。 ak := os.Getenv("CLOUD_SDK_AK") sk := os.Getenv("CLOUD_SDK_SK") // 创建认证 auth := basic.NewCredentialsBuilder(). WithAk(ak). WithSk(sk). Build() // 创建MsgsmsClient实例并初始化 client := msgsms.NewMsgsmsClient( msgsms.MsgsmsClientBuilder(). WithRegion(region.ValueOf("cn-north-4")). WithCredential(auth). Build()) // 实例化请求对象 request := &model.CreateAppRequest{} request.Body = &model.SmsAppAddReq{ } // 调用创建短信应用接口 response, err := client.CreateApp(request) if err == nil { fmt.Printf("%+v\n", response) } else { fmt.Println(err) } } 参数 说明 ak 您的华为云账号访问密钥ID(Access Key ID)。 sk 您的华为云账号秘密访问密钥(Secret Access Key)。 region.ValueOf("cn-north-4") 请替换为您要访问的MSGSMS平台所在区域,当前MSGSMS支持访问的区域,在SDK代码region.go中已经定义。 您可以在MSGSMS控制台页面左上角查看当前服务所在区域名称。 项目源码及更多详细的使用指导请参考华为云Go软件开发工具包(Go SDK)。 推荐您使用API在线调试工具API Explorer ,API Explorer支持快速调试和检索,调试API的同时,可以根据您的参数实时生成各种开发语言的SDK示例代码,方便您直接根据示例代码使用SDK。
  • SDK列表 表1提供了MSGSMS支持的SDK,您可以在GitHub仓库查看SDK更新历史、获取安装包以及查看指导文档。 相关开发包请从华为云MSGSMS开发工具包(SDK)获取。 表1 SDK列表 编程语言 Github地址 参考文档 Java huaweicloud-sdk-java-v3 Java SDK使用指导 Python huaweicloud-sdk-python-v3 Python SDK使用指导 Go huaweicloud-sdk-go-v3 Go SDK使用指导 Node.js huaweicloud-sdk-nodejs-v3 Node.js SDK使用指南 .NET huaweicloud-sdk-net-v3 .Net SDK使用指导
  • 代码示例 以调用创建短信应用接口为例,以下代码示例向您展示使用.NET SDK的主要步骤: 创建认证。 创建MsgsmsClient实例并初始化。 实例化请求对象。 调用创建短信应用接口。 using System; using System.Collections.Generic; using HuaweiCloud.SDK.Core; using HuaweiCloud.SDK.Core.Auth; using HuaweiCloud.SDK.Msgsms; using HuaweiCloud.SDK.Msgsms.V2; using HuaweiCloud.SDK.Msgsms.V2.Model; namespace CreateAppSolution { class Program { static void Main(string[] args) { // 认证用的ak和sk直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。 // 本示例以ak和sk保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量CLOUD_SDK_AK和CLOUD_SDK_SK。 string ak = Environment.GetEnvironmentVariable("CLOUD_SDK_AK"); string sk = Environment.GetEnvironmentVariable("CLOUD_SDK_SK"); var config = HttpConfig.GetDefaultConfig(); config.IgnoreSslVerification = true; // 创建认证 var auth = new BasicCredentials(ak, sk); // 创建MsgsmsClient实例并初始化 var client = MsgsmsClient.NewBuilder() .WithCredential(auth) .WithRegion(MsgsmsRegion.ValueOf("cn-north-4")) .WithHttpConfig(config) .Build(); // 实例化请求对象 var req = new CreateAppRequest { }; req.Body = new SmsAppAddReq() { }; try { // 调用创建短信应用接口 var resp = client.CreateApp(req); var respStatusCode = resp.HttpStatusCode; Console.WriteLine(respStatusCode); } catch (RequestTimeoutException requestTimeoutException) { Console.WriteLine(requestTimeoutException.ErrorMessage); } catch (ServiceResponseException clientRequestException) { Console.WriteLine(clientRequestException.HttpStatusCode); Console.WriteLine(clientRequestException.RequestId); Console.WriteLine(clientRequestException.ErrorCode); Console.WriteLine(clientRequestException.ErrorMsg); } catch (ConnectionException connectionException) { Console.WriteLine(connectionException.ErrorMessage); } } } } 参数 说明 ak 您的华为云账号访问密钥ID(Access Key ID)。 sk 您的华为云账号秘密访问密钥(Secret Access Key)。 MsgsmsRegion.ValueOf("cn-north-4") 请替换为您要访问的MSGSMS平台所在区域,当前MSGSMS支持访问的区域,在SDK代码MsgsmsRegion.cs中已经定义。 您可以在MSGSMS控制台页面左上角查看当前服务所在区域名称。 项目源码及更多详细的使用指导请参考华为云.Net软件开发工具包(.Net SDK)。 推荐您使用API在线调试工具API Explorer ,API Explorer支持快速调试和检索,调试API的同时,可以根据您的参数实时生成各种开发语言的SDK示例代码,方便您直接根据示例代码使用SDK。
  • DLI是否存在Apache Spark 命令注入漏洞(CVE-2022-33891)? 不存在。 DLI没有启动spark.acls.enable配置项,所以不涉及Apache Spark 命令注入漏洞(CVE-2022-33891)。 该漏洞主要影响在启用了ACL(访问控制列表)时,可以通过提供任意用户名来执行命令导致数据安全收到威胁。 DLI在设计时充分考虑了数据安全和数据隔离,因此没有启用相关的配置项,所以不会受到这个漏洞的影响。 父主题: DLI产品咨询类
  • 应用场景 本文列举了一些常见的从checkpoint恢复的场景供您参考,如表1所示。 更多场景你可以使用从checkpoint恢复的原则结合实际情况进行判断。 表1 从checkpoint恢复的常见场景 场景 是否支持恢复 说明 调整或者增加并行数 不支持 该操作修改了作业的并行数,即修改了作业的运行逻辑。 修改Flink SQL语句、Flink Jar作业等操作 不支持 该操作修改了作业对资源的算法逻辑。 例如原有的算法的语句是执行加减运算,当前需要恢复的状态将算法的语句修改成为乘除取余的运算,是无法从checkpoint直接恢复的。 修改“静态流图” 不支持 该操作修改了作业对资源的算法逻辑。 修改“单TM所占CU数”参数 支持 对计算资源的修改并没有影响到作业算法或算子的运行逻辑。 作业运行异常或物理停电 支持 当对作业参数未作出修改。
  • 如何在一个Flink作业中将数据写入到不同的Elasticsearch集群中? 在Flink 作业中,你可以使用CREATE语句来定义Source表和Sink表,并指定它们的连接器类型以及相关的属性。 如果需要将数据写入到不同的Elasticsearch集群,您需要为每个集群配置不同的连接参数,并确保Flink作业能够正确地将数据路由到各个集群。 例如本例中分别对es1和es2定义连接器类型以及相关的属性。 在对应的Flink作业中添加如下SQL语句。 create source stream ssource(xx); create sink stream es1(xx) with (xx); create sink stream es2(xx) with (xx); insert into es1 select * from ssource; insert into es2 select * from ssource; 父主题: Flink SQL作业类
  • 作业语义检验时提示DIS通道不存在怎么处理? 处理方法如下: 登录到DIS管理控制台,在左侧菜单栏选择“通道管理”。检查Flink作业SQL语句中的DIS通道是否存在。 如果Flink作业中的DIS通道还未创建,请参见《 数据接入服务 用户指南》中“开通DIS通道”章节。 确保创建的DIS通道和Flink作业处于统一区域。 如果DIS通道已创建,则检查确保DIS通道和Flink流作业是否处与同一区域。 父主题: Flink SQL作业类
  • SQL作业如何指定表的部分字段进行表数据的插入 如果你需要将数据插入到表中,但只想指定部分字段,你可以使用INSERT INTO语句结合SELECT子句来实现。 但是DLI目前不支持直接在INSERT INTO语句中指定部分列字段进行数据插入,您需要确保在SELECT子句中选择的字段数量和类型与目标表的Schema信息匹配。即确保源表和目标表的数据类型和列字段个数相同,以避免插入失败。 如果目标表中的某些字段在SELECT子句中没有被指定,那么这些字段也可能被插入默认值或置为空值(取决于该字段是否允许空值)。 父主题: SQL作业开发类
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE sink_table ( name string, num INT, p_day string, p_hour string ) partitioned by (p_day, p_hour) WITH ( 'connector' = 'filesystem', 'path' = 'obs://*** ', 'format' = 'parquet', 'source.monitor-interval'='' );
  • 示例 从obs表作为数据源读取数据,输出到print connector。 CREATE TABLE obs_source( name string, num INT, `file.path` STRING NOT NULL METADATA ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://demo/sink_parquent_obs', 'format' = 'parquet', 'source.monitor-interval'='1 h' ); CREATE TABLE print ( name string, num INT, path STRING ) WITH ( 'connector' = 'print' ); insert into print select * from obs_source;
  • 注意事项 更多具体使用可参考开源社区文档:Apache Kafka SQL 连接器。 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 with参数中字段只能使用单引号,不能使用双引号。 建表时数据类型的使用请参考Format章节。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression) ) with ( 'connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'scan.startup.mode' = '', 'format' = '' );
  • Topic和Partition的探测 topic 和 topic-pattern 配置项决定了 source 消费的 topic 或 topic 的匹配规则。topic 配置项可接受使用分号间隔的 topic 列表,例如 topic-1;topic-2。 topic-pattern 配置项使用正则表达式来探测匹配的 topic。例如 topic-pattern 设置为 test-topic-[0-9],则在作业启动时,所有匹配该正则表达式的 topic(以 test-topic- 开头,以一位数字结尾)都将被 consumer 订阅。 为允许 consumer 在作业启动之后探测到动态创建的 topic,请将 scan.topic-partition-discovery.interval 配置为一个非负值。这将使 consumer 能够探测匹配名称规则的 topic 中新的 partition。 topic列表和topic匹配规则只适用于 source。对于sink端,Flink目前只支持单一topic。
  • 示例2:将json格式DMS Kafka作为源表,输出到Kafka sink中(适用于Kafka集群未开启SASL_SSL场景) 将Kafka作为源表,Kafka作为结果表,从Kafka中读取编码格式为json数据类型的数据,输出到日志文件中。 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,并提交运行。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE kafkaSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into kafkaSink select * from kafkaSource; 向Kafka的源表的topic中发送如下数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} 读取Kafka的结果表的topic,其数据结果参考如下: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
  • 功能描述 Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。 表1 支持类别 类别 详情 支持表类型 源表、结果表 支持数据格式 CS V JSON Apache Avro Confluent Avro Debezium CDC Canal CDC Maxwell CDC OGG CDC Raw
  • 常见问题 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 跨源未绑定或未绑定成功,或是Kafka集群安全组未配置放通DLI队列的网段地址。重新配置跨源,或者Kafka集群安全组放通DLI队列的网段地址。 具体操作请参考增强型跨源连接。 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword. sink表中定义了metadata类型,但是Print connector并不支持把sink表中的matadata去掉即可。
  • 示例1 使用datagen随机生成数据写入obs的bucketName桶下的fileName目录中。文件生成时间与checkpoint有关,达到30min或128MB时,生成新文件。 create table orders( name string, num INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.name.kind' = 'random', 'fields.name.length' = '5' ); CREATE TABLE sink_table ( name string, num INT ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://bucketName/fileName', 'format' = 'csv', 'sink.rolling-policy.file-size'='128m', 'sink.rolling-policy.rollover-interval'='30 min' ); INSERT into sink_table SELECT * from orders;
  • 示例2 使用datagen随机生成数据写入obs的bucketName桶下的fileName目录中。文件生成时间与checkpoint有关,达到checkpoint间隔或达到100MB时,生成新文件。 create table orders( name string, num INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.name.kind' = 'random', 'fields.name.length' = '5' ); CREATE TABLE sink_table ( name string, num INT ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://bucketName/fileName', 'format' = 'parquet', 'sink.rolling-policy.file-size'='128m', 'sink.rolling-policy.rollover-interval'='30 min', 'auto-compaction'='true', 'compaction.file-size'='100m' ); INSERT into sink_table SELECT * from orders;
  • 功能描述 FileSystem sink用于将数据输出到分布式文件系统HDFS或者 对象存储服务 OBS等文件系统。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。 考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的Part文件。完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。即桶中将包含一个小时间隔内接收到的记录。 桶目录中的数据被拆分成多个Part文件。对于相应的接收数据的桶的Sink的每个Subtask,每个桶将至少包含一个Part文件。将根据配置的滚动策略来创建其他Part文件。对于Row Formats默认的策略是根据Part文件大小进行滚动,需要指定文件打开状态最长时间的超时以及文件关闭后的非活动状态的超时时间。对于Bulk Formats在每次创建Checkpoint时进行滚动,并且用户也可以添加基于大小或者时间等的其他条件。更多信息参考文件系统 SQL 连接器 在STREAMING模式下使用FileSink需要开启Checkpoint功能。Part文件只在Checkpoint成功时生成。如果没有开启Checkpoint功能,文件将永远停留在in-progress或者pending的状态,并且下游系统将不能安全读取该文件数据。 sink end算子的接受记录数为checkpoint的个数,非实际的发送数据,实际发送数据量请参考streaming-writer或StreamingFileWriter算子的记录数。
共100000条