云服务器内容精选
-
步骤5:在 DLI 作业开发时使用LakeFormation元数据 DLI对接LakeFormation默认实例且完成LakeFormation的资源授权后,即可以在作业开发时使用LakeFormation元数据。 DLI SQL: LakeFormation SQL语法说明请参考DLI Spark SQL语法参考。 在执行SQL作业时,您可以在控制台选择执行SQL所在的catalog,如图2所示,或在SQL命令中指定catalogName。catalogName是DLI控制台的数据目录映射名。 图2 在SQL编辑器页面选择数据目录 对接LakeFormation实例场景,在创建数据库时需要指定数据库存储的OBS路径。 对接LakeFormation实例场景,在创建表时不支持设置表生命周期和多版本。 对接LakeFormation实例场景,LOAD DATA语句不支持datasource表,且LOAD DATA分区表必须指定分区。 在LakeFormation控制台创建的数据库和表中包含中文字符时,不支持在DLI执行相关数据库和表的操作。 对接LakeFormation实例场景,不支持指定筛选条件删除分区。 对接LakeFormation实例场景,不支持创建Truncate Datasource/Hive外表。 DLI暂不支持使用LakeFormation行过滤条件功能。 DLI读取binary类型的数据进行console展示时,会对binary数据进行Base64转换。 在DLI暂不支持LakeFormation的路径授权。 DLI Spark Jar: 本节介绍在DLI管理控制台提交Spark Jar作业时使用LakeFormation元数据的配置操作。 Spark Jar 示例 SparkSession spark = SparkSession.builder() .enableHiveSupport() .appName("java_spark_demo") .getOrCreate(); spark.sql("show databases").show(); DLI管理控制台Spark Jar作业配置说明 (推荐)方式一:使用控制台提供的参数项(委托、元数据来源等)配置Spark Jar作业访问LakeFormation元数据 新建或编辑Spark Jar作业时,请参考表3Spark Jar作业访问LakeFormation元数据。 表3 配置Spark Jar作业访问LakeFormation元数据 参数 说明 配置示例 Spark版本 Spark 3.3.x及以上版本支持对接LakeFormation。 3.3.1 委托 使用Spark 3.3.1及以上版本的引擎执行作业时,需要您先在 IAM 页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: spark.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 访问元数据 配置开启Spark作业访问元数据功能。 是 元数据来源 配置Spark作业访问的元数据类型。本场景下请选择Lakeformation。 选择该参数后系统将自动为您的作业添加以下配置项用于加载lakeformation相关依赖。 spark.sql.catalogImplementation=hive spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient og // lakeformation相关依赖加载 spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* “元数据来源”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 优先推荐您使用控制台提供的“元数据来源”参数项进行配置。 Lakeformation 数据目录名称 配置Spark作业访问的数据目录名称。 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。如需指定LakeFormation其他实例请参考◦方式二:使用Spark(--conf)参数配置...在Spark(--conf)中配置连接的Lakeformation实例和数据目录。 选择该参数后系统将自动为您的作业添加以下配置项用于连接Lakeformation默认实例下的数据目录。 spark.hadoop.lakecat.catalogname.default=lfcatalog “数据目录名称”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 优先推荐您使用控制台提供的“数据目录名称”参数项进行配置。 - Spark(--conf) 如果您需要配置访问Hudi数据表,可在Spark(--conf)参数中填加以下配置项。 spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider “元数据来源”和“数据目录名称”均支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 - 方式二:使用Spark(--conf)参数配置Spark Jar作业访问LakeFormation元数据 新建或编辑Spark Jar作业时,请在作业配置页面的Spark(--conf)参数中按需配置以下信息以访问LakeFormation元数据。 spark.sql.catalogImplementation=hive spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension //支持hudi,可选 spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider //支持hudi,可选 // 使用有OBS和lakeformation权限的委托访问,建议用户设置最小权限集 spark.dli.job.agency.name=agencyForLakeformation //需要访问的lakeformation实例ID,在lakeformation console查看。可选,如不填写访问Lakeformation的默认实例 spark.hadoop.lakeformation.instance.id=xxx //需要访问的lakeformation侧的CATA LOG 名称,在lakeformation console查看。可选,如不填写则默认值为hive spark.hadoop.lakecat.catalogname.default=lfcatalog // lakeformation相关依赖加载 spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* DLI Flink OpenSource SQL 示例1:委托的方式对接Lakeformation 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表4 表4 Flink OpenSource SQL示例中关于Catalog的参数说明 参数 说明 是否必填 参数值 type catalog类型 是 固定值hive hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf default-database 默认数据库名称 否 默认default库 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 CREATE CATALOG hive WITH ( 'type' = 'hive', 'hive-conf-dir' = '/opt/flink/conf', -- 固定配置/opt/flink/conf 'default-database'='default' ); USE CATALOG hive; CREATE TABLE IF NOT EXISTS dataGenSource612 (user_id string, amount int) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '3' ); CREATE table IF NOT EXISTS printSink612 (user_id string, amount int) WITH ('connector' = 'print'); INSERT INTO printSink612 SELECT * FROM dataGenSource612; 示例2:DEW的方式对接Lakeformation 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表5 需要指定properties.catalog.lakeformation.auth.identity.util.class参数值为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator,并且配置dew相关配置。 表5 Flink OpenSource SQL示例中关于Catalog的参数说明(DEW方式) 参数 说明 是否必填 参数值 type catalog类型 是 固定值hive hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf default-database 默认数据库名称 否 不填默认default库 properties.catalog.lakecat.auth.identity.util.class 认证信息获取类 是 dew方式必填,固定配置为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator properties.catalog.dew.projectId DEW所在的项目ID, 默认是Flink作业所在的项目ID。 是 使用dew方式必填 properties.catalog.dew.endpoint 指定要使用的DEW服务所在的endpoint信息。 是 使用dew方式必填。 配置示例:kms.xxx.com properties.catalog.dew.csms.secretName 在DEW服务的凭据管理中新建的通用凭据的名称。 是 使用dew方式必填 properties.catalog.dew.csms.version 在DEW服务的凭据管理中新建的通用凭据的版本号。 是 使用dew方式必填 properties.catalog.dew.access.key 在DEW服务的凭据中配置access.key值对应的key 是 使用dew方式必填 properties.catalog.dew.secret.key 在DEW服务的凭据中配置secret.key值对应的key 是 使用dew方式必填 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 CREATE CATALOG myhive WITH ( 'type' = 'hive', 'hive-conf-dir' = '/opt/flink/conf', 'default-database'='default', --下边是dew相关配置,请根据实际情况修改参数值 'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator', 'properties.catalog.dew.endpoint'='kms.xxx.com', 'properties.catalog.dew.csms.secretName'='obsAksK', 'properties.catalog.dew.access.key' = 'myak', 'properties.catalog.dew.secret.key' = 'mysk', 'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxx', 'properties.catalog.dew.csms.version'='v9' ); USE CATALOG myhive; create table IF NOT EXISTS dataGenSource_dew612( user_id string, amount int ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '3' ); create table IF NOT EXISTS printSink_dew612( user_id string, amount int ) with ( 'connector' = 'print' ); insert into printSink_dew612 select * from dataGenSource_dew612; 示例3:委托的方式对接Lakeformation写hudi表 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表6。 表6 hudi类型Catalog参数说明 参数 说明 是否必填 参数值 type catalog类型 是 hudi表配置为hudi。 hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf。 default-database 默认数据库名称 否 默认default库。 mode 取值'hms' 或 'non-hms'。 'hms' 表示创建的 Hudi Catalog 会使用 Hive Metastore 存储元数据信息。 'non-hms'表示不使用Hive Metastore存储元数据信息。 是 固定值hms。 表7 hudi类型sink表的connector参数 参数 说明 是否必填 参数值 connector flink connector类型。 配置为hudi表示sink表是hudi表。 是 hudi path 表的基本路径。如果该路径不存在,则会创建它。 是 请参考示例代码中的配置值。 hoodie.datasource.write.recordkey.field hoodie表的唯一键字段名 否 这里配置order_id为唯一键。 EXTERNAL 是否外表 是 hudi表必填,且设置为true true CREATE CATALOG hive_catalog WITH ( 'type'='hive', 'hive-conf-dir' = '/opt/flink/conf', 'default-database'='test' ); USE CATALOG hive_catalog; create table if not exists genSource618 ( order_id STRING, order_name STRING, price INT, weight INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.order_id.kind' = 'random', 'fields.order_id.length' = '8', 'fields.order_name.kind' = 'random', 'fields.order_name.length' = '5' ); CREATE CATALOG hoodie_catalog WITH ( 'type'='hudi', 'hive.conf.dir' = '/opt/flink/conf', 'mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence ); CREATE TABLE if not exists hoodie_catalog.`test`.`hudiSink618` ( `order_id` STRING PRIMARY KEY NOT ENFORCED, `order_name` STRING, `price` INT, `weight` INT, `create_time` BIGINT, `create_date` String ) PARTITIONED BY (create_date) WITH ( 'connector' = 'hudi', 'path' = 'obs://xxx/catalog/dbtest3/hudiSink618', 'hoodie.datasource.write.recordkey.field' = 'order_id', 'write.precombine.field' = 'create_time', 'EXTERNAL' = 'true' -- must be set ); insert into hoodie_catalog.`test`.`hudiSink618` select order_id, order_name, price, weight, UNIX_TIMESTAMP() as create_time, FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date from genSource618; DLI Flink Jar 示例1:委托方式对接Lakeformation 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Print结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 package com.huawei.test; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; @SuppressWarnings({"deprecation", "rawtypes", "unchecked"}) public class GenToPrintTaskAgency { private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "180000000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" + " 'type' = 'hive',\n" + " 'hive-conf-dir' = '/opt/hadoop/conf'\n" + " );"; tEnv.executeSql(createCatalog); String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJar618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.user_id.kind' = 'random',\n" + " 'fields.user_id.length' = '3'\n" + ")"; /*testdb是用户自定义的数数据库*/ tEnv.executeSql(dataSource); String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJar618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH ('connector' = 'print')"; tEnv.executeSql(printSink); /*testdb是用户自定义的数数据库*/ String query = "insert into lf_catalog.`test`.`printSinkJar618_1` " + "select * from lf_catalog.`test`.`dataGenSourceJar618_1`"; tEnv.executeSql(query); } } 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例2:DEW方式对接Lakeformation 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Print结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 package com.huawei.test; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; @SuppressWarnings({"deprecation", "rawtypes", "unchecked"}) public class GenToPrintTaskDew { private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "180000000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" + " 'type' = 'hive',\n" + " 'hive-conf-dir' = '/opt/hadoop/conf',\n" + " 'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',\n" + " 'properties.catalog.dew.endpoint'='kms.xxx.xxx.com',\n" + " 'properties.catalog.dew.csms.secretName'='obsAksK',\n" + " 'properties.catalog.dew.access.key' = 'ak',\n" + " 'properties.catalog.dew.secret.key' = 'sk',\n" + " 'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxxx',\n" + " 'properties.catalog.dew.csms.version'='v9'\n" + " );"; tEnv.executeSql(createCatalog); String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJarDew618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.user_id.kind' = 'random',\n" + " 'fields.user_id.length' = '3'\n" + ")"; tEnv.executeSql(dataSource); /*testdb是用户自定义的数数据库*/ String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJarDew618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH ('connector' = 'print')"; tEnv.executeSql(printSink); /*testdb是用户自定义的数数据库*/ String query = "insert into lf_catalog.`test`.`printSinkJarDew618_1` " + "select * from lf_catalog.`test`.`dataGenSourceJarDew618_1`"; tEnv.executeSql(query); } } 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例3:Flink jar支持Hudi表 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Hudi结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 package com.huawei.test; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.SimpleDateFormat; public class GenToHudiTask4 { private static final Logger LOGGER = LoggerFactory.getLogger(GenToHudiTask4.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) throws IOException { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "30000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://xxx/jobs/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String catalog = "CREATE CATALOG hoodie_catalog\n" + " WITH (\n" + " 'type'='hudi',\n" + " 'hive.conf.dir' = '/opt/hadoop/conf',\n" + " 'mode'='hms'\n" + " )"; tEnv.executeSql(catalog); String dwsSource = "CREATE TABLE if not exists genSourceJarForHudi618_1 (\n" + " order_id STRING,\n" + " order_name STRING,\n" + " price INT,\n" + " weight INT\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.order_id.kind' = 'random',\n" + " 'fields.order_id.length' = '8',\n" + " 'fields.order_name.kind' = 'random',\n" + " 'fields.order_name.length' = '8'\n" + ")"; tEnv.executeSql(dwsSource); /*testdb是用户自定义的数数据库*/ String printSinkdws = "CREATE TABLE if not exists hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` (\n" + " order_id STRING PRIMARY KEY NOT ENFORCED,\n" + " order_name STRING,\n" + " price INT,\n" + " weight INT,\n" + " create_time BIGINT,\n" + " create_date String\n" + ") WITH (" + "'connector' = 'hudi',\n" + "'path' = 'obs://xxx/catalog/dbtest3/hudiSinkJarHudi618_1',\n" + "'hoodie.datasource.write.recordkey.field' = 'order_id',\n" + "'EXTERNAL' = 'true'\n" + ")"; tEnv.executeSql(printSinkdws); /*testdb是用户自定义的数数据库*/ String query = "insert into hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` select\n" + " order_id,\n" + " order_name,\n" + " price,\n" + " weight,\n" + " UNIX_TIMESTAMP() as create_time,\n" + " FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date\n" + " from genSourceJarForHudi618_1"; tEnv.executeSql(query); } } 表8 hudi类型sink表的connector参数 参数 说明 是否必填 参数值 connector flink connector类型。 配置为hudi表示sink表是hudi表。 是 hudi path 表的基本路径。如果该路径不存在,则会创建它。 是 请参考示例代码中的配置值。 hoodie.datasource.write.recordkey.field hoodie表的唯一键字段名 否 这里配置order_id为唯一键。 EXTERNAL 是否外表 是 hudi表必填,且设置为true true 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 -
-
步骤4:授权使用LakeFormation资源 SQL作业场景 在进行SQL作业提交之前,需完成LakeFormation元数据、数据库、表、列和函数等资源授权,确保作业在执行过程中能够顺利访问所需的数据和资源。LakeFormation SQL资源权限支持列表提供了LakeFormation权限支持列表。 使用LakeFormation资源需要分别完成LakeFormation的IAM细粒度授权和LakeFormation SQL资源授权。 LakeFormation的IAM细粒度授权:授权使用LakeFormation API。 IAM服务通常提供了管理用户、组和角色的访问权限的方式。您可以在IAM控制台中创建策略(Policy),定义哪些用户或角色可以调用LakeFormation的API。然后,将这些策略附加到相应的用户或角色上。 方法1:基于角色授权: 即IAM最初提供的一种根据用户的工作职能定义权限的粗粒度授权机制。该机制以服务为粒度,提供有限的服务相关角色用于授权。 例如参考LakeFormation权限管理授予用户只读权限,允许查询LakeFormation相关元数据资源的权限。 或如下示例授予LakeFormation相关元数据资源的所有操作权限。 示例: { "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "lakeformation:table:*", "lakeformation:database:*", "lakeformation:catalog:*", "lakeformation:function:*", "lakeformation:transaction:*", "lakeformation:policy:describe", "lakeformation:credential:describe" ] } ] } 方法2:基于策略的精细化授权: IAM提供的细粒度授权的能力,可以精确到具体服务的操作、资源以及请求条件等。 LakeFormation权限策略请参考LakeFormation权限和授权项。 IAM授权的具体操作请参考创建用户并授权使用LakeFormation。 LakeFormation SQL资源授权:授权使用LakeFormation具体资源(元数据、数据库、表、列和函数等)。 LakeFormation资源授权是指允许用户对特定资源的访问的权限,以此来控制对LakeFormation的数据和元数据的访问。 LakeFormation资源授权有两种方式: 方式一:在LakeFormation管理控制台对资源授权。 具体操作请参考LakeFormation用户指南中的新增授权。 了解LakeFormation SQL资源权限请参考数据权限概述。 方式二:在DLI管理控制台使用GRANT SQL语句授权 GRANT语句是SQL语言中用于授权的一种方式。 您可以使用GRANT语句来授予用户或角色对数据库、表、列、函数等的访问权限。 LakeFormation SQL资源权限支持列表提供了LakeFormation资源授权的策略。 Catalog资源暂时不支持在DLI SQL授权,请参考▪方式一:在LakeFormation管理控制台...在LakeFormation 管理控制台完成授权。 Spark Jar、Flink OpenSource SQL、Flink Jar作业场景: 方式1:使用委托授权:使用Spark 3.3.1及以上版本、Flink 1.15版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在配置作业时添加新建的委托信息。 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 方式2:使用DEW授权: 已为授予IAM用户所需的IAM和Lakeformation权限,具体请参考•SQL作业场景的IAM授权的操作步骤。 已在DEW服务创建通用凭证,并存入凭据值。具体操作请参考创建通用凭据。 已创建DLI访问DEW的委托并完成委托授权。该委托需具备以下权限: DEW中的查询凭据的版本与凭据值ShowSecretVersion接口权限,csms:secretVersion:get。 DEW中的查询凭据的版本列表ListSecretVersions接口权限,csms:secretVersion:list。 DEW解密凭据的权限,kms:dek:decrypt。 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。
-
步骤3:在DLI管理控制台创建数据目录 在DLI管理控制台需要创建到Catalog的连接,才可以访问LakeFormation实例中存储的Catalog。 DLI仅支持对接LakeFormation默认实例,请在LakeFormation设置实例为默认实例。 LakeFormation中每一个数据目录只能创建一个映射,不能创建多个。 例如用户在DLI创建了映射名catalogMapping1对应LakeFormation数据目录:catalogA。创建成功后,在同一个项目空间下,不能再创建到catalogA的映射。 登录DLI管理控制台。 选择“SQL编辑器 ”。 在SQL编辑器页面,选择“数据目录”。 单击创建数据目录。 配置数据目录相关信息。 表2 数据目录配置信息 参数名称 是否必填 说明 外部数据目录名称 是 LakeFormation默认实例下的Catalog名称。 类型 是 当前只支持LakeFormation。 该选项已固定,无需填写。 数据目录映射名称 是 在DLI使用的Catalog映射名,用户在执行SQL语句的时候需要指定Catalog映射,以此来标识访问的外部的元数据。建议与外部数据目录名称保持一致。 当前仅支持连接LakeFormation默认实例的数据目录。 描述 否 自定义数据目录的描述信息。 单击“确定”创建数据目录。
-
步骤1:创建LakeFormation实例用于元数据存储 LakeFormation实例为元数据的管理提供基础资源,DLI仅支持对接LakeFormation的默认实例。 创建实例 登录LakeFormation管理控制台。 单击页面右上角“立即购买”或“购买实例”,进入实例购买页面。 首次创建实例时界面显示“立即购买”,如果界面已有LakeFormation实例则显示为“购买实例”。 按需配置LakeFormation实例参数,完成实例创建。 本例创建按需计费的共享型实例。 更多参数配置及说明,请参考创建LakeFormation实例。 设置实例为默认实例 查看实例“基本信息”中“是否为默认实例”的参数值。 “true”表示当前实例为默认实例。 “false”表示当前实例不为默认实例。 如果需要设置当前实例为默认实例,请单击页面右上角“设为默认实例”。 勾选操作影响后单击“确定”,将当前实例设置为默认实例。 当前DLI仅对接LakeFormation默认实例,变更默认实例后,可能对使用LakeFormation的周边服务产生影响,请谨慎操作。
-
约束限制 在表1中提供了支持对接LakeFormation获取元数据的队列和引擎类型。 查看队列的引擎类型和版本请参考查看队列的基本信息。 表1 LakeFormation获取元数据的队列和引擎类型 队列类型 引擎类型和支持的版本 default队列 Spark 3.3.x:支持对接LakeFormation获取元数据的队列和引擎。 HetuEngine 2.1.0:支持对接LakeFormation获取元数据的队列和引擎。 SQL队列 Spark 3.3.x:支持对接LakeFormation获取元数据的队列和引擎。 HetuEngine 2.1.0:支持对接LakeFormation获取元数据的队列和引擎。 通用队列 Flink作业场景:Flink 1.15及以上版本且使用弹性资源池队列时支持对接LakeFormation获取元数据。 DLI仅支持对接LakeFormation默认实例,请在LakeFormation设置实例为默认实例。 DLI支持读取Lakeformation的中Avro、Json、Parquet、Csv、Orc、Text、Hudi格式的数据。 LakeFormation数据目录中的库、表权限统一由LakeFormation管理。 DLI支持对接LakeFormation后,DLI原始库表下移至dli的数据目录下。
-
Lakeformation权限策略 表2 Lakeformation权限策略 类型 SQL语句 元数据IAM鉴权权限 SQL资源鉴权权限 DDL语句 ALTER DATABASE database:describe database:alter database:DESCRIBE database:ALTER ALTER TABLE database:describe table:describe table:alter database:create database:DESCRIBE table:DESCRIBE table:ALTER database:CREATE_TABLE column:SELECT或table:SELECT ALTER VIEW database:describe table:describe table:alter database:DESCRIBE table:DESCRIBE column:SELECT table:ALTER CREATE DATABASE database:describe database:create database:DESCRIBE catalog:CREATE_DATABASE CREATE OR REPLACE FUNCTION (CREATE) database:describe function:create database:DESCRIBE database:CREATE_FUNC CREATE OR REPLACE FUNCTION (REPLACE) database:describe function:describe function:alter database:CREATE_FUNC database:DESCRIBE function:DESCRIBE function:ALTER CREATE TABLE database:describe table:describe table:create database:DESCRIBE database:CREATE_TABLE CREATE VIEW database:describe table:describe table:drop table:create database:CREATE_TABLE table:DESCRIBE(source\targer) table:DROP(target) column:SELECT DROP DATABASE database:describe database:drop database:DESCRIBE database:DROP DROP FUNCTION database:describe function:describe function:drop database:DESCRIBE function:DESCRIBE function:DROP DROP TABLE database:describe table:describe credential:describe table:drop database:DESCRIBE table:DESCRIBE table:DROP DROP VIEW database:describe table:describe table:drop database:DESCRIBE table:DESCRIBE(target\source) table:DROP(target) REPAIR TABLE database:describe table:describe credential:describe table:alter database:DESCRIBE table:DESCRIBE table:ALTER table:SELECT TRUNCATE TABLE database:describe table:describe table:alter database:DESCRIBE table:DESCRIBE table:SELECT table:UPDATE DML语句 INSERT TABLE database:describe table:describe table:alter credential:describe database:DESCRIBE table:DESCRIBE table:ALTER table:INSERT column:SELECT或table:SELECT LOAD DATA database:describe table:describe credential:describe database:DESCRIBE table:DESCRIBE table:UPDATE table:ALTER table:SELECT DR语句 SELECT database:describe table:describe credential:describe database:DESCRIBE table:DESCRIBE column:SELECT EXPLAIN 取决于执行sql 取决于执行sql Auxiliary 语句 ANALYZE TABLE database:describe table:describe credential:describe table:alter database:DESCRIBE table:DESCRIBE table:SELECT table:ALTER DESCRIBE DATABASE database:describe database:DESCRIBE DESCRIBE FUNCTION database:describe function:describe database:DESCRIBE function:DESCRIBE DESCRIBE QUERY database:describe table:describe database:DESCRIBE table:DESCRIBE table:SELECT DESCRIBE TABLE database:describe table:describe database:DESCRIBE table:DESCRIBE REFRESH TABLE database:describe table:describe credential:describe database:DESCRIBE table:DESCRIBE table:SELECT REFRESH FUNCTION database:describe function:describe database:DESCRIBE function:DESCRIBE SHOW COLUMNS database:describe table:describe database:DESCRIBE table:DESCRIBE SHOW CREATE TABLE database:describe table:describe database:DESCRIBE table:DESCRIBE SHOW DATABASES database:describe catalog:LIST_DATABASE database:DESCRIBE SHOW FUNCTIONS database:describe function:describe database:DESCRIBE SHOW PARTITIONS database:describe table:describe database:DESCRIBE table:DESCRIBE SHOW TABLE EXTENDED database:describe table:describe catalog:LIST_DATABASE database:DESCRIBE table:DESCRIBE database:LIST_TABLE SHOW TABLES database:describe table:describe catalog:LIST_DATABASE database:LIST_TABLE database:DESCRIBE SHOW TBLPROPERTIES database:describe table:describe database:DESCRIBE table:DESCRIBE SHOW VIEWS database:describe table:describe catalog:LIST_DATABASE database:LIST_TABLE database:DESCRIBE
-
LakeFormation SQL资源权限支持列表 DLI 支持SQL资源鉴权的操作列表请参考数据权限列表。 LakeFormation SQL资源权限支持列表请参考表1。 表1 LakeFormation SQL资源权限支持列表 资源类型 权限类型 Database ALL ALTER DROP DESCRIBE LIST_TABLE LIST_FUNC CREATE_TABLE CREATE_FUNC Table/View ALL ALTER DROP DESCRIBE UPDATE INSERT SELECT DELETE Column SELECT Function ALL ALTER DROP DESCRIBE EXEC
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格