云服务器内容精选

  • DLI 队列类型 DLI分为三种队列类型: default队列、SQL队列、通用队列,您可以根据业务场景和作业特性选择最合适的队列类型。 default队列: DLI服务预置的队列,所有用户共享。 不支持指定default队列资源大小,资源在执行作业时按需分配,并按实际扫描的数据量计费。 由于default队列是共享资源,在使用时可能会出现资源抢占的情况,不能保证每次都能获得资源来执行作业。 default队列适用小规模或临时的数据处理需求。对于重要的或需要保证资源的作业,建议购买弹性资源池并在弹性资源池中创建队列来执行作业。 SQL队列: SQL队列是用于执行SQL作业的队列,支持指定引擎类型包括Spark和HetuEngine。 SQL队列适用于需要快速数据查询和分析,以及需要定期清理缓存或重置环境的业务。 通用队列: 通用队列用于执行Spark作业、Flink OpenSource SQL作业和Flink Jar作业的队列。 适合适用于复杂数据处理、实时数据流处理或批量数据处理的场景。
  • 弹性资源池使用场景 推荐使用弹性资源池队列,提高资源使用的灵活性和资源利用效率。本节介绍常见的弹性资源池的使用场景。 场景一:固定资源造成资源浪费和资源不足的场景 在每天的不同时段,作业任务对资源的请求量也会发生变化,如果采用固定资源规格则会导致资源浪费或者资源不足的问题。例如,如下图图2示例可以看出: 大约在凌晨4点到7点这个数据段,ETL作业任务结束后没有其他作业,因为资源固定一直占用,导致严重的资源浪费。 上午9点到12点以及下午14点16点的两个时段,ETL报表和作业查询的请求量很高,因为当前固定资源不够,导致作业任务排队,任务一直排队。 图2 固定资源场景 场景二:资源相互隔离,没有共享,造成资源浪费的场景 某公司下有两个部门,两个部门的不同作业运行在DLI的两个队列上。部门A在上午8点到12点业务比较空闲,资源有剩余,部门B在这个时间段业务请求量大,原有资源规格满足不了,需要扩容时,请求不了部门A的队列资源,造成资源浪费。 图3 资源隔离造成的资源浪费 弹性资源池通过“分时按需弹性”功能,支持按照不同时间段对资源进行动态的扩缩容,保证资源的利用率和应对资源洪峰等诉求。 弹性资源池对后端资源统一进行管理和调度,多个队列绑定弹性资源池后,资源池内资源共享,资源利用率高,解决了场景二的问题。
  • 什么是弹性资源池和队列? 在了解DLI计算资源模式前首先了解弹性资源池和队列的基本概念。 弹性资源池是DLI计算资源的一种池化管理模式,可以看做DLI计算资源的集合。DLI支持在弹性资源池中创建多个队列,且这些队列可以共享弹性资源池中的资源。 队列是DLI中被实际使用和分配的基本单元,即队列是执行作业所需的具体的计算资源。您可以为不同的作业或数据处理任务创建不同的队列,并按需对这些队列分配和调整资源。了解DLI的队列类型请参考DLI队列类型。
  • DLI计算资源模式 DLI提供了三种计算资源的管理模式,每一种模式都有独特的优势和适用场景。 图1 DLI计算资源模式 弹性资源池模式:计算资源的池化管理模式,提供计算资源的动态扩缩容能力,同一弹性资源池中的队列共享计算资源。通过合理设置队列的计算资源分配策略,可以提高计算资源利用率,应对业务高峰期的资源需求。 适用场景:适合业务量有明显波动的场合,如周期性的数据批处理任务或实时数据处理需求。 支持的队列类型:SQL队列(Spark)、SQL队列(HetuEngine)、通用队列。了解DLI的队列类型请参考DLI队列类型。 弹性资源池模式的通用队列和SQL队列不支持跨可用区。 使用方法:先创建弹性资源池,然后在弹性资源池中创建队列并分配计算资源,队列关联到具体的作业和数据处理任务。 购买弹性资源池并在弹性资源池中添加队列的具体操作步骤请参考创建弹性资源池并添加队列。 全局共享模式: 全局共享模式是一种根据SQL查询中实际扫描的数据量来分配计算资源的模式,不支持指定或预留计算资源。 DLI服务预置的“default”队列即为全局共享模式的计算资源,资源的大小是按需分配的。在不确定数据量大小或偶尔需要进行数据处理的用户,可以使用default队列执行作业。 适用场景:适用于测试作业或资源消耗不高的情况。 支持的队列类型:仅DLI预置的default队列为全局共享模式的计算资源。 “default”队列只用于用户体验DLI,是所有人共享的公共资源,使用时可能会出现用户间抢占资源的情况,不能保证每次都可以得到资源执行相关操作。建议使用自建队列执行生产作业。 使用方法:default队列仅适用于提交SQL作业,在DLI管理控制台提交SQL作业时选择"default队列"即可。 非弹性资源池模式(废弃,不推荐使用): DLI的上一代计算资源管理方式,因缺乏灵活性,目前已不推荐使用。 非弹性资源池模式提供固定规格的计算资源,购买后独占资源,无法根据需求动态调整,可能会导致资源浪费或在需求高峰期资源不足。 表1 DLI计算资源模式与支持的队列类型 DLI计算资源模式 支持的队列类型 资源特点 适用场景 弹性资源池模式 SQL队列(Spark) SQL队列(HetuEngine) 通用队列 单用户多队列共享资源 资源动态分配,灵活调整 适合业务需求波动较大,需要灵活调整资源以应对波峰波谷的业务场景。 全局共享模式 default队列 多用户多队列共享资源 按量付费,不支持预留资源 适合不确定数据量大小或仅需要偶尔进行数据处理的临时或测试项目场景。 非弹性资源池模式 (废弃,不推荐使用) SQL队列 通用队列 单用户单队列独享资源 无法动态调整,资源可能会闲置 废弃,不推荐使用
  • 数据库和表的约束与限制 数据库 “default”为内置数据库,不能创建名为“default”的数据库。 DLI支持创建的数据库的最大数量为50个。 数据表 DLI支持创建的表的最大数量为5000个。 DLI支持创建表类型: Managed:数据存储位置为DLI的表。 External:数据存储位置为OBS的表。 View:视图,视图只能通过SQL语句创建。 跨源表:表类型同样为External。 创建DLI表时不支持指定存储路径。 数据导入 仅支持将OBS上的数据导入DLI或OBS中。 支持将OBS中 CS V,Parquet,ORC,JSON和Avro格式的数据导入到在DLI中创建的表。 将CSV格式数据导入分区表,需在数据源中将分区列放在最后一列。 导入数据的编码格式仅支持UTF-8。 数据导出 只支持将DLI表(表类型为“Managed”)中的数据导出到OBS桶中,且导出的路径必须指定到文件夹级别。 导出文件格式为json格式,且文本格式仅支持UTF-8。 支持跨账号导出数据,即B账户对A账户授权后,A账户拥有B账户OBS桶的元数据信息和权限信息的读取权限,以及路径的读写权限,则A账户可将数据导出至B账户的OBS路径中。
  • DLI支持创建的表类型 DLI表 DLI表是存储在DLI 数据湖 中的数据表。支持多种数据格式,可以存储结构化、半结构化和非结构化数据。 DLI表的数据存储在DLI服务内部,查询性能更好,适用于对时延敏感类的业务,如交互类的查询等。 库表管理中表的列表页面,表类型为Managed的即代表DLI表。 OBS表 OBS表的数据存储在OBS上,适用于对时延不敏感的业务,如历史数据统计分析等。 OBS表通常以对象的形式存储数据,每个对象包含数据和相关的元数据。 库表管理中表的列表页面,表类型为External,存储位置为OBS路径的即代表OBS表。 视图表 视图表(View)是一种虚拟表,它不存储实际的数据,而是根据定义的查询逻辑动态生成数据。视图通常用于简化复杂的查询,或者为不同的用户或应用提供定制化的数据视图。 视图表可以基于一个或多个表创建,提供了一种灵活的方式来展示数据,而不影响底层数据的存储和组织。 库表管理中表的列表页面,表类型为View的即代表视图表。 View只能通过SQL语句进行创建,不能通过“创建表”页面进行创建。视图中包含的表或视图信息不可被更改,如有更改可能会造成查询失败。 跨源表 跨源表是指能够跨越多个数据源进行查询和分析的数据表。这种表可以整合来自不同数据源的数据,提供统一的数据视图。 跨源表常用于 数据仓库 和数据湖架构中,允许用户执行跨多个数据源的复杂查询。 库表管理中表的列表页面,表类型为External,存储位置非OBS路径的即代表跨源表。
  • 表 表是数据库最重要的组成部分之一,它由行和列组成。每一行代表一个数据项,每一列代表数据的一个属性或特征。表用于组织和存储特定类型的数据,使得数据可以被有效地查询和分析。 数据库是一个框架,表是其实质内容。一个数据库包含一个或者多个表。 用户可通过管理控制台或SQL语句创建数据库和表,其中SQL语句的操作方法请参见创建数据库、创建OBS表和创建DLI表等。本章节介绍在管理控制台创建数据库和表的操作步骤。 创建数据库和表时,有权限控制,需要对其他用户授权,其他用户才可查看该用户新建的数据库和表。
  • 创建 自定义镜像 以tensorflow为例,说明如何将tensorflow打包进镜像,生成安装了tensorflow的自定义镜像,在DLI作业中使用该镜像运行作业。 准备容器环境。 请参考安装容器引擎文档中的“安装容器引擎”章节。 使用root用户登录1容器镜像环境,执行以下命令获取DLI的基础镜像。 本示例使用Spark基础镜像为例,使用docker pull方式下载基础镜像到1中的容器镜像环境。 docker pull 基础镜像下载地址 基础镜像下载地址参考使用自定义镜像增强作业运行环境。 示例,Spark基础镜像下载: docker pull swr.xxx/dli-public/spark_general-x86_64:3.3.1-2.3.7.1720240419835647952528832.202404250955 连接 容器镜像服务 。 登录SWR管理控制台。 选择左侧导航栏的“总览”,单击页面右上角的“登录指令”,在弹出的页面中单击复制登录指令。 在安装容器引擎的虚拟机中执行上一步复制的登录指令。 创建容器镜像组织。如果已创建组织则本步骤可以忽略。 登录SWR管理控制台。 选择左侧导航栏的“组织管理”,单击页面右上角的“创建组织”。 填写组织名称,单击“确定”。 编写Dockerfile文件。 vi Dockerfile 具体内容参考如下,将tensorflow打包进镜像: ARG BASE_IMG=swr.xxx/dli-public/spark_general-x86_64:3.3.1-2.3.7.1720240419835647952528832.202404250955//请替换基础镜像的URL FROM ${BASE_IMG} as builder USER omm //注意要使用omm用户执行。 RUN set -ex && \ mkdir -p /home/omm/.pip && \ pip3 install tensorflow==1.13.1 \ --user --no-cache-dir --trusted-host pypi.cloudartifact.dgg.dragon.tools.huawei.com \ -i https://pypi.cloudartifact.dgg.dragon.tools.huawei.com/artifactory/api/pypi/cbu-pypi-public/simple 内容复制到基础镜像中 USER omm 其中,主要包含了以下步骤: 设置pip的可用仓库地址。 使用pip3安装tensorflow算法包。 将安装了算法包的临时镜像builder里的内容复制到基础镜像中(这一步主要是为了减小镜像体积),用于生成最终的自定义镜像。 利用Dockerfile生成自定义镜像。 镜像打包命令格式: docker build -t [自定义组织名称]/[自定义镜像名称]:[自定义镜像版本] --build-arg BASE_IMG=[DLI基础镜像地址] -f Dockerfile . DLI基础镜像地址为获取DLI基础镜像中的镜像地址。 示例: docker build -t mydli/spark:2.4 --build-arg BASE_IMG=swr.xxx/dli-public/spark_general-x86_64:3.3.1-2.3.7.1720240419835647952528832.202404250955 -f Dockerfile . 给自定义镜像打标签。 docker tag 6中的[自定义组织名称]/[自定义镜像名称]:[自定义镜像版本] [镜像仓库地址]/[组织名称]/[自定义镜像名称:自定义版本名称] 示例: docker tag mydli/spark:2.4 swr.xxx/testdli0617/spark:2.4.5.tensorflow 上传自定义镜像。 docker push [镜像仓库地址]/[组织名称]/[自定义镜像名称:自定义版本名称] 上述命令中的“[镜像仓库地址]/[组织名称]/[自定义镜像名称:自定义版本名称]”保持和7一致。 示例: docker push swr.xxx/testdli0617/spark:2.4.5.tensorflow 在DLI服务中提交Spark或者Flink jar作业时选择自定义镜像。 打开管理控制台的Spark作业或者Flink作业编辑页面,在自定义镜像列表中选择已上传并共享的镜像,运行作业,即可使用自定义镜像运行作业。 如果选择的镜像不是共享镜像,自定义镜像处会提示该镜像未授权,则需要授权后才可以使用。具体可以参考图3,提示处单击“立即授权”即可,填写其他作业执行参数后,再执行作业。 图2 在DLI Spark作业编辑页面,选择自定义镜像 图3 Spark作业镜像授权操作 图4 在DLI Flink jar作业编辑页面,选择自定义镜像 在使用API时,在作业参数中指定image参数,即可使用自定义镜像运行作业。Spark作业请参考《创建批处理作业》,Flink jar作业请参考《创建Flink Jar作业》。
  • 获取DLI基础镜像 表1 获取DLI基础镜像 镜像类型 架构 URL general镜像 X86 swr.cn-north-4.myhuaweicloud.com/dli-public/spark_general-x86_64:3.3.1-2.3.7.1720240419835647952528832.202404250955 general镜像 ARM swr.cn-north-4.myhuaweicloud.com/dli-public/spark_general-aarch64:3.3.1-2.3.7.1720240419835647952528832.202404250955 notebook镜像 X86 swr.cn-north-4.myhuaweicloud.com/dli-public/spark_notebook-x86_64:3.3.1-2.3.7.1720240419835647952528832.202404250955 notebook镜像 ARM swr.cn-north-4.myhuaweicloud.com/dli-public/spark_notebook-aarch64:3.3.1-2.3.7.1720240419835647952528832.202404250955
  • 弹性资源池约束与限制 不支持切换弹性资源池的计费模式。 弹性资源池不支持切换区域。 按需计费的弹性资源池默认勾选专属资源模式,自创建起按自然小时收费。 Flink 1.10及其以上版本的作业支持在弹性资源池运行。 弹性资源池网段设置后不支持更改。 弹性资源池关联队列: 仅支持关联按需计费模式的队列(包括专属队列)。 队列和弹性资源池状态正常,资源未被冻结。 当前仅支持包年包月计费模式的弹性资源池进行规格变更。 仅支持查看30天以内的弹性资源池扩缩容历史。 弹性资源池不支持访问公网。 弹性资源池CU设置、弹性资源池中添加/删除队列、修改弹性资源池中队列的扩缩容策略、系统自动触发弹性资源池扩缩容时都会引起弹性资源池CU的变化,部分情况下系统无法保证按计划扩容/缩容至目标CUs: 弹性资源池扩容时,可能会由于物理资源不足导致弹性资源池无法扩容到设定的目标大小。 弹性资源池缩容时,系统不保证将队列资源完全缩容到设定的目标大小。 在执行缩容任务时,系统会先检查资源使用情况,判断是否存在缩容空间,如果现有资源无法按照最小缩容步长执行缩容任务,则弹性资源池可能缩容不成功,或缩容一部分规格的情况。 因资源规格不同可能有不同的缩容步长,通常是16CUs、32CUs、48CUs、64CUs等。 示例:弹性资源池规格为192CUs,资源池中的队列执行作业占用了68CUs,计划缩容至64CUs。 执行缩容任务时,系统判断剩余124CUs,按64CUs的缩容步长执行缩容任务,剩余60CUs资源无法继续缩容,因此弹性资源池执行缩容任务后规格为128CUs。
  • 在弹性资源池中添加队列 创建完弹性资源池后,弹性资源池需要添加一个或多个队列用于后续作业的运行。本节操作介绍在弹性资源池中添加队列的操作步骤。 添加到弹性资源池中的队列不再单独计费,以弹性资源池为计费项计费。 当弹性资源池添加队列时会引起弹性资源CUs扩缩容变化。 在左侧导航栏单击“弹性资源池”,可进入弹性资源池管理页面。 选择要操作的弹性资源池,在“操作”列,单击“添加队列”。 在“添加队列”界面,首先需要配置队列的基础配置,具体参数信息如下。 表2 弹性资源池添加队列基础配置 参数名 参数描述 名称 弹性资源池添加的队列名称。 类型 SQL队列:用于运行SQL作业。 通用队列:用于运行Spark作业 、Flink 作业。 执行引擎 如果队列类型选择为“SQL队列”,则可以选择队列引擎是:Spark或者HetuEngine HetuEngine类型的SQL队列最小CU不能小于96CUs。 企业项目 选择队列的企业项目。弹性资源池支持添加不同企业项目的队列资源。 企业项目是一种云资源管理方式,企业项目管理服务提供统一的云资源按项目管理,以及项目内的资源管理、成员管理。 关于如何设置企业项目请参考《企业管理用户指南》。 说明: 只有开通了企业管理服务的用户才显示该参数。 描述 弹性资源池添加队列的描述信息。 标签 使用标签标识云资源。包括标签键和标签值。如果您需要使用同一标签标识多种云资源,即所有服务均可在标签输入框下拉选择同一标签,建议在标签管理服务(TMS)中创建预定义标签。 如您的组织已经设定DLI的相关标签策略,则需按照标签策略规则为资源添加标签。标签如果不符合标签策略的规则,则可能会导致资源创建失败,请联系组织管理员了解标签策略详情。 具体请参考《标签管理服务用户指南》。 说明: 最多支持20个标签。 一个“键”只能添加一个“值”。 每个资源中的键名不能重复。 标签键:在输入框中输入标签键名称。 说明: 标签的键的最大长度为128个字符,标签的键可以包含任意语种字母、数字、空格和_ . : =+-@ ,但首尾不能含有空格,不能以_sys_开头。 标签值:在输入框中输入标签值。 说明: 标签值的最大长度为255个字符,标签的值可以包含任意语种字母、数字、空格和_ . : =+-@ ,但首尾不能含有空格。 单击“下一步”,在“扩缩容策略”界面配置当前队列在弹性资源池的扩缩容策略。 图1 添加队列时配置扩缩容策略 单击“新增”,可以添加不同优先级、时间段、“最小CU”和“最大CU”扩缩容策略。每条扩缩容策略的参数说明如下: 表3 扩缩容策略参数说明 参数名 参数描述 优先级 当前弹性资源池中的优先级数字越大表示优先级越高。当前优先级支持的范围为:1到100。 时间段 时间段设置仅支持整点,左侧为开始时间,右侧为结束时间。请注意以下说明: 时间区间包括开始时间,不包括结束时间即[开始时间, 结束时间)。 例如当前选择的时间段范围为:01--17,则表示当前扩缩容规则生效时间范围为[01,17)。 同一队列不同优先级的时间段区间不能有交集。 最小CU 当前扩缩容策略支持的最小CU数。 在全天的任意一个时间段内,弹性资源池中所有队列的最小CU数之和必须小于等于弹性资源池的最小CU数。 当队列的最小CUs小于16CUs时,在队列属性中设置的“最大spark driver实例数”和“最大预拉起spark driver实例数”不生效。了解队列属性设置。 HetuEngine类型的SQL队列最小CU不能小于96CUs。 最大CU 当前扩缩容策略支持的最大CU数。 在全天的任意一个时间段内,弹性资源池中任意一个队列的最大CU必须小于等于弹性资源池的最大CU。 首条扩缩容策略是默认策略,不能删除和修改时间段配置。 Flink作业不支持触发弹性资源池队列的自动扩缩容。 单击“确定”完成添加队列配置。弹性资源池队列添加完成后,可以参考调整弹性资源池中队列的扩缩容策略查看弹性资源池添加的所有队列配置和策略信息。
  • 步骤5:在DLI作业开发时使用LakeFormation元数据 DLI对接LakeFormation默认实例且完成LakeFormation的资源授权后,即可以在作业开发时使用LakeFormation元数据。 DLI SQL: LakeFormation SQL语法说明请参考DLI Spark SQL语法参考。 在执行SQL作业时,您可以在控制台选择执行SQL所在的catalog,如图2所示,或在SQL命令中指定catalogName。catalogName是DLI控制台的数据目录映射名。 图2 在SQL编辑器页面选择数据目录 对接LakeFormation实例场景,在创建数据库时需要指定数据库存储的OBS路径。 对接LakeFormation实例场景,在创建表时不支持设置表生命周期和多版本。 对接LakeFormation实例场景,LOAD DATA语句不支持datasource表,且LOAD DATA分区表必须指定分区。 在LakeFormation控制台创建的数据库和表中包含中文字符时,不支持在DLI执行相关数据库和表的操作。 对接LakeFormation实例场景,不支持指定筛选条件删除分区。 对接LakeFormation实例场景,不支持创建Truncate Datasource/Hive外表。 DLI暂不支持使用LakeFormation行过滤条件功能。 DLI读取binary类型的数据进行console展示时,会对binary数据进行Base64转换。 在DLI暂不支持LakeFormation的路径授权。 DLI Spark Jar: 本节介绍在DLI管理控制台提交Spark Jar作业时使用LakeFormation元数据的配置操作。 Spark Jar 示例 SparkSession spark = SparkSession.builder() .enableHiveSupport() .appName("java_spark_demo") .getOrCreate(); spark.sql("show databases").show(); DLI管理控制台Spark Jar作业配置说明 (推荐)方式一:使用控制台提供的参数项(委托、元数据来源等)配置Spark Jar作业访问LakeFormation元数据 新建或编辑Spark Jar作业时,请参考表3Spark Jar作业访问LakeFormation元数据。 表3 配置Spark Jar作业访问LakeFormation元数据 参数 说明 配置示例 Spark版本 Spark 3.3.x及以上版本支持对接LakeFormation。 3.3.1 委托 使用Spark 3.3.1及以上版本的引擎执行作业时,需要您先在 IAM 页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: spark.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 访问元数据 配置开启Spark作业访问元数据功能。 是 元数据来源 配置Spark作业访问的元数据类型。本场景下请选择Lakeformation。 选择该参数后系统将自动为您的作业添加以下配置项用于加载lakeformation相关依赖。 spark.sql.catalogImplementation=hive spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient og // lakeformation相关依赖加载 spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* “元数据来源”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 优先推荐您使用控制台提供的“元数据来源”参数项进行配置。 Lakeformation 数据目录名称 配置Spark作业访问的数据目录名称。 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。如需指定LakeFormation其他实例请参考◦方式二:使用Spark(--conf)参数配置...在Spark(--conf)中配置连接的Lakeformation实例和数据目录。 选择该参数后系统将自动为您的作业添加以下配置项用于连接Lakeformation默认实例下的数据目录。 spark.hadoop.lakecat.catalogname.default=lfcatalog “数据目录名称”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 优先推荐您使用控制台提供的“数据目录名称”参数项进行配置。 - Spark参数(--conf) “元数据来源”和“数据目录名称”均支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 如果您需要配置访问Hudi数据表,可在Spark(--conf)参数中填加以下配置项。 spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider 如果您需要配置访问Delta数据表,可在Spark(--conf)参数中填加以下配置项。 spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension - 方式二:使用Spark(--conf)参数配置Spark Jar作业访问LakeFormation元数据 新建或编辑Spark Jar作业时,请在作业配置页面的Spark(--conf)参数中按需配置以下信息以访问LakeFormation元数据。 spark.sql.catalogImplementation=hive spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension //支持hudi,可选 spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider //支持hudi,可选 // 使用有OBS和lakeformation权限的委托访问,建议用户设置最小权限集 spark.dli.job.agency.name=agencyForLakeformation //需要访问的lakeformation实例ID,在lakeformation console查看。可选,如不填写访问Lakeformation的默认实例 spark.hadoop.lakeformation.instance.id=xxx //需要访问的lakeformation侧的CATA LOG 名称,在lakeformation console查看。可选,如不填写则默认值为hive spark.hadoop.lakecat.catalogname.default=lfcatalog // lakeformation相关依赖加载 spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* DLI Flink OpenSource SQL 示例1:委托的方式对接Lakeformation 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表4 表4 Flink OpenSource SQL示例中关于Catalog的参数说明 参数 说明 是否必填 参数值 type catalog类型 是 固定值hive hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf default-database 默认数据库名称 否 默认default库 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 CREATE CATALOG hive WITH ( 'type' = 'hive', 'hive-conf-dir' = '/opt/flink/conf', -- 固定配置/opt/flink/conf 'default-database'='default' ); USE CATALOG hive; CREATE TABLE IF NOT EXISTS dataGenSource612 (user_id string, amount int) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '3' ); CREATE table IF NOT EXISTS printSink612 (user_id string, amount int) WITH ('connector' = 'print'); INSERT INTO printSink612 SELECT * FROM dataGenSource612; 示例2:DEW的方式对接Lakeformation 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表5 需要指定properties.catalog.lakeformation.auth.identity.util.class参数值为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator,并且配置dew相关配置。 表5 Flink OpenSource SQL示例中关于Catalog的参数说明(DEW方式) 参数 说明 是否必填 参数值 type catalog类型 是 固定值hive hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf default-database 默认数据库名称 否 不填默认default库 properties.catalog.lakecat.auth.identity.util.class 认证信息获取类 是 dew方式必填,固定配置为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator properties.catalog.dew.projectId DEW所在的项目ID, 默认是Flink作业所在的项目ID。 是 使用dew方式必填 properties.catalog.dew.endpoint 指定要使用的DEW服务所在的endpoint信息。 是 使用dew方式必填。 配置示例:kms.xxx.com properties.catalog.dew.csms.secretName 在DEW服务的凭据管理中新建的通用凭据的名称。 是 使用dew方式必填 properties.catalog.dew.csms.version 在DEW服务的凭据管理中新建的通用凭据的版本号。 是 使用dew方式必填 properties.catalog.dew.access.key 在DEW服务的凭据中配置access.key值对应的key 是 使用dew方式必填 properties.catalog.dew.secret.key 在DEW服务的凭据中配置secret.key值对应的key 是 使用dew方式必填 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 CREATE CATALOG myhive WITH ( 'type' = 'hive', 'hive-conf-dir' = '/opt/flink/conf', 'default-database'='default', --下边是dew相关配置,请根据实际情况修改参数值 'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator', 'properties.catalog.dew.endpoint'='kms.xxx.com', 'properties.catalog.dew.csms.secretName'='obsAksK', 'properties.catalog.dew.access.key' = 'myak', 'properties.catalog.dew.secret.key' = 'mysk', 'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxx', 'properties.catalog.dew.csms.version'='v9' ); USE CATALOG myhive; create table IF NOT EXISTS dataGenSource_dew612( user_id string, amount int ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '3' ); create table IF NOT EXISTS printSink_dew612( user_id string, amount int ) with ( 'connector' = 'print' ); insert into printSink_dew612 select * from dataGenSource_dew612; 示例3:委托的方式对接Lakeformation写hudi表 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表6。 表6 hudi类型Catalog参数说明 参数 说明 是否必填 参数值 type catalog类型 是 hudi表配置为hudi。 hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf。 default-database 默认数据库名称 否 默认default库。 mode 取值'hms' 或 'non-hms'。 'hms' 表示创建的 Hudi Catalog 会使用 Hive Metastore 存储元数据信息。 'non-hms'表示不使用Hive Metastore存储元数据信息。 是 固定值hms。 表7 hudi类型sink表的connector参数 参数 说明 是否必填 参数值 connector flink connector类型。 配置为hudi表示sink表是hudi表。 是 hudi path 表的基本路径。如果该路径不存在,则会创建它。 是 请参考示例代码中的配置值。 hoodie.datasource.write.recordkey.field hoodie表的唯一键字段名 否 这里配置order_id为唯一键。 EXTERNAL 是否外表 是 hudi表必填,且设置为true true CREATE CATALOG hive_catalog WITH ( 'type'='hive', 'hive-conf-dir' = '/opt/flink/conf', 'default-database'='test' ); USE CATALOG hive_catalog; create table if not exists genSource618 ( order_id STRING, order_name STRING, price INT, weight INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.order_id.kind' = 'random', 'fields.order_id.length' = '8', 'fields.order_name.kind' = 'random', 'fields.order_name.length' = '5' ); CREATE CATALOG hoodie_catalog WITH ( 'type'='hudi', 'hive.conf.dir' = '/opt/flink/conf', 'mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence ); CREATE TABLE if not exists hoodie_catalog.`test`.`hudiSink618` ( `order_id` STRING PRIMARY KEY NOT ENFORCED, `order_name` STRING, `price` INT, `weight` INT, `create_time` BIGINT, `create_date` String ) PARTITIONED BY (create_date) WITH ( 'connector' = 'hudi', 'path' = 'obs://xxx/catalog/dbtest3/hudiSink618', 'hoodie.datasource.write.recordkey.field' = 'order_id', 'write.precombine.field' = 'create_time', 'EXTERNAL' = 'true' -- must be set ); insert into hoodie_catalog.`test`.`hudiSink618` select order_id, order_name, price, weight, UNIX_TIMESTAMP() as create_time, FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date from genSource618; DLI Flink Jar 示例1:委托方式对接Lakeformation 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Print结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 package com.huawei.test; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; @SuppressWarnings({"deprecation", "rawtypes", "unchecked"}) public class GenToPrintTaskAgency { private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "180000000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" + " 'type' = 'hive',\n" + " 'hive-conf-dir' = '/opt/hadoop/conf'\n" + " );"; tEnv.executeSql(createCatalog); String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJar618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.user_id.kind' = 'random',\n" + " 'fields.user_id.length' = '3'\n" + ")"; /*testdb是用户自定义的数数据库*/ tEnv.executeSql(dataSource); String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJar618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH ('connector' = 'print')"; tEnv.executeSql(printSink); /*testdb是用户自定义的数数据库*/ String query = "insert into lf_catalog.`test`.`printSinkJar618_1` " + "select * from lf_catalog.`test`.`dataGenSourceJar618_1`"; tEnv.executeSql(query); } } 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例2:DEW方式对接Lakeformation 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Print结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 package com.huawei.test; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; @SuppressWarnings({"deprecation", "rawtypes", "unchecked"}) public class GenToPrintTaskDew { private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "180000000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" + " 'type' = 'hive',\n" + " 'hive-conf-dir' = '/opt/hadoop/conf',\n" + " 'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',\n" + " 'properties.catalog.dew.endpoint'='kms.xxx.xxx.com',\n" + " 'properties.catalog.dew.csms.secretName'='obsAksK',\n" + " 'properties.catalog.dew.access.key' = 'ak',\n" + " 'properties.catalog.dew.secret.key' = 'sk',\n" + " 'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxxx',\n" + " 'properties.catalog.dew.csms.version'='v9'\n" + " );"; tEnv.executeSql(createCatalog); String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJarDew618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.user_id.kind' = 'random',\n" + " 'fields.user_id.length' = '3'\n" + ")"; tEnv.executeSql(dataSource); /*testdb是用户自定义的数数据库*/ String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJarDew618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH ('connector' = 'print')"; tEnv.executeSql(printSink); /*testdb是用户自定义的数数据库*/ String query = "insert into lf_catalog.`test`.`printSinkJarDew618_1` " + "select * from lf_catalog.`test`.`dataGenSourceJarDew618_1`"; tEnv.executeSql(query); } } 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例3:Flink jar支持Hudi表 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Hudi结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 package com.huawei.test; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.SimpleDateFormat; public class GenToHudiTask4 { private static final Logger LOGGER = LoggerFactory.getLogger(GenToHudiTask4.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) throws IOException { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "30000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://xxx/jobs/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String catalog = "CREATE CATALOG hoodie_catalog\n" + " WITH (\n" + " 'type'='hudi',\n" + " 'hive.conf.dir' = '/opt/hadoop/conf',\n" + " 'mode'='hms'\n" + " )"; tEnv.executeSql(catalog); String dwsSource = "CREATE TABLE if not exists genSourceJarForHudi618_1 (\n" + " order_id STRING,\n" + " order_name STRING,\n" + " price INT,\n" + " weight INT\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.order_id.kind' = 'random',\n" + " 'fields.order_id.length' = '8',\n" + " 'fields.order_name.kind' = 'random',\n" + " 'fields.order_name.length' = '8'\n" + ")"; tEnv.executeSql(dwsSource); /*testdb是用户自定义的数数据库*/ String printSinkdws = "CREATE TABLE if not exists hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` (\n" + " order_id STRING PRIMARY KEY NOT ENFORCED,\n" + " order_name STRING,\n" + " price INT,\n" + " weight INT,\n" + " create_time BIGINT,\n" + " create_date String\n" + ") WITH (" + "'connector' = 'hudi',\n" + "'path' = 'obs://xxx/catalog/dbtest3/hudiSinkJarHudi618_1',\n" + "'hoodie.datasource.write.recordkey.field' = 'order_id',\n" + "'EXTERNAL' = 'true'\n" + ")"; tEnv.executeSql(printSinkdws); /*testdb是用户自定义的数数据库*/ String query = "insert into hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` select\n" + " order_id,\n" + " order_name,\n" + " price,\n" + " weight,\n" + " UNIX_TIMESTAMP() as create_time,\n" + " FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date\n" + " from genSourceJarForHudi618_1"; tEnv.executeSql(query); } } 表8 hudi类型sink表的connector参数 参数 说明 是否必填 参数值 connector flink connector类型。 配置为hudi表示sink表是hudi表。 是 hudi path 表的基本路径。如果该路径不存在,则会创建它。 是 请参考示例代码中的配置值。 hoodie.datasource.write.recordkey.field hoodie表的唯一键字段名 否 这里配置order_id为唯一键。 EXTERNAL 是否外表 是 hudi表必填,且设置为true true 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 -
  • 步骤3:在DLI管理控制台创建数据目录 在DLI管理控制台需要创建到Catalog的连接,才可以访问LakeFormation实例中存储的Catalog。 DLI仅支持对接LakeFormation默认实例,请在LakeFormation设置实例为默认实例。 LakeFormation中每一个数据目录只能创建一个映射,不能创建多个。 例如用户在DLI创建了映射名catalogMapping1对应LakeFormation数据目录:catalogA。创建成功后,在同一个项目空间下,不能再创建到catalogA的映射。 登录DLI管理控制台。 选择“SQL编辑器 ”。 在SQL编辑器页面,选择“数据目录”。 单击创建数据目录。 配置数据目录相关信息。 表2 数据目录配置信息 参数名称 是否必填 说明 外部数据目录名称 是 LakeFormation默认实例下的Catalog名称。 类型 是 当前只支持LakeFormation。 该选项已固定,无需填写。 数据目录映射名称 是 在DLI使用的Catalog映射名,用户在执行SQL语句的时候需要指定Catalog映射,以此来标识访问的外部的元数据。建议与外部数据目录名称保持一致。 当前仅支持连接LakeFormation默认实例的数据目录。 描述 否 自定义数据目录的描述信息。 单击“确定”创建数据目录。
  • 步骤1:创建LakeFormation实例用于元数据存储 LakeFormation实例为元数据的管理提供基础资源,DLI仅支持对接LakeFormation的默认实例。 创建实例 登录LakeFormation管理控制台。 单击页面右上角“立即购买”或“购买实例”,进入实例购买页面。 首次创建实例时界面显示“立即购买”,如果界面已有LakeFormation实例则显示为“购买实例”。 按需配置LakeFormation实例参数,完成实例创建。 本例创建按需计费的共享型实例。 更多参数配置及说明,请参考创建LakeFormation实例。 设置实例为默认实例 查看实例“基本信息”中“是否为默认实例”的参数值。 “true”表示当前实例为默认实例。 “false”表示当前实例不为默认实例。 如果需要设置当前实例为默认实例,请单击页面右上角“设为默认实例”。 勾选操作影响后单击“确定”,将当前实例设置为默认实例。 当前DLI仅对接LakeFormation默认实例,变更默认实例后,可能对使用LakeFormation的周边服务产生影响,请谨慎操作。
  • 约束限制 在表1中提供了支持对接LakeFormation获取元数据的队列和引擎类型。 查看队列的引擎类型和版本请参考查看队列的基本信息。 表1 LakeFormation获取元数据的队列和引擎类型 队列类型 引擎类型和支持的版本 default队列 Spark 3.3.x:支持对接LakeFormation获取元数据的队列和引擎。 HetuEngine 2.1.0:支持对接LakeFormation获取元数据的队列和引擎。 SQL队列 Spark 3.3.x:支持对接LakeFormation获取元数据的队列和引擎。 HetuEngine 2.1.0:支持对接LakeFormation获取元数据的队列和引擎。 通用队列 Flink作业场景:Flink 1.15及以上版本且使用弹性资源池队列时支持对接LakeFormation获取元数据。 DLI仅支持对接LakeFormation默认实例,请在LakeFormation设置实例为默认实例。 DLI支持读取Lakeformation的中Avro、Json、Parquet、Csv、Orc、Text、Hudi格式的数据。 LakeFormation数据目录中的库、表权限统一由LakeFormation管理。 DLI支持对接LakeFormation后,DLI原始库表下移至dli的数据目录下。