云服务器内容精选

  • 在弹性资源池中添加队列 创建完弹性资源池后,弹性资源池需要添加一个或多个队列用于后续作业的运行。本节操作介绍在弹性资源池中添加队列的操作步骤。 添加到弹性资源池中的队列不再单独计费,以弹性资源池为计费项计费。 当弹性资源池添加队列时会引起弹性资源CUs扩缩容变化。 在左侧导航栏单击“弹性资源池”,可进入弹性资源池管理页面。 选择要操作的弹性资源池,在“操作”列,单击“添加队列”。 在“添加队列”界面,首先需要配置队列的基础配置,具体参数信息如下。 表3 弹性资源池添加队列基础配置 参数名 参数描述 名称 弹性资源池添加的队列名称。 类型 SQL队列:用于运行SQL作业。 通用队列:用于运行Spark作业 、Flink 作业。 执行引擎 如果队列类型选择为“SQL队列”,则可以选择队列引擎是:Spark或者HetuEngine HetuEngine类型的SQL队列最小CU不能小于96CUs。 使用HetuEngine引擎提交SQL作业必须配置 DLI 作业桶,具体操作请参考配置DLI作业桶。 企业项目 选择队列的企业项目。弹性资源池支持添加不同企业项目的队列资源。 企业项目是一种云资源管理方式,企业项目管理服务提供统一的云资源按项目管理,以及项目内的资源管理、成员管理。 关于如何设置企业项目请参考《企业管理用户指南》。 说明: 只有开通了企业管理服务的用户才显示该参数。 描述 弹性资源池添加队列的描述信息。 标签 使用标签标识云资源。包括标签键和标签值。如果您需要使用同一标签标识多种云资源,即所有服务均可在标签输入框下拉选择同一标签,建议在标签管理服务(TMS)中创建预定义标签。 如您的组织已经设定DLI的相关标签策略,则需按照标签策略规则为资源添加标签。标签如果不符合标签策略的规则,则可能会导致资源创建失败,请联系组织管理员了解标签策略详情。 具体请参考《标签管理服务用户指南》。 说明: 最多支持20个标签。 一个“键”只能添加一个“值”。 每个资源中的键名不能重复。 标签键:在输入框中输入标签键名称。 说明: 标签的键的最大长度为128个字符,标签的键可以包含任意语种字母、数字、空格和_ . : +-@ ,但首尾不能含有空格,不能以_sys_开头。 标签值:在输入框中输入标签值。 说明: 标签值的最大长度为255个字符,标签的值可以包含任意语种字母、数字、空格和_ . : +-@ 。 单击“下一步”,在“扩缩容策略”界面配置当前队列在弹性资源池的扩缩容策略。 图1 添加队列时配置扩缩容策略 单击“新增”,可以添加不同优先级、时间段、“最小CU”和“最大CU”扩缩容策略。每条扩缩容策略的参数说明如下: 表4 扩缩容策略参数说明 参数名 参数描述 优先级 当前弹性资源池中的优先级数字越大表示优先级越高。当前优先级支持的范围为:1到100。 时间段 时间段设置仅支持整点,左侧为开始时间,右侧为结束时间。请注意以下说明: 时间区间包括开始时间,不包括结束时间即[开始时间, 结束时间)。 例如当前选择的时间段范围为:01--17,则表示当前扩缩容规则生效时间范围为[01,17)。 同一队列不同优先级的时间段区间不能有交集。 最小CU 当前扩缩容策略支持的最小CU数。 在全天的任意一个时间段内,弹性资源池中所有队列的最小CU数之和必须小于等于弹性资源池的最小CU数。 当队列的最小CUs小于16CUs时,在队列属性中设置的“最大spark driver实例数”和“最大预拉起spark driver实例数”不生效。了解队列属性设置。 HetuEngine类型的SQL队列最小CU不能小于96CUs。 最大CU 当前扩缩容策略支持的最大CU数。 在全天的任意一个时间段内,弹性资源池中任意一个队列的最大CU必须小于等于弹性资源池的最大CU。 首条扩缩容策略是默认策略,不能删除和修改时间段配置。 Flink作业不支持触发弹性资源池队列的自动扩缩容。 单击“确定”完成添加队列配置。弹性资源池队列添加完成后,可以参考调整弹性资源池中队列的扩缩容策略查看弹性资源池添加的所有队列配置和策略信息。
  • 弹性资源池约束与限制 表1 弹性资源池约束限制 限制项 说明 资源规格 当前弹性资源池最大的计算资源 32000CUs。 弹性资源池中可创建队列的最小CU: 通用队列:4CUs SQL队列:Spark SQL队列:8CUs;HetuEngine SQL队列:96CUs 弹性资源池计费模式 弹性资源池支持按需和包年包月的购买方式。 不支持切换弹性资源池的计费模式。 当前仅支持包年包月计费模式的弹性资源池进行规格变更。 按需计费的弹性资源池默认勾选专属资源模式,自创建起按自然小时收费。 管理弹性资源池 弹性资源池不支持切换区域。 Flink 1.10及其以上版本的作业支持在弹性资源池运行。 弹性资源池网段设置后不支持更改。 仅支持查看30天以内的弹性资源池扩缩容历史。 弹性资源池无法直接访问公网。 弹性资源池关联队列 弹性资源池关联队列: 仅支持关联按需计费模式的队列(包括专属队列)。 队列和弹性资源池状态正常,资源未被冻结。 弹性资源池扩缩容 弹性资源池CU设置、弹性资源池中添加/删除队列、修改弹性资源池中队列的扩缩容策略、系统自动触发弹性资源池扩缩容时都会引起弹性资源池CU的变化,部分情况下系统无法保证按计划扩容/缩容至目标CUs: 弹性资源池扩容时,可能会由于物理资源不足导致弹性资源池无法扩容到设定的目标大小。 弹性资源池缩容时,系统不保证将队列资源完全缩容到设定的目标大小。 在执行缩容任务时,系统会先检查资源使用情况,判断是否存在缩容空间,如果现有资源无法按照最小缩容步长执行缩容任务,则弹性资源池可能缩容不成功,或缩容一部分规格的情况。 因资源规格不同可能有不同的缩容步长,通常是16CUs、32CUs、48CUs、64CUs等。 示例:弹性资源池规格为192CUs,资源池中的队列执行作业占用了68CUs,计划缩容至64CUs。 执行缩容任务时,系统判断剩余124CUs,按64CUs的缩容步长执行缩容任务,剩余60CUs资源无法继续缩容,因此弹性资源池执行缩容任务后规格为128CUs。
  • 约束与限制 仅标准版弹性资源池的Spark引擎的SQL队列支持配置队列属性。 仅在队列创建完成后支持设置队列属性。 不支持批量设置队列属性。 弹性资源池中的队列,当队列的最小CUs小于16CUs时,在队列属性中设置的“最大spark driver实例数”和“最大预拉起spark driver实例数”不生效。 开启“作业结果保存策略”,即配置作业结果保存至DLI作业桶后,请务必在提交SQL作业前配置DLI作业桶信息,否则SQL作业可能会提交失败。
  • 前提条件 配置前,请先购买OBS桶或并行文件系统。大数据场景推荐使用并行文件系统,并行文件系统(Parallel File System)是 对象存储服务 (Object Storage Service,OBS)提供的一种经过优化的高性能文件系统,提供毫秒级别访问时延,以及TB/s级别带宽和百万级别的IOPS,能够快速处理高性能计算(HPC)工作负载。 并行文件系统的详细介绍和使用说明,请参见《并行文件系统特性指南》。
  • 使用须知 运行在基础版弹性资源池队列上的作业不支持设置作业优先级。 对于每个作业都允许设置优先级,其取值为1-10,数值越大表示优先级越高。优先满足高优先级作业的计算资源,即如果高优先级作业计算资源不足,则会减少低优先级作业的计算资源 通用队列上运行的Flink作业优先级默认为5。 作业优先级的调整需要停止作业进行编辑,并提交运行才能生效。 对于Flink作业,请参考开启Flink作业动态扩缩容设置flink.dli.job.scale.enable=true开启动态扩缩容功能,再设置作业优先级。 调整作业优先级需要停止作业后编辑,并重新提交运行才能生效。
  • 跨源分析开发方式 表1提供DLI支持的数据源对应的开发方式。 表1 跨源分析语法参考 服务名称 开发SQL作业 开发Spark jar作业 开发Flink OpenSource SQL作业 开发Flink Jar作业 CloudTable HBase 创建HBase关联表 插入数据 查询数据 scala样例代码 pyspark样例代码 java样例代码 Hbase源表 Hbase结果表 Hbase维表 - CloudTable OpenTSDB 创建OpenTSDB关联表 插入数据 查询数据 scala样例代码 pyspark样例代码 java样例代码 - - CSS 创建 CS S关联表 插入数据 查询数据 scala样例代码 pyspark样例代码 java样例代码 Elasticsearch结果表 - DCS Redis 创建DCS关联表 插入数据 查询数据 scala样例代码 pyspark样例代码 java样例代码 Redis源表 Redis结果表 Redis维表 Flink作业样例 DDS 创建DDS关联表 插入数据 查询数据 scala样例代码 pyspark样例代码 java样例代码 - - DMS - - Kafka源表 Kafka结果表 - DWS 创建DWS关联表 插入数据 查询数据 scala样例代码 pyspark样例代码 java样例代码 DWS源表 DWS结果表 DWS维表 Flink作业样例 MRS HBase 创建HBase关联表 插入数据 查询数据 scala样例代码 pyspark样例代码 java样例代码 Hbase源表 Hbase结果表 Hbase维表 Flink作业样例 MRS Kafka - - Kafka源表 Kafka结果表 Flink作业样例 MRS OpenTSDB 创建OpenTSDB关联表 插入数据 查询数据 scala样例代码 pyspark样例代码 java样例代码 - - RDS MySQL 创建RDS关联表 插入数据 查询数据 scala样例代码 pyspark样例代码 java样例代码 MySQL CDC源表 - RDS PostGre 创建RDS关联表 插入数据 查询数据 - Postgres CDC源表 -
  • 步骤1:在 IAM 控制台创建云服务委托并授权 登录管理控制台。 单击右上方登录的用户名,在下拉列表中选择“ 统一身份认证 ”。 在左侧导航栏中,单击“委托”。 在“委托”页面,单击“创建委托”。 在“创建委托”页面,设置如下参数: 委托名称:按需填写,例如“dli_obs_agency_access”。 委托类型:选择“云服务”。 云服务:(“委托类型”选择“云服务”时出现此参数项。)在下拉列表中选择“DLI"。 持续时间:选择“永久”。 描述:非必选,可以填写“拥有OBS OperateAccess权限的委托”。 图2 创建委托 配置完委托的基本信息后,单击“下一步”。 授予当前委托所需的权限策略,单击“新建策略”。 配置策略信息。 输入策略名称,本例:dli-obs-agency。 选择“JSON视图”。 在策略内容中粘贴自定义策略。 本例权限包含访问和使用OBS的权限,适用于以下场景:DLI Flink作业下载OBS对象、OBS/DWS数据源(外表)、日志转储、使用savepoint、开启Checkpoint。DLI Spark作业下载OBS对象、读写OBS外表。 更多Flink作业常见委托权限配置请参考常见场景的委托权限策略。 { "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "obs:bucket:GetBucketPolicy", "obs:bucket:GetLifecycleConfiguration", "obs:bucket:GetBucketLocation", "obs:bucket:ListBucketMultipartUploads", "obs:bucket:GetBucketLogging", "obs:object:GetObjectVersion", "obs:bucket:GetBucketStorage", "obs:bucket:GetBucketVersioning", "obs:object:GetObject", "obs:object:GetObjectVersionAcl", "obs:object:DeleteObject", "obs:object:ListMultipartUploadParts", "obs:bucket:HeadBucket", "obs:bucket:GetBucketAcl", "obs:bucket:GetBucketStoragePolicy", "obs:object:AbortMultipartUpload", "obs:object:DeleteObjectVersion", "obs:object:GetObjectAcl", "obs:bucket:ListBucketVersions", "obs:bucket:ListBucket", "obs:object:PutObject" ], "Resource": [ "OBS:*:*:bucket:bucketName",//请替换bucketName为对应的桶名称 "OBS:*:*:object:*" ] }, { "Effect": "Allow", "Action": [ "obs:bucket:ListAllMyBuckets" ] } ] } 按需输入策略描述。 新建策略完成后,单击“下一步”,返回委托授权页面。 选择步骤8新建的自定义策略。 图3 选择自定义策略 单击“下一步”,选择委托的授权范围。 了解更多授权操作说明请参考创建用户组并授权。 所有资源:授权后,IAM用户可以根据权限使用账号中所有资源,包括企业项目、区域项目和全局服务资源。 全局服务资源:全局服务部署时不区分区域,访问全局级服务,不需要切换区域,全局服务不支持基于区域项目授权。如对象存储服务(OBS)、内容分发网络(CDN)等。授权后,用户根据权限使用全局服务的资源。 指定区域项目资源:授权后,IAM用户根据权限使用所选区域项目中的资源,未选择的区域项目中的资源,该IAM用户将无权访问。 指定企业项目资源:授权后,IAM用户根据权限使用所选企业项目中的资源。如企业项目A包含资源B,资源B部署在北京四和上海二,IAM用户所在用户组关联企业项目A后,北京四和上海二的资源B用户都可访问,不在企业项目A内的其他资源,该IAM用户将无权访问。 本例自定义策略中是OBS权限,因此选择全局服务资源。如果使用的是DLI权限,推荐选择“指定区域项目资源”。 单击“确定”,完成授权。 授权后需等待15-30分钟才可生效。
  • DLI自定义委托场景 表1 DLI自定义委托场景 场景 委托名称 适用场景 权限策略 允许DLI按表生命周期清理数据 dli_data_clean_agency 数据清理委托,表生命周期清理数据、Lakehouse表数据清理使用。 该委托需新建后自定义权限,但委托名称固定为dli_data_clean_agency。 数据清理委托权限配置 允许DLI读写OBS将日志转储 自定义 DLI Flink作业下载OBS对象、OBS/DWS数据源(外表)、日志转储、使用savepoint、开启checkpoint,DLI Spark作业下载OBS对象、读写OBS外表。 访问和使用OBS的权限策略 允许DLI在访问DEW获取数据访问凭证 自定义 DLI 作业使用DEW-C SMS 凭证管理能力。 使用DEW加密功能的权限 允许访问DLI Catalog元数据 自定义 DLI 访问DLI元数据。 访问DLI Catalog元数据的权限 允许访问LakeFormation Catalog元数据 自定义 DLI 访问LakeFormation元数据。 访问LakeFormation Catalog元数据的权限
  • 约束与限制 自定义委托名称不可与系统默认委托重复,即不可以是dli_admin_agency、dli_management_agency、dli_data_clean_agency。 允许DLI按表生命周期清理数据的委托名称必须为dli_data_clean_agency。 仅Flink 1.15和Spark 3.3.1(Spark通用队列场景)及以上版本的引擎执行作业支持配置自定义委托。 更新委托权限后,系统将升级您的dli_admin_agency为dli_management_agency,新的委托包含跨源操作、 消息通知 、用户授权操作所需的权限。除此之外的其他委托权限需求都需要用户自定义委托。了解dli_management_agency请参考DLI委托概述。 常见新建委托场景:允许DLI读写OBS数据、日志转储、Flink checkopoint;允许DLI在访问DEW获取数据访问凭证、允许访问Catalog获取元数据等场景。以上场景的委托权限请参考常见场景的委托权限策略。
  • 写数据至多个Sink表 EXECUTE STATEMENT SET BEGIN ... END; 是写数据至多个Sink表的必填语句,用于定义在同一个作业中执行多个插入数据的操作。 写数据至多个Sink表时,EXECUTE STATEMENT SET BEGIN ... END;是必填项。 语法格式 1 2 3 4 5 6 7 8 9 10 11 EXECUTE STATEMENT SET BEGIN -- 第一个DML语句 INSERT INTO your_sink1 SELECT ... FROM your_source WHERE ...; -- 第二个DML语句 INSERT INTO your_sink2 SELECT ... FROM your_source WHERE ... ... END; 示例 本例定义了源表datagen_source、Sink表print_sinkA和print_sinkB。然后使用EXECUTE STATEMENT执行两个INSERT INTO语句,分将转换后的数据写入两个不同的sink。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 --使用 datagen connector创建源表 datagen_source CREATE TABLE datagen_source ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'datagen' ); --使用print connector创建结果表 print_sinkA 和 print_sinkB CREATE TABLE print_sinkA( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'print' ); CREATE TABLE print_sinkB( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'print' ); --使用 EXECUTE STATEMENT SET BEGIN来执行两个 INSERT INTO 语句。 --第一个INSERT INTO语句将datagen_source表中的数据按需转换后写入 print_sinkA。 --第二个 INSERT INTO 语句将数据按需转换后写入 print_sinkB。。 EXECUTE STATEMENT SET BEGIN INSERT INTO print_sinkA SELECT UPPER(name), min(age) FROM datagen_source GROUP BY UPPER(name); INSERT INTO print_sinkB SELECT LOWER(name), max(age) FROM datagen_source GROUP BY LOWER(name); END;
  • 写数据至一个Sink表 语法格式 1 2 INSERT INTO your_sink SELECT ... FROM your_source WHERE ... 示例 本例定义了两个表my_source 和my_sink,并使用INSERT INTO语句source表选择数据并插入到sink表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 --使用datagen connector创建源表my_source CREATE TABLE my_source ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'datagen'); --使用jdbc connector创建目标表my_sink CREATE TABLE my_sink ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://xxx/your-database', 'table-name' = 'your-table', 'username' = 'your-username', 'password' = 'your-password' ); --使用INSERT INTO语句从my_source表选择数据,并插入到my_sink表 INSERT INTO my_sink SELECT name, age FROM my_source;
  • 常见问题 Q:若Flink作业日志中有如下报错信息,应该怎么解决? java.io.IOException: unable to open JDBC writer ... Caused by: org.postgresql.util.PSQLException: The connection attempt failed. ... Caused by: java.net.SocketTimeoutException: connect timed out A:应考虑是跨源没有绑定,或者跨源没有绑定成功。 参考增强型跨源连接章节,重新配置跨源。参考DLI跨源连接DWS失败进行问题排查。 Q:如果该DWS表在某schema下,则应该如何配置? A:如下示例是使用schema为dbuser2下的表area_info: --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'gaussdb', 'driver' = 'org.postgresql.Driver', 'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDbname', 'table-name' = 'dbuser2.area_info', 'username' = 'DwsUserName', 'password' = 'DwsPassword', 'lookup.cache.max-rows' = '10000', 'lookup.cache.ttl' = '2h' );
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'gaussdb'。 url 是 无 String jdbc连接地址。 使用gsjdbc4驱动连接时,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 使用gsjdbc200驱动连接时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。 table-name 是 无 String 读取数据库中的数据所在的表名。 driver 否 无 String jdbc连接驱动,默认为: org.postgresql.Driver。 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 username 否 无 String 数据库认证用户名,需要和'password'一起配置。 password 否 无 String 数据库认证密码,需要和'username'一起配置。 scan.partition.column 否 无 String 用于对输入进行分区的列名。 与scan.partition.lower-bound、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.lower-bound 否 无 Integer 第一个分区的最小值。 与scan.partition.column、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.upper-bound 否 无 Integer 最后一个分区的最大值。 与scan.partition.column、scan.partition.lower-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.num 否 无 Integer 分区的个数。 与scan.partition.column、scan.partition.upper-bound、scan.partition.upper-bound必须同时存在或者同时不存在。 scan.fetch-size 否 0 Integer 每次从数据库拉取数据的行数。默认值为0,表示不限制。 scan.auto-commit 否 true Boolean 设置自动提交标志。 它决定每一个statement是否以事务的方式自动提交。 lookup.cache.max-rows 否 无 Integer 维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。 默认表示不使用该配置。 lookup.cache.ttl 否 无 Duration 维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 默认表示不使用该配置。 lookup.max-retries 否 3 Integer 维表配置,数据拉取最大重试次数。
  • 示例 从Kafka源表中读取数据,将DWS表作为维表,并将二者生成的宽表信息写入Kafka结果表中,其具体步骤如下: 参考增强型跨源连接,在DLI上根据DWS和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置DWS和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据DWS和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 连接DWS数据库实例,在DWS中创建相应的表,作为维表,表名为area_info,SQL语句如下: create table public.area_info( area_id VARCHAR, area_province_name VARCHAR, area_city_name VARCHAR, area_county_name VARCHAR, area_street_name VARCHAR, region_name VARCHAR); 连接DWS数据库实例,向DWS维表area_info中插入测试数据,其语句如下: insert into area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka作为数据源,DWS作为维表,数据输出到Kafka结果表中。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'dws-order', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'gaussdb', 'driver' = 'org.postgresql.Driver', 'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDbName', 'table-name' = 'area_info', 'username' = 'DwsUserName', 'password' = 'DwsPassword', 'lookup.cache.max-rows' = '10000', 'lookup.cache.ttl' = '2h' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; 连接Kafka集群,向kafka中source topic中插入如下测试数据: {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} 连接Kafka集群,读取kafka中sink topic中数据,结果参考如下: {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"} {"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 with参数中字段只能使用单引号,不能使用双引号。