华为云用户手册

  • DataSource和Hive两种语法创建OBS表的区别 DataSource语法和Hive语法主要区别在于支持的表数据存储格式范围、支持的分区数等有差异。两种语法创建OBS表主要差异点参见表1。 表1 DataSource语法和Hive语法创建OBS表的差异点 语法 支持的数据类型范围 创建分区表时分区字段差异 支持的分区数 DataSource语法 支持ORC,PARQUET,JSON, CS V,AVRO类型 创建分区表时,分区字段在表名和PARTITIONED BY后都需要指定。具体可以参考DataSource语法创建单分区OBS表。 单表分区数最多允许7000个。 Hive语法 支持TEXTFILE, AVRO, ORC, SEQUENCEFILE, RCFILE, PARQUET 创建分区表时,指定的分区字段不能出现在表后,只能通过PARTITIONED BY指定分区字段名和类型。具体可以参考Hive语法创建OBS分区表。 单表分区数最多允许100000个。 创建OBS表的DataSource语法可以参考使用DataSource语法创建OBS表。 创建OBS表的Hive语法可以参考使用Hive语法创建OBS表。
  • 使用DataSource语法创建OBS表 以下通过创建CSV格式的OBS表举例,创建其他数据格式的OBS表方法类似,此处不一一列举。 创建OBS非分区表 指定OBS数据文件,创建csv格式的OBS表。 按照以下文件内容创建“test.csv”文件,并将“test.csv”文件上传到OBS桶“dli-test-021”的根目录下。 Jordon,88,23 Kim,87,25 Henry,76,26 登录 DLI 管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”,执行以下命令创建OBS表。 CREATE TABLE testcsvdatasource (name STRING, score DOUBLE, classNo INT ) USING csv OPTIONS (path "obs://dli-test-021/test.csv"); 如果是通过指定的数据文件创建的OBS表,后续不支持在DLI通过insert表操作插入数据。OBS文件内容和表数据保持同步。 查询已创建的“testcsvdatasource”表数据。 select * from testcsvdatasource; 图1 查询结果 本地修改原始的OBS表文件“test.csv”,增加一行“Aarn,98,20”数据,重新替换OBS桶目录下的“test.csv”文件。 Jordon,88,23 Kim,87,25 Henry,76,26 Aarn,98,20 在DLI的SQL编辑器中再次查询“testcsvdatasource”表数据,DLI上可以查询到新增的“Aarn,98,20”数据。 select * from testcsvdatasource; 图2 查询结果 指定OBS数据文件目录,创建csv格式的OBS表。 指定的OBS数据目录不包含数据文件。 在OBS桶“dli-test-021”根目录下创建数据文件目录“data”。 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在DLI的“testdb”数据库下创建OBS表“testcsvdata2source”。 CREATE TABLE testcsvdata2source (name STRING, score DOUBLE, classNo INT) USING csv OPTIONS (path "obs://dli-test-021/data"); 通过insert语句插入表数据。 insert into testcsvdata2source VALUES('Aarn','98','20'); insert作业运行成功后,查询OBS表“testcsvdata2source”数据。 select * from testcsvdata2source; 图3 查询结果 在OBS桶的“obs://dli-test-021/data”目录下刷新后查询,生成了csv数据文件,文件内容为insert插入的数据内容。 图4 查询结果 指定的OBS数据目录包含数据文件。 在OBS桶“dli-test-021”根目录下创建数据文件目录“data2”。创建如下内容的测试数据文件“test.csv”,并上传文件到“obs://dli-test-021/data2”目录下。 Jordon,88,23 Kim,87,25 Henry,76,26 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在DLI的“testdb”数据库下创建OBS表“testcsvdata3source”。 CREATE TABLE testcsvdata3source (name STRING, score DOUBLE, classNo INT) USING csv OPTIONS (path "obs://dli-test-021/data2"); 通过insert语句插入表数据。 insert into testcsvdata3source VALUES('Aarn','98','20'); insert作业运行成功后,查询OBS表“testcsvdata3source”数据。 select * from testcsvdata3source; 图5 查询结果 在OBS桶的“obs://dli-test-021/data2”目录下刷新后查询,生成了一个csv数据文件,内容为insert插入的表数据内容。 图6 查询结果 创建OBS分区表 创建单分区OBS表 在OBS桶“dli-test-021”根目录下创建数据文件目录“data3”。 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在DLI的“testdb”数据库下创建以“classNo”列为分区的OBS分区表“testcsvdata4source”,指定OBS目录“obs://dli-test-021/data3”。 CREATE TABLE testcsvdata4source (name STRING, score DOUBLE, classNo INT) USING csv OPTIONS (path "obs://dli-test-021/data3") PARTITIONED BY (classNo); 在OBS桶的“obs://dli-test-021/data3”目录下创建“classNo=25”的分区目录。根据以下文件内容创建数据文件“test.csv”,并上传到OBS的“obs://dli-test-021/data3/classNo=25”目录下。 Jordon,88,25 Kim,87,25 Henry,76,25 在SQL编辑器中执行以下命令,导入分区数据到OBS表“testcsvdata4source ”。 ALTER TABLE testcsvdata4source ADD PARTITION (classNo = 25) LOCATION 'obs://dli-test-021/data3/classNo=25'; 查询OBS表“testcsvdata4source ”classNo分区为“25”的数据: select * from testcsvdata4source where classNo = 25; 图7 查询结果 插入如下数据到OBS表“testcsvdata4source ”: insert into testcsvdata4source VALUES('Aarn','98','25'); insert into testcsvdata4source VALUES('Adam','68','24'); 查询OBS表“testcsvdata4source ”classNo分区为“25”和“24”的数据。 分区表在进行查询时where条件中必须携带分区字段,否则会查询失败,报:DLI.0005: There should be at least one partition pruning predicate on partitioned table。 select * from testcsvdata4source where classNo = 25; 图8 查询结果 select * from testcsvdata4source where classNo = 24; 图9 查询结果 在OBS桶的“obs://dli-test-021/data3”目录下点击刷新,该目录下生成了对应的分区文件,分别存放新插入的表数据。 图10 OBS上classNo分区为“25”文件数据 图11 OBS上classNo分区为“24”文件数据 创建多分区OBS表 在OBS桶“dli-test-021”根目录下创建数据文件目录“data4”。 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在“testdb”数据库下创建以“classNo”和“dt”列为分区的OBS分区表“testcsvdata5source”,指定OBS目录“obs://dli-test-021/data4”。 CREATE TABLE testcsvdata5source (name STRING, score DOUBLE, classNo INT, dt varchar(16)) USING csv OPTIONS (path "obs://dli-test-021/data4") PARTITIONED BY (classNo,dt); 给 testcsvdata5source表插入如下测试数据: insert into testcsvdata5source VALUES('Aarn','98','25','2021-07-27'); insert into testcsvdata5source VALUES('Adam','68','25','2021-07-28'); 根据classNo分区列查询testcsvdata5source数据。 select * from testcsvdata5source where classNo = 25; 图12 查询结果 根据dt分区列查询testcsvdata5source数据。 select * from testcsvdata5source where dt like '2021-07%'; 图13 查询结果 在OBS桶“obs://dli-test-021/data4”目录下刷新后查询,会生成如下数据文件: 文件目录1:obs://dli-test-021/data4/xxxxxx/classNo=25/dt=2021-07-27 图14 查询结果 文件目录2:obs://dli-test-021/data4/xxxxxx/classNo=25/dt=2021-07-28 图15 查询结果 在OBS桶的“obs://dli-test-021/data4”目录下创建“classNo=24”的分区目录,再在“classNo=24”目录下创建子分区目录“dt=2021-07-29”。根据以下文件内容创建数据文件“test.csv”,并上传到OBS的“obs://dli-test-021/data4/classNo=24/dt=2021-07-29”目录下。 Jordon,88,24,2021-07-29 Kim,87,24,2021-07-29 Henry,76,24,2021-07-29 在SQL编辑器中执行以下命令,导入分区数据到OBS表“testcsvdata5source ”。 ALTER TABLE testcsvdata5source ADD PARTITION (classNo = 24,dt='2021-07-29') LOCATION 'obs://dli-test-021/data4/classNo=24/dt=2021-07-29'; 根据classNo分区列查询testcsvdata5source数据。 select * from testcsvdata5source where classNo = 24; 图16 查询结果 根据dt分区列查询所有“2021-07”月的所有数据。 select * from testcsvdata5source where dt like '2021-07%'; 图17 查询结果
  • 约束限制 仅支持Spark3.3.1版本(Spark通用队列场景)使用委托授权访问临时凭证: 在创建作业时,请配置作业使用Spark3.3.1版本 已在作业中配置允许DLI访问DEW的委托信息。spark.dli.job.agency.name=自定义委托名称。 自定义委托请参考自定义DLI委托权限。 请注意配置参数不需要用"" 或 '' 包裹。 Spark3.3.1基础镜像内置了3.1.62版本的huaweicloud-sdk-core。
  • 功能描述 DLI提供了一个通用接口,可用于获取用户在启动Spark作业时设置的委托的临时凭证。该接口将获取到的该作业委托的临时凭证封装到com.huaweicloud.sdk.core.auth.BasicCredentials类中。 获取到的委托的临时认证封装到com.huaweicloud.sdk.core.auth.ICredentialProvider接口的getCredentials()返回值中。 返回类型为com.huaweicloud.sdk.core.auth.BasicCredentials。 仅支持获取AK、SK、SecurityToken。 获取到AK、SK、SecurityToken后,请参考如何使用凭据管理服务替换硬编码的数据库账号密码查询凭据。
  • 步骤3:创建DWS数据库和表 参考使用gsql命令行客户端连接DWS集群连接已创建的DWS集群。 执行以下命令连接DWS集群的默认数据库“gaussdb”: gsql -d gaussdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r gaussdb:DWS集群默认数据库。 DWS集群连接地址:请参见获取集群连接地址进行获取。如果通过公网地址连接,请指定为集群“公网访问地址”或“公网访问 域名 ”,如果通过内网地址连接,请指定为集群“内网访问地址”或“内网访问域名”。如果通过弹性负载均衡连接,请指定为“弹性负载均衡地址”。 dbadmin:创建集群时设置的默认管理员用户名。 password :默认管理员用户的密码。 在命令行窗口输入以下命令创建数据库“testdwsdb”。 CREATE DATABASE testdwsdb; 执行以下命令,退出gaussdb数据库,连接新创建的数据库“testdwsdb”。 \q gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r 执行以下命令创建表。 create schema test; set current_schema= test; drop table if exists qualified_cars; CREATE TABLE qualified_cars ( car_id VARCHAR, car_owner VARCHAR, car_age INTEGER , average_speed FLOAT8, total_miles FLOAT8 );
  • 整体作业开发流程 整体作业开发流程参考图1。 图1 作业开发流程 步骤1:创建队列:创建DLI作业运行的队列。 步骤2:创建Kafka的Topic:创建Kafka生产消费数据的Topic。 步骤3:创建DWS数据库和表:创建DWS数据库和表信息。 步骤4:创建增强型跨源连接:DLI上创建连接Kafka和DWS的跨源连接,打通网络。 步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。 步骤6:发送数据和查询结果:Kafka上发送流数据,在RDS上查看运行结果。
  • 步骤6:发送数据和查询结果 使用Kafka客户端向步骤2:创建Kafka的Topic中的Topic发送数据,模拟实时数据流。 Kafka生产和发送数据的方法请参考DMS - 连接实例生产消费信息。 发送样例数据如下: {"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"} {"car_id":"3028", "car_owner":"hanmeimei", "car_age":"6", "average_speed":"92", "total_miles":"17000"} {"car_id":"3029", "car_owner":"Ann", "car_age":"10", "average_speed":"81", "total_miles":"230000"} 连接已创建的DWS集群。 具体操作请参考使用gsql命令行客户端连接DWS集群。 执行以下命令连接DWS集群的默认数据库“testdwsdb”: gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r 查询DWS的表数据。 select * from test.qualified_cars; 查询结果参考如下: car_id car_owner car_age average_speed total_miles 3027 lilei 7 76.0 15000.0
  • 步骤5:调试、编译代码并导出Jar包 双击IntelliJ IDEA工具右侧的“Maven”,参考下图分别双击“clean”、“compile”对代码进行编译。 编译成功后,双击“package”对代码进行打包。 图9 编译打包 打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\DLITest\SparkJarObs\target”下名为“SparkJarObs-1.0-SNAPSHOT.jar”。 图10 导出jar包
  • 步骤2:上传数据到OBS桶 根据如下数据,创建people.json文件。 {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} 进入OBS管理控制台,在“桶列表”下,单击已创建的OBS桶名称,本示例桶名为“dli-test-obs01”。 单击“上传对象”,将people.json文件上传到OBS桶根目录下。 在OBS桶根目录下,单击“新建文件夹”,创建名为“result”的文件夹。 单击“result”的文件夹,在“result”下单击“新建文件夹”,创建名为“parquet”的文件夹。
  • 步骤8:查看作业运行结果 在Spark作业管理界面显示已提交的作业运行状态。初始状态显示为“启动中”。 如果作业运行成功则作业状态显示为“已成功”,单击“操作”列“更多”下的“Driver日志”,显示当前作业运行的日志。 图14 diver日志 图15 “Driver日志”中的作业执行日志 如果作业运行成功,本示例进入OBS桶下的“result/parquet”目录,查看已生成预期的parquet文件。 图16 obs桶文件 如果作业运行失败,单击“操作”列“更多”下的“Driver日志”,显示具体的报错日志信息,根据报错信息定位问题原因。 例如,如下截图信息因为创建Spark Jar作业时主类名没有包含包路径,报找不到类名“SparkDemoObs”。 图17 报错信息 可以在“操作”列,单击“编辑”,修改“主类”参数为正确的:com.huawei.dli.demo.SparkDemoObs,单击“执行”重新运行该作业即可。
  • 开发流程 DLI进行Spark Jar作业开发流程参考如下: 图1 Spark Jar作业开发流程 表2 开发流程说明 序号 阶段 操作界面 说明 1 创建DLI通用队列 DLI控制台 创建作业运行的DLI队列。 2 上传数据到OBS桶 OBS控制台 将测试数据上传到OBS桶下。 3 新建Maven工程,配置pom文件 IntelliJ IDEA 参考样例代码说明,编写程序代码读取OBS数据。 4 编写程序代码 5 调试,编译代码并导出Jar包 6 上传Jar包到OBS和DLI OBS控制台 DLI控制台 将生成的Spark Jar包文件上传到OBS目录下和DLI程序包中。 7 创建Spark Jar作业 DLI控制台 在DLI控制台创建Spark Jar作业并提交运行作业。 8 查看作业运行结果 DLI控制台 查看作业运行状态和作业运行日志。
  • 环境准备 在进行Spark Jar作业开发前,请准备以下开发环境。 表1 Spark Jar作业开发环境 准备项 说明 操作系统 Windows系统,支持Windows7以上版本。 安装JDK JDK使用1.8版本。 安装和配置IntelliJ IDEA IntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。
  • 常见问题 Q:作业运行失败,运行日志中有如下报错信息,应该怎么解决? java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V A:该问题是因为所选择的huaweicloud-dis-flink-connector_2.11版本过低导致,请选择2.0.1及以上版本。 Q:运行作业读取DIS数据时,无法读出数据且Taskmanager的运行日志中有如下报错信息,应该怎么解决? ERROR com.huaweicloud.dis.adapter.common.consumer.Coordinator [] - Failed to getCheckpointAsync, error : [400 : {"errorCode":"DIS.4332","message":"app not found. "}], request : [{"stream_name":"xx","partition_id":"shardId-0000000000","checkpoint_type":"LAST_READ","app_name":"xx"}] A: 该问题是因为读取DIS数据所使用的group.id在DIS的Apps中并没有提前创建。
  • 环境准备 已在DLI控制台购买了通用队列。 已购买了DIS通道。开通DIS通道。 用户在使用Flink 1.12版本,则依赖的Dis connector版本需要不低于2.0.1,详细代码参考DISFlinkConnector相关依赖,如何配置connector,详细参考自定义Flink Streaming作业。 若读取DIS,且配置groupId,则需要提前在DIS的“App管理”中创建所需的App名称。 请勿将disToDis.properties放在生成的jar包中,在代码里有关于disToDis.properties的路径,如果放在jar包中,代码会找不到disToDis.properties路径。
  • 语法格式 在Flink jar作业编辑界面,选择配置优化参数,配置信息如下: 不同的OBS桶,使用不同的AKSK认证信息。 可以使用如下配置方式,根据桶指定不同的AKSK信息,参数说明详见表1。 flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key=USER_AK_C SMS _KEY flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key=USER_SK_CSMS_KEY flink.hadoop.fs.obs.security.provider=com.dli.provider.UserObsBasicCredentialProvider flink.hadoop.fs.dew.csms.secretName=CredentialName flink.hadoop.fs.dew.endpoint=ENDPOINT flink.hadoop.fs.dew.csms.version=VERSION_ID flink.hadoop.fs.dew.csms.cache.time.second=CACHE_TIME flink.dli.job.agency.name=USER_AGENCY_NAME
  • 前提条件 已在DEW服务创建通用凭证,并存入凭据值。具体操作请参考:创建通用凭据。 已创建DLI访问DEW的委托并完成委托授权。该委托需具备以下权限: DEW中的查询凭据的版本与凭据值ShowSecretVersion接口权限,csms:secretVersion:get。 DEW中的查询凭据的版本列表ListSecretVersions接口权限,csms:secretVersion:list。 DEW解密凭据的权限,kms:dek:decrypt。 委托权限示例请参考自定义DLI委托权限和常见场景的委托权限策略。 仅支持Flink1.15版本使用DEW管理访问凭据,在创建作业时,请配置作业使用Flink1.15版本、且已在作业中配置允许DLI访问DEW的委托信息。自定义委托及配置请参考自定义DLI委托权限。 使用该功能,所有涉及OBS的桶,都需要进行配置AKSK。
  • 操作场景 DLI将Flink Jar作业的输出数据写入到OBS时,需要配置AKSK访问OBS,为了确保AKSK数据安全,您可以用过 数据加密 服务(Data Encryption Workshop,DEW)、云凭据管理服务(Cloud Secret Management Service,CSMS),对AKSK统一管理,有效避免程序硬编码或明文配置等问题导致的敏感信息泄露以及权限失控带来的业务风险。 本例以获取访问OBS的AKSK为例介绍Flink Jar使用DEW获取访问凭证读写OBS的操作指导。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 参数说明 flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key 是 无 String USER_BUCKET_NAME为用户的桶名,需要进行替换为用户的使用的OBS桶名。 参数的值为用户定义在CSMS通用凭证中的键key, 其Key对应的value为用户的AK(Access Key Id),需要具备访问OBS对应桶的权限。 flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key 是 无 String USER_BUCKET_NAME为用户的桶名,需要进行替换为用户的使用的OBS桶名。 参数的值为用户定义在CSMS通用凭证中的键key, 其Key对应的value为用户的SK(Secret Access Key),需要具备访问OBS对应桶的权限。 flink.hadoop.fs.obs.security.provider 是 无 String OBS AKSK认证机制,使用DEW服务中的CSMS凭证管理,获取OBS的AK、SK。 默认取值为com.dli.provider.UserObsBasicCredentialProvider flink.hadoop.fs.dew.endpoint 是 无 String 指定要使用的DEW服务所在的endpoint信息。 获取地区和终端节点。 配置示例:flink.hadoop.fs.dew.endpoint=kms.cn-xxxx.myhuaweicloud.com flink.hadoop.fs.dew.projectId 否 有 String DEW所在的项目ID, 默认是Flink作业所在的项目ID。 获取项目ID flink.hadoop.fs.dew.csms.secretName 是 无 String 在DEW服务的凭据管理中新建的通用凭据的名称。 配置示例:flink.hadoop.fs.dew.csms.secretName=secretInfo flink.hadoop.fs.dew.csms.version 否 最新的version String 在DEW服务的凭据管理中新建的通用凭据的版本号(凭据的版本标识符)。 若不指定,则默认获取该通用凭证的最新版本号。 配置示例:flink.hadoop.fs.dew.csms.version=v1 flink.hadoop.fs.dew.csms.cache.time.second 否 3600 Long Flink作业访问获取CSMS通用凭证后,缓存的时间。 单位为秒。默认值为3600秒。 flink.dli.job.agency.name 是 - String 自定义委托名称。
  • 开发流程 DLI进行Spark作业访问DLI元数据开发流程参考如下: 图1 Spark作业访问DLI元数据开发流程 表2 开发流程说明 序号 阶段 操作界面 说明 1 创建DLI通用队列 DLI控制台 创建作业运行的DLI队列。 2 OBS桶文件配置 OBS控制台 如果是创建OBS表,则需要上传文件数据到OBS桶下。 配置Spark创建表的元数据信息的存储路径。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。 3 新建Maven工程,配置pom文件 IntelliJ IDEA 参考样例代码说明,编写程序代码创建DLI表或OBS表。 4 编写程序代码 5 调试,编译代码并导出Jar包 6 上传Jar包到OBS和DLI OBS控制台 DLI控制台 将生成的Spark Jar包文件上传到OBS目录下和DLI程序包中。 7 创建Spark Jar作业 DLI控制台 在DLI控制台创建Spark Jar作业并提交运行作业。 8 查看作业运行结果 DLI控制台 查看作业运行状态和作业运行日志。
  • 步骤4:编写代码 编写DliCatalogTest程序创建数据库、DLI表和OBS表。 完整的样例请参考Java样例代码,样例代码分段说明如下: 导入依赖的包。 import org.apache.spark.sql.SparkSession; 创建SparkSession会话。 创建SparkSession会话时需要指定Spark参数:"spark.sql.session.state.builder"、"spark.sql.catalog.class"和"spark.sql.extensions",按照样例配置即可。 Spark2.3.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); Spark2.4.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .config("spark.sql.hive.implementation","org.apache.spark.sql.hive.client.DliHiveClientImpl") .appName("java_spark_demo") .getOrCreate(); Spark3.1.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); Spark3.3.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.DliLakeHouseBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.DliLakeHouseCatalog") .appName("java_spark_demo") .getOrCreate(); 创建数据库。 如下样例代码演示,创建名为test_sparkapp的数据库。 spark.sql("create database if not exists test_sparkapp").collect(); 创建DLI表并插入测试数据。 spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); 创建OBS表。如下示例中的OBS路径需要根据步骤2:OBS桶文件配置中的实际数据路径修改。 spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); 关闭SparkSession会话spark。 spark.stop();
  • 环境准备 在进行Spark 作业访问DLI元数据开发前,请准备以下开发环境。 表1 Spark Jar作业开发环境 准备项 说明 操作系统 Windows系统,支持Windows7以上版本。 安装JDK JDK使用1.8版本。 安装和配置IntelliJ IDEA IntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。
  • 步骤5:调试、编译代码并导出Jar包 双击IntelliJ IDEA工具右侧的“Maven”,参考下图分别双击“clean”、“compile”对代码进行编译。 编译成功后,双击“package”对代码进行打包。 图9 编译打包 打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\DLITest\SparkJarMetadata\target”下名为“SparkJarMetadata-1.0-SNAPSHOT.jar”。 图10 导出jar包
  • 步骤8:查看作业运行结果 在Spark作业管理界面显示已提交的作业运行状态。初始状态显示为“启动中”。 如果作业运行成功则作业状态显示为“已成功”,通过以下操作查看创建的数据库和表。 可以在DLI控制台,左侧导航栏,单击“SQL编辑器”。在“数据库”中已显示创建的数据库“test_sparkapp”。 图14 查看创建的数据库 双击数据库名,可以在数据库下查看已创建成功的DLI和OBS表。 图15 查看表 双击DLI表名dli_testtable,单击“执行”查询DLI表数据。 图16 查询DLI表数据 注释掉DLI表查询语句,双击OBS表名dli_testobstable,单击“执行”查询OBS表数据。 图17 查询OBS表数据 如果作业运行失败则作业状态显示为“已失败”,单击“操作”列“更多”下的“Driver日志”,显示当前作业运行的日志,分析报错原因。 图18 查看Driver日志 原因定位解决后,可以在作业“操作”列,单击“编辑”,修改作业相关参数后,单击“执行”重新运行该作业即可。
  • Java样例代码 本示例操作步骤采用Java进行编码,具体完整的样例代码参考如下: package com.huawei.dli.demo; import org.apache.spark.sql.SparkSession; public class DliCatalogTest { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); spark.sql("create database if not exists test_sparkapp").collect(); spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); spark.stop(); } }
  • scala样例代码 object DliCatalogTest { def main(args:Array[String]): Unit = { val sql = args(0) val runDdl = Try(args(1).toBoolean).getOrElse(true) System.out.println(s"sql is $sql runDdl is $runDdl") val sparkConf = new SparkConf(true) sparkConf .set("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .set("spark.sql.catalog.class","org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") sparkConf.setAppName("dlicatalogtester") val spark = SparkSession.builder .config(sparkConf) .enableHiveSupport() .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("SparkTest") .getOrCreate() System.out.println("catalog is " + spark.sessionState.catalog.toString) if (runDdl) { val df = spark.sql(sql).collect() } else { spark.sql(sql).show() } spark.close() } }
  • Python样例代码 #!/usr/bin/python # -*- coding: UTF-8 -*- from __future__ import print_function import sys from pyspark.sql import SparkSession if __name__ == "__main__": url = sys.argv[1] creatTbl = "CREATE TABLE test_sparkapp.dli_rds USING JDBC OPTIONS ('url'='jdbc:mysql://%s'," \ "'driver'='com.mysql.jdbc.Driver','dbtable'='test.test'," \ " 'passwdauth' = 'DatasourceRDSTest_pwd','encryption' = 'true')" % url spark = SparkSession \ .builder \ .enableHiveSupport() \ .config("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") \ .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") \ .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") \ .appName("python Spark test catalog") \ .getOrCreate() spark.sql("CREATE database if not exists test_sparkapp").collect() spark.sql("drop table if exists test_sparkapp.dli_rds").collect() spark.sql(creatTbl).collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert into table test_sparkapp.dli_rds select 12,'aaa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert overwrite table test_sparkapp.dli_rds select 1111,'asasasa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("drop table test_sparkapp.dli_rds").collect() spark.stop()
  • 步骤2:OBS桶文件配置 如果需要创建OBS表,则需要先上传数据到OBS桶目录下。 本次演示的样例代码创建了OBS表,测试数据内容参考如下示例,创建名为的testdata.csv文件。 12,Michael 27,Andy 30,Justin 进入OBS管理控制台,在“桶列表”下,单击已创建的OBS桶名称,本示例桶名为“dli-test-obs01”。 单击“上传对象”,将testdata.csv文件上传到OBS桶根目录下。 在OBS桶根目录下,单击“新建文件夹”,创建名为“warehousepath”的文件夹。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。
  • 约束限制 如果使用Spark 3.1访问元数据,则必须新建队列。 不支持的场景: 在SQL作业中创建了数据库(database),编写程序代码指定在该数据库下创建表。 例如在DLI的SQL编辑器中的某SQL队列下,创建了数据库testdb。后续通过编写程序代码在testdb下创建表testTable,编译打包后提交的Spark Jar作业则会运行失败。 支持的场景 在SQL作业中创建数据库(database),表(table) , 通过SQL或Spark程序作业读取插入数据。 在Spark程序作业中创建数据库(database),表(table), 通过SQL或Spark程序作业读取插入数据。
  • TPC-H样例数据简介 TPC-H(商业智能计算测试) 是交易处理效能委员会(TPC,Transaction Processing Performance Council) 组织制定的用来模拟决策支持类应用的一个测试集。目前,在学术界和工业界普遍用来评价决策支持技术方面应用的性能。这种商业测试可以全方位评测系统的整体商业计算综合能力,对厂商的要求更高,同时也具有普遍的商业实用意义,目前在银行信贷分析和信用卡分析、电信运营分析、税收分析、烟草行业决策分析中都有广泛的应用。 TPC-H 基准测试是由 TPC-D(由 TPC 组织于 1994 年制定的标准,用于决策支持系统方面的测试基准)发展而来的。TPC-H用3NF实现了一个 数据仓库 ,共包含8个基本关系,其数据量可以设定从1G~3T不等。TPC-H 基准测试包括 22 个查询(Q1~Q22),其主要评价指标是各个查询的响应时间,即从提交查询到结果返回所需时间。TPC-H 基准测试的度量单位是每小时执行的查询数( QphH@size),其中“H”表示每小时系统执行复杂查询的平均次数,“size”表示数据库规模的大小,能够反映出系统在处理查询时的能力。TPC-H 是根据真实的生产运行环境来建模的,这使得它可以评估一些其他测试所不能评估的关键性能参数。总而言之,TPC组织颁布的TPC-H 标准满足了数据仓库领域的测试需求,并且促使各个厂商以及研究机构将该项技术推向极限。 本示例将演示DLI直接对存储在OBS中的TPC-H数据集进行查询的操作,DLI已经预先生成了100M的TPC-H-2.18的标准数据集,已将数据集上传到了OBS的tpch文件夹中,并且赋予了只读访问权限,方便用户进行查询操作。
  • TPC-H的测试和度量指标 TPC-H 测试分解为3 个子测试:数据装载测试、Power测试和Throughput测试。建立测试数据库的过程被称为装载数据,装载测试是为测试DBMS装载数据的能力。装载测试是第一项测试,测试装载数据的时间,这项操作非常耗时。Power 测试是在数据装载测试完成后,数据库处于初始状态,未进行其它任何操作,特别是缓冲区还没有被测试数据库的数据,被称为raw查询。Power测试要求22 个查询顺序执行1 遍,同时执行一对RF1 和RF2 操作。最后进行Throughput 测试,也是最核心和最复杂的测试,更接近于实际应用环境,与Power 测试比对SUT 系统的压力有非常大的增加,有多个查询语句组,同时有一对RF1 和RF2 更新流。
共100000条