数据湖探索 DLI-DLI对接LakeFormation:步骤5:在DLI作业开发时使用LakeFormation元数据
步骤5:在 DLI 作业开发时使用LakeFormation元数据
DLI对接LakeFormation默认实例且完成LakeFormation的资源授权后,即可以在作业开发时使用LakeFormation元数据。
- DLI SQL:
LakeFormation SQL语法说明请参考DLI Spark SQL语法参考。
在执行SQL作业时,您可以在控制台选择执行SQL所在的catalog,如图2所示,或在SQL命令中指定catalogName。catalogName是DLI控制台的数据目录映射名。
- 对接LakeFormation实例场景,在创建数据库时需要指定数据库存储的OBS路径。
- 对接LakeFormation实例场景,在创建表时不支持设置表生命周期和多版本。
- 对接LakeFormation实例场景,LOAD DATA语句不支持datasource表,且LOAD DATA分区表必须指定分区。
- 在LakeFormation控制台创建的数据库和表中包含中文字符时,不支持在DLI执行相关数据库和表的操作。
- 对接LakeFormation实例场景,不支持指定筛选条件删除分区。
- 对接LakeFormation实例场景,不支持创建Truncate Datasource/Hive外表。
- DLI暂不支持使用LakeFormation行过滤条件功能。
- DLI读取binary类型的数据进行console展示时,会对binary数据进行Base64转换。
- 在DLI暂不支持LakeFormation的路径授权。
- DLI Spark Jar:
本节介绍在DLI管理控制台提交Spark Jar作业时使用LakeFormation元数据的配置操作。
- Spark Jar 示例
SparkSession spark = SparkSession.builder() .enableHiveSupport() .appName("java_spark_demo") .getOrCreate();spark.sql("show databases").show();
- DLI管理控制台Spark Jar作业配置说明
- (推荐)方式一:使用控制台提供的参数项(委托、元数据来源等)配置Spark Jar作业访问LakeFormation元数据
新建或编辑Spark Jar作业时,请参考表3Spark Jar作业访问LakeFormation元数据。
表3 配置Spark Jar作业访问LakeFormation元数据 参数
说明
配置示例
Spark版本
Spark 3.3.x及以上版本支持对接LakeFormation。
3.3.1
委托
使用Spark 3.3.1及以上版本的引擎执行作业时,需要您先在 IAM 页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:
spark.dli.job.agency.name=agency
委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。
-
访问元数据
配置开启Spark作业访问元数据功能。
是
元数据来源
配置Spark作业访问的元数据类型。本场景下请选择Lakeformation。
选择该参数后系统将自动为您的作业添加以下配置项用于加载lakeformation相关依赖。
spark.sql.catalogImplementation=hivespark.hadoop.hive-ext.dlcatalog.metastore.client.enable=truespark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClientog// lakeformation相关依赖加载spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
“元数据来源”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。
优先推荐您使用控制台提供的“元数据来源”参数项进行配置。
Lakeformation
数据目录名称
配置Spark作业访问的数据目录名称。
此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。如需指定LakeFormation其他实例请参考◦方式二:使用Spark(--conf)参数配置...在Spark(--conf)中配置连接的Lakeformation实例和数据目录。
选择该参数后系统将自动为您的作业添加以下配置项用于连接Lakeformation默认实例下的数据目录。
spark.hadoop.lakecat.catalogname.default=lfcatalog
“数据目录名称”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。
优先推荐您使用控制台提供的“数据目录名称”参数项进行配置。
-
Spark参数(--conf)
“元数据来源”和“数据目录名称”均支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。
- 如果您需要配置访问Hudi数据表,可在Spark(--conf)参数中填加以下配置项。
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtensionspark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider
- 如果您需要配置访问Delta数据表,可在Spark(--conf)参数中填加以下配置项。
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
-
- 如果您需要配置访问Hudi数据表,可在Spark(--conf)参数中填加以下配置项。
- 方式二:使用Spark(--conf)参数配置Spark Jar作业访问LakeFormation元数据
新建或编辑Spark Jar作业时,请在作业配置页面的Spark(--conf)参数中按需配置以下信息以访问LakeFormation元数据。
spark.sql.catalogImplementation=hivespark.hadoop.hive-ext.dlcatalog.metastore.client.enable=truespark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClientspark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension //支持hudi,可选spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider //支持hudi,可选// 使用有OBS和lakeformation权限的委托访问,建议用户设置最小权限集spark.dli.job.agency.name=agencyForLakeformation//需要访问的lakeformation实例ID,在lakeformation console查看。可选,如不填写访问Lakeformation的默认实例spark.hadoop.lakeformation.instance.id=xxx//需要访问的lakeformation侧的CATA LOG 名称,在lakeformation console查看。可选,如不填写则默认值为hivespark.hadoop.lakecat.catalogname.default=lfcatalog// lakeformation相关依赖加载spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
- (推荐)方式一:使用控制台提供的参数项(委托、元数据来源等)配置Spark Jar作业访问LakeFormation元数据
- Spark Jar 示例
- DLI Flink OpenSource SQL
- 示例1:委托的方式对接Lakeformation
创建Flink OpenSource SQL作业并配置如下参数:
参数
说明
配置示例
Flink版本
Flink 1.15及以上版本支持对接LakeFormation。
1.15
委托
使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:
flink.dli.job.agency.name=agency
委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。
-
开启checkpoint
勾选开启checkpoint。
开启
自定义参数
-
示例中关于Catalog的参数说明请参考表4
表4 Flink OpenSource SQL示例中关于Catalog的参数说明 参数
说明
是否必填
参数值
type
catalog类型
是
固定值hive
hive-conf-dir
hive-conf路径,固定值/opt/flink/conf
是
固定值/opt/flink/conf
default-database
默认数据库名称
否
默认default库
1 2 3 4 5 6 7 8 910111213141516171819202122232425262728293031
CREATE CATALOG hiveWITH ( 'type' = 'hive', 'hive-conf-dir' = '/opt/flink/conf', -- 固定配置/opt/flink/conf 'default-database'='default' );USE CATALOG hive;CREATE TABLE IF NOT EXISTS dataGenSource612 (user_id string, amount int)WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '3' );CREATE table IF NOT EXISTS printSink612 (user_id string, amount int)WITH ('connector' = 'print');INSERT INTO printSink612SELECT *FROM dataGenSource612;
- 示例2:DEW的方式对接Lakeformation
创建Flink OpenSource SQL作业并配置如下参数:
参数
说明
配置示例
Flink版本
Flink 1.15及以上版本支持对接LakeFormation。
1.15
委托
使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:
flink.dli.job.agency.name=agency
委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。
-
开启checkpoint
勾选开启checkpoint。
开启
自定义参数
-
示例中关于Catalog的参数说明请参考表5
需要指定properties.catalog.lakeformation.auth.identity.util.class参数值为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator,并且配置dew相关配置。
表5 Flink OpenSource SQL示例中关于Catalog的参数说明(DEW方式) 参数
说明
是否必填
参数值
type
catalog类型
是
固定值hive
hive-conf-dir
hive-conf路径,固定值/opt/flink/conf
是
固定值/opt/flink/conf
default-database
默认数据库名称
否
不填默认default库
properties.catalog.lakecat.auth.identity.util.class
认证信息获取类
是
dew方式必填,固定配置为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator
properties.catalog.dew.projectId
DEW所在的项目ID, 默认是Flink作业所在的项目ID。
是
使用dew方式必填
properties.catalog.dew.endpoint
指定要使用的DEW服务所在的endpoint信息。
是
使用dew方式必填。
配置示例:kms.xxx.com
properties.catalog.dew.csms.secretName
在DEW服务的凭据管理中新建的通用凭据的名称。
是
使用dew方式必填
properties.catalog.dew.csms.version
在DEW服务的凭据管理中新建的通用凭据的版本号。
是
使用dew方式必填
properties.catalog.dew.access.key
在DEW服务的凭据中配置access.key值对应的key
是
使用dew方式必填
properties.catalog.dew.secret.key
在DEW服务的凭据中配置secret.key值对应的key
是
使用dew方式必填
1 2 3 4 5 6 7 8 9101112131415161718192021222324252627282930313233343536
CREATE CATALOG myhiveWITH ( 'type' = 'hive', 'hive-conf-dir' = '/opt/flink/conf', 'default-database'='default', --下边是dew相关配置,请根据实际情况修改参数值 'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator', 'properties.catalog.dew.endpoint'='kms.xxx.com', 'properties.catalog.dew.csms.secretName'='obsAksK', 'properties.catalog.dew.access.key' = 'myak', 'properties.catalog.dew.secret.key' = 'mysk', 'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxx', 'properties.catalog.dew.csms.version'='v9');USE CATALOG myhive;create table IF NOT EXISTS dataGenSource_dew612( user_id string, amount int) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '3');create table IF NOT EXISTS printSink_dew612( user_id string, amount int) with ( 'connector' = 'print');insert into printSink_dew612 select * from dataGenSource_dew612;
- 示例3:委托的方式对接Lakeformation写hudi表
创建Flink OpenSource SQL作业并配置如下参数:
参数
说明
配置示例
Flink版本
Flink 1.15及以上版本支持对接LakeFormation。
1.15
委托
使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:
flink.dli.job.agency.name=agency
委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。
-
开启checkpoint
勾选开启checkpoint。
开启
自定义参数
-
示例中关于Catalog的参数说明请参考表6。
表6 hudi类型Catalog参数说明 参数
说明
是否必填
参数值
type
catalog类型
是
hudi表配置为hudi。
hive-conf-dir
hive-conf路径,固定值/opt/flink/conf
是
固定值/opt/flink/conf。
default-database
默认数据库名称
否
默认default库。
mode
取值'hms' 或 'non-hms'。
- 'hms' 表示创建的 Hudi Catalog 会使用 Hive Metastore 存储元数据信息。
- 'non-hms'表示不使用Hive Metastore存储元数据信息。
是
固定值hms。
表7 hudi类型sink表的connector参数 参数
说明
是否必填
参数值
connector
flink connector类型。
配置为hudi表示sink表是hudi表。
是
hudi
path
表的基本路径。如果该路径不存在,则会创建它。
是
请参考示例代码中的配置值。
hoodie.datasource.write.recordkey.field
hoodie表的唯一键字段名
否
这里配置order_id为唯一键。
EXTERNAL
是否外表
是
hudi表必填,且设置为true
true
CREATE CATALOG hive_catalog WITH ( 'type'='hive', 'hive-conf-dir' = '/opt/flink/conf', 'default-database'='test' );USE CATALOG hive_catalog;create table if not exists genSource618 ( order_id STRING, order_name STRING, price INT, weight INT) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.order_id.kind' = 'random', 'fields.order_id.length' = '8', 'fields.order_name.kind' = 'random', 'fields.order_name.length' = '5');CREATE CATALOG hoodie_catalog WITH ( 'type'='hudi', 'hive.conf.dir' = '/opt/flink/conf', 'mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence );CREATE TABLE if not exists hoodie_catalog.`test`.`hudiSink618` ( `order_id` STRING PRIMARY KEY NOT ENFORCED, `order_name` STRING, `price` INT, `weight` INT, `create_time` BIGINT, `create_date` String) PARTITIONED BY (create_date) WITH ( 'connector' = 'hudi', 'path' = 'obs://xxx/catalog/dbtest3/hudiSink618', 'hoodie.datasource.write.recordkey.field' = 'order_id', 'write.precombine.field' = 'create_time', 'EXTERNAL' = 'true' -- must be set);insert into hoodie_catalog.`test`.`hudiSink618`select order_id, order_name, price, weight, UNIX_TIMESTAMP() as create_time, FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_datefrom genSource618;
- 示例1:委托的方式对接Lakeformation
- DLI Flink Jar
- 示例1:委托方式对接Lakeformation
- 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录
本例通过DataGen表产生随机数据并输出到Print结果表中。
其他connector类型可参考Flink 1.15支持的connector列表。
1 2 3 4 5 6 7 8 910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
package com.huawei.test;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.text.SimpleDateFormat;@SuppressWarnings({"deprecation", "rawtypes", "unchecked"})public class GenToPrintTaskAgency { private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "180000000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" + " 'type' = 'hive',\n" + " 'hive-conf-dir' = '/opt/hadoop/conf'\n" + " );"; tEnv.executeSql(createCatalog); String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJar618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.user_id.kind' = 'random',\n" + " 'fields.user_id.length' = '3'\n" + ")";/*testdb是用户自定义的数数据库*/ tEnv.executeSql(dataSource); String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJar618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH ('connector' = 'print')"; tEnv.executeSql(printSink);/*testdb是用户自定义的数数据库*/ String query = "insert into lf_catalog.`test`.`printSinkJar618_1` " + "select * from lf_catalog.`test`.`dataGenSourceJar618_1`"; tEnv.executeSql(query); }}
- 创建Flink jar作业并配置如下参数。
参数
说明
配置示例
Flink版本
Flink 1.15及以上版本支持对接LakeFormation。
1.15
委托
使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:
flink.dli.job.agency.name=agency
委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。
-
优化参数
-
- 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录
- 示例2:DEW方式对接Lakeformation
- 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录
本例通过DataGen表产生随机数据并输出到Print结果表中。
其他connector类型可参考Flink 1.15支持的connector列表。
package com.huawei.test;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.text.SimpleDateFormat;@SuppressWarnings({"deprecation", "rawtypes", "unchecked"})public class GenToPrintTaskDew { private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "180000000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" + " 'type' = 'hive',\n" + " 'hive-conf-dir' = '/opt/hadoop/conf',\n" + " 'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',\n" + " 'properties.catalog.dew.endpoint'='kms.xxx.xxx.com',\n" + " 'properties.catalog.dew.csms.secretName'='obsAksK',\n" + " 'properties.catalog.dew.access.key' = 'ak',\n" + " 'properties.catalog.dew.secret.key' = 'sk',\n" + " 'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxxx',\n" + " 'properties.catalog.dew.csms.version'='v9'\n" + " );"; tEnv.executeSql(createCatalog); String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJarDew618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.user_id.kind' = 'random',\n" + " 'fields.user_id.length' = '3'\n" + ")"; tEnv.executeSql(dataSource);/*testdb是用户自定义的数数据库*/ String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJarDew618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH ('connector' = 'print')"; tEnv.executeSql(printSink);/*testdb是用户自定义的数数据库*/ String query = "insert into lf_catalog.`test`.`printSinkJarDew618_1` " + "select * from lf_catalog.`test`.`dataGenSourceJarDew618_1`"; tEnv.executeSql(query); }}
- 创建Flink jar作业并配置如下参数。
参数
说明
配置示例
Flink版本
Flink 1.15及以上版本支持对接LakeFormation。
1.15
委托
使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:
flink.dli.job.agency.name=agency
委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。
-
优化参数
-
- 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录
- 示例3:Flink jar支持Hudi表
- 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录
本例通过DataGen表产生随机数据并输出到Hudi结果表中。
其他connector类型可参考Flink 1.15支持的connector列表。
1 2 3 4 5 6 7 8 910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
package com.huawei.test;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.text.SimpleDateFormat;public class GenToHudiTask4 { private static final Logger LOGGER = LoggerFactory.getLogger(GenToHudiTask4.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) throws IOException { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "30000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://xxx/jobs/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String catalog = "CREATE CATALOG hoodie_catalog\n" + " WITH (\n" + " 'type'='hudi',\n" + " 'hive.conf.dir' = '/opt/hadoop/conf',\n" + " 'mode'='hms'\n" + " )"; tEnv.executeSql(catalog); String dwsSource = "CREATE TABLE if not exists genSourceJarForHudi618_1 (\n" + " order_id STRING,\n" + " order_name STRING,\n" + " price INT,\n" + " weight INT\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.order_id.kind' = 'random',\n" + " 'fields.order_id.length' = '8',\n" + " 'fields.order_name.kind' = 'random',\n" + " 'fields.order_name.length' = '8'\n" + ")"; tEnv.executeSql(dwsSource);/*testdb是用户自定义的数数据库*/ String printSinkdws = "CREATE TABLE if not exists hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` (\n" + " order_id STRING PRIMARY KEY NOT ENFORCED,\n" + " order_name STRING,\n" + " price INT,\n" + " weight INT,\n" + " create_time BIGINT,\n" + " create_date String\n" + ") WITH (" + "'connector' = 'hudi',\n" + "'path' = 'obs://xxx/catalog/dbtest3/hudiSinkJarHudi618_1',\n" + "'hoodie.datasource.write.recordkey.field' = 'order_id',\n" + "'EXTERNAL' = 'true'\n" + ")"; tEnv.executeSql(printSinkdws);/*testdb是用户自定义的数数据库*/ String query = "insert into hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` select\n" + " order_id,\n" + " order_name,\n" + " price,\n" + " weight,\n" + " UNIX_TIMESTAMP() as create_time,\n" + " FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date\n" + " from genSourceJarForHudi618_1"; tEnv.executeSql(query); }}
表8 hudi类型sink表的connector参数 参数
说明
是否必填
参数值
connector
flink connector类型。
配置为hudi表示sink表是hudi表。
是
hudi
path
表的基本路径。如果该路径不存在,则会创建它。
是
请参考示例代码中的配置值。
hoodie.datasource.write.recordkey.field
hoodie表的唯一键字段名
否
这里配置order_id为唯一键。
EXTERNAL
是否外表
是
hudi表必填,且设置为true
true
- 创建Flink jar作业并配置如下参数。
参数
说明
配置示例
Flink版本
Flink 1.15及以上版本支持对接LakeFormation。
1.15
委托
使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:
flink.dli.job.agency.name=agency
委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。
-
优化参数
-
- 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录
- 示例1:委托方式对接Lakeformation