华为云用户手册

  • 导入数据 DLI 提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表中。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def import_data(dli_client, db_name, tbl_name, queue_name): options = { "with_column_header": True, "delimiter": ",", "quote_char": "\"", "escape_char": "\\", "date_format": "yyyy/MM/dd", "timestamp_format": "yyyy/MM/dd hh:mm:ss" } try: job_id, status = \ dli_client.import_table(tbl_name, db_name, 'obs://bucket/obj/data.csv', 'csv', queue_name=queue_name, options=options) except DliException as e: print(e) return print(job_id) print(status) 在提交导入作业前,可选择通过data_type参数设置导入数据的类型,例如将data_type设置为csv。csv数据的具体格式通可过options参数设置,例如:csv的分隔符,转义符等。 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
  • 导出数据 DLI提供导出数据的接口。您可以使用该接口将DLI表中的数据导出到OBS中。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 def export_data(dli_client, db_name, tbl_name, queue_name): try: job_id, status = dli_client.export_table(tbl_name, db_name, 'obs://bucket/obj', queue_name=queue_name) except DliException as e: print(e) return print(job_id) print(status) 在提交导出作业前,可选设置数据格式、压缩类型、导出模式等,导出格式只支持csv格式。 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
  • 查询所有作业 DLI提供查询所有作业的接口。您可以使用该接口执行查询当前工程下的所有作业的信息并获取查询结果。示例代码如下: 1 2 3 4 5 6 7 8 def list_all_sql_jobs(dli_client): try: sql_jobs = dli_client.list_sql_jobs() except DliException as e: print(e) return for sql_job in sql_jobs: print(sql_job) 该SDK接口不支持sql_pattern,即通过指定sql片段作为作业过滤条件进行查询。 如果需要则可以通过查询所有作业API接口指定该参数进行查询。
  • 新建作业模板 DLI提供新建Flink作业模板的接口。您可以使用该接口新建一个Flink作业模板。示例代码如下: 1 2 3 4 5 6 public static void createFlinkJobTemplate(DLIClient client) throws DLIException{ CreateFlinkJobTemplateRequest body = new CreateFlinkJobTemplateRequest(); body.name("template"); FlinkJobTemplateCreateResponse result = client.createFlinkJobTemplate(body); System.out.println(result); }
  • 更新作业模板 DLI提供更新Flink作业模板的接口。您可以使用该接口修改一个Flink作业模板。示例代码如下: 1 2 3 4 5 6 7 public static void updateFlinkJobTemplate(DLIClient client) throws DLIException{ Long templateId = 277L;//模板Id UpdateFlinkJobTemplateRequest body = new UpdateFlinkJobTemplateRequest(); body.name("template-update"); GlobalResponse result = client.updateFlinkJobTemplate(body,templateId); System.out.println(result); }
  • 删除作业模板 DLI提供删除Flink作业模板的接口。您可以使用该接口删除已经创建的作业模板,如果当前模板被引用也允许删除模板。示例代码如下: 1 2 3 4 5 public static void deleteFlinkJobTemplate(DLIClient client) throws DLIException{ Long templateId = 277L;//模板Id FlinkJobTemplateDeleteResponse result = client.deleteFlinkJobTemplate(templateId); System.out.println(result); }
  • 查询作业模板列表 DLI提供查询Flink作业模板的接口。您可以使用该接口查询作业模板列表。本示例排序方式选择降序desc,将会列出作业模板ID小于cursor的作业模板列表信息。示例代码如下: 1 2 3 4 5 6 7 public static void getFlinkJobTemplates(DLIClient client) throws DLIException{ Long offset = 789L; // Long | 模板偏移量。 Integer limit = 56; // Integer | 查询条数限制 String order = "asc"; // String | 查询结果排序, 升序和降序两种可选 FlinkJobTemplateListResponse result = client.getFlinkJobTemplates(offset,limit,order); System.out.println(result); }
  • SDK列表 表1提供了DLI云服务支持的SDK列表,您可以在GitHub仓库查看SDK更新历史、获取安装包以及查看指导文档。 表1 SDK列表 编程语言 Github地址 参考文档 视频指导 Java huaweicloud-sdk-java-v3 Java SDK使用指导 Java SDK视频指导 Python huaweicloud-sdk-python-v3 Python SDK使用指导 Python SDK视频指导 PHP huaweicloud-sdk-php-v3 PHP SDK使用指导 PHP SDK视频指导 Go huaweicloud-sdk-go-v3 Go SDK使用指导 Go SDK视频指导 Node.js huaweicloud-sdk-nodejs-v3 Node.js SDK使用指导 Node.js SDK视频指导 .NET huaweicloud-sdk-net-v3 .NET SDK使用指导 .NET SDK视频指导 SDK列表提供了DLI云服务支持的SDK列表,您可以在GitHub仓库查看SDK更新历史、获取安装包以及查看指导文档。
  • 上传资源包 您可以使用DLI提供的接口上传资源包,示例代码如下。完整样例代码和依赖包说明请参考:Python SDK概述。 1 2 3 4 5 6 def upload_resource(dli_client, kind, obs_jar_paths, group_name): try: dli_client.upload_resource(kind, obs_jar_paths, group_name) except DliException as e: print(e) return 请求参数说明如下,详细参数使用可以参考Python SDK概述下载样例代码。 kind:资源包类型,当前支持的包类型分别为: jar:用户jar文件 pyfile:用户Python文件 file:用户文件 modelfile:用户AI模型文件 obs_jar_paths:对应资源包的OBS路径,参数构成为:{bucketName}.{obs 域名 }/{jarPath}/{jarName}。 例如:"https://bucketname.obs.cn-north-1.myhuaweicloud.com/jarname.jar" group_name:资源包所属分组名称。
  • 查询所有资源包 DLI提供查询资源列表接口,您可以使用该接口并选择相应的资源来执行作业。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 def list_resources(dli_client): try: resources = dli_client.list_resources() except DliException as e: print(e) return for resources_info in resources.package_resources: print('Package resource name:' + resources_info.resource_name) for group_resource in resources.group_resources: print('Group resource name:' + group_resource.group_name) 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 删除资源包 您可以使用该接口删除已上传的资源包,示例代码如下: def delete_resource(dli_client, resource_name, group_name): try: dli_client.delete_resource(resource_name, group_name) except DliException as e: print(e) return
  • 查询指定资源包 您可以使用该接口查询指定的资源包信息,示例代码如下: def get_package_resource(dli_client, resource_name, group_name): try: pkg_resource = dli_client.get_package_resource(resource_name, group_name) print(pkg_resource) except DliException as e: print(e) return
  • AK/SK认证方式样例代码 代码样例 1 2 3 4 5 6 String ak = System.getenv("xxx_SDK_AK");//访问密钥ID。 String sk = System.getenv("xxx_SDK_SK");//与访问密钥ID结合使用的密钥。 String regionName = "regionname"; String projectId = "project_id"; DLIInfo dliInfo = new DLIInfo(regionName, ak, sk, projectId); DLIClient client = new DLIClient(AuthenticationMode.AKSK, dliInfo); 参数说明及获取方式 参数说明 ak:账号 Access Key sk:账号 Secret Access Key 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全。 本示例以ak和sk保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量xxx_SDK_AK和xxx_SDK_SK。 regionName :所属区域名称 projectId :项目ID 通过以下方式可获取AK/SK,项目ID及对应的region信息。 登录管理控制台。 鼠标指向界面右上角的登录用户名,在下拉列表中单击“我的凭证”。 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
  • Token认证方式样例代码 代码样例 1 2 3 4 5 6 7 String domainName = "domainname"; String userName = "username"; String password = "password"; String regionName = "regionname"; String projectId = "project_id"; DLIInfo dliInfo = new DLIInfo(regionName, domainName, userName, password, projectId); DLIClient client = new DLIClient(AuthenticationMode.TOKEN, dliInfo); 参数说明 参数获取方式请参考获取帐号、 IAM 用户、项目、用户组、区域、委托的名称和ID。 domainname:帐号名。 username:用户名 password:用户名密码 regionname:所属区域名称 project_id:项目ID 认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。 可以通过set方式修改endpoint,即dliInfo.setServerEndpoint(endpoint)。
  • 导出查询结果 DLI提供导出查询结果的接口。您可以使用该接口导出当前工程下,在编辑框中提交的查询作业的结果。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 //实例化SQLJob对象,传入执行SQL所需的queue,数据库名,SQL语句 private static void exportSqlResult(Queue queue, Table obsTable) throws DLIException { String sql = "select * from " + obsTable.getTableName(); String queryResultPath = "OBS Path"; SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql); System.out.println("start submit SQL job..."); //调用SQLJob对象的submit接口提交查询作业 sqlJob.submit(); //调用SQLJob对象的getStatus接口查询作业状态 JobStatus status = sqlJob.getStatus(); System.out.println(status); System.out.println("start export Result..."); //调用SQLJob对象的exportResult接口导出查询结果,其中exportPath为导出数据的路径,JSON为导出格式,queueName为执行导出作业的队列,limitNum为导出作业结果条数,0表示全部导出 sqlJob.exportResult(exportPath + "result", StorageType.JSON, CompressType.NONE, ExportMode.ERRORIFEXISTS, queueName, true, 5); }
  • 导出数据 DLI提供导出数据的接口。您可以使用该接口将DLI表中的数据导出到OBS中。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 //实例化ExportJob对象,传入导出数据所需的队列、数据库名、表名(通过实例化Table对象获取)和导出数据的存储路径,仅支持Table类型为MANAGED private static void exportData(Queue queue, Table DLITable) throws DLIException { String dataPath = "OBS Path"; queue = client.getQueue("queueName"); String dbName = DLITable.getDb().getDatabaseName(); String tableName = DLITable.getTableName(); ExportJob exportJob = new ExportJob(queue, dbName, tableName, dataPath); exportJob.setStorageType(StorageType. CS V); exportJob.setCompressType(CompressType.GZIP); exportJob.setExportMode(ExportMode.ERRORIFEXISTS); System.out.println("start export DLI Table data..."); //调用ExportJob对象的submit接口提交导出作业 exportJob.submit(); //调用ExportJob对象的getStatus接口查询导出作业状态 JobStatus status = exportJob.getStatus(); System.out.println("Job id: " + exportJob.getJobId() + ", Status : " + status.getName()); } 在提交导出作业前,可选设置数据格式,压缩类型,导出模式等,如样例所示,分别调用ExportJob对象的setStorageType、setCompressType、setExportMode接口设置,其中setStorageType仅支持csv格式。 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
  • 提交作业 DLI提供提交作业和查询作业的接口。您可以通过提交接口提交作业,如果需要查询结果可以调用查询接口查询该作业的结果。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 //实例化SQLJob对象,传入执行SQL所需的queue,数据库名,SQL语句 private static void runSqlJob(Queue queue, Table obsTable) throws DLIException { String sql = "select * from " + obsTable.getTableName(); String queryResultPath = "OBS Path"; SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql); System.out.println("start submit SQL job..."); //调用SQLJob对象的submit接口提交查询作业 sqlJob.submit(); //调用SQLJob对象的getStatus接口查询作业状态 JobStatus status = sqlJob.getStatus(); System.out.println(status); System.out.println("start export Result..."); //调用SQLJob对象的exportResult接口导出查询结果,其中queryResultPath为导出数据的路径 sqlJob.exportResult(queryResultPath, StorageType.CSV, CompressType.GZIP, ExportMode.ERRORIFEXISTS, null); System.out.println("Job id: " + sqlJob.getJobId() + ", Status : " + status.getName()); }
  • 查询作业结果 DLI提供查询作业结果的接口。您可以使用该接口通过JobId查询该作业信息。示例代码如下: 1 2 3 4 5 6 7 8 9 private static void getJobResultInfo(DLIClient client) throws DLIException { String jobId = "4c4f7168-5bc4-45bd-8c8a-43dfc85055d0"; JobResultInfo jobResultInfo = client.queryJobResultInfo(jobId); //查询job信息 System.out.println(jobResultInfo.getJobId()); System.out.println(jobResultInfo.getDetail()); System.out.println(jobResultInfo.getJobStatus()); System.out.println(jobResultInfo.getJobType()); }
  • 导入分区数据 DLI提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表或者OBS表指定分区中。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 //实例化importJob对象,构造函数的入参包括队列、数据库名、表名(通过实例化Table对象获取)和数据路径 private static void importData(Queue queue, Table DLITable) throws DLIException { String dataPath = "OBS Path"; queue = client.getQueue("queueName"); CsvFormatInfo formatInfo = new CsvFormatInfo(); formatInfo.setWithColumnHeader(true); formatInfo.setDelimiter(","); formatInfo.setQuoteChar("\""); formatInfo.setEscapeChar("\\"); formatInfo.setDateFormat("yyyy/MM/dd"); formatInfo.setTimestampFormat("yyyy-MM-dd HH:mm:ss"); String dbName = DLITable.getDb().getDatabaseName(); String tableName = DLITable.getTableName(); PartitionSpec partitionSpec = new PartitionSpec("part1=value1,part2=value2"); Boolean isOverWrite = true; ImportJob importJob = new ImportJob(queue, dbName, tableName, dataPath, partitionSpec, isOverWrite); importJob.setStorageType(StorageType.CSV); importJob.setCsvFormatInfo(formatInfo); System.out.println("start submit import table: " + DLITable.getTableName()); //调用ImportJob对象的submit接口提交导入作业 importJob.submit(); //调用ImportJob对象的getStatus接口查询导入作业状态 JobStatus status = importJob.getStatus(); System.out.println("Job id: " + importJob.getJobId() + ", Status : " + status.getName()); } 在创建ImportJob对象的时候分区信息PartitionSpec也可以直接传入分区字符串。 partitionSpec如果导入时指定部分列为分区列,而导入的数据只包含了指定的分区信息,则数据导入后的未指定的分区列字段会存在null值等异常值。 示例中isOverWrite表示是否是覆盖写,为true表示覆盖写,为false表示追加写。目前不支持overwrite覆盖写整表,只支持overwrite写指定分区。如果需要追加写指定分区,则在创建ImportJob的时候指定isOverWrite为false。
  • 导入数据 DLI提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表或者OBS表中。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 //实例化importJob对象,构造函数的入参包括队列、数据库名、表名(通过实例化Table对象获取)和数据路径 private static void importData(Queue queue, Table DLITable) throws DLIException { String dataPath = "OBS Path"; queue = client.getQueue("queueName"); CsvFormatInfo formatInfo = new CsvFormatInfo(); formatInfo.setWithColumnHeader(true); formatInfo.setDelimiter(","); formatInfo.setQuoteChar("\""); formatInfo.setEscapeChar("\\"); formatInfo.setDateFormat("yyyy/MM/dd"); formatInfo.setTimestampFormat("yyyy-MM-dd HH:mm:ss"); String dbName = DLITable.getDb().getDatabaseName(); String tableName = DLITable.getTableName(); ImportJob importJob = new ImportJob(queue, dbName, tableName, dataPath); importJob.setStorageType(StorageType.CSV); importJob.setCsvFormatInfo(formatInfo); System.out.println("start submit import table: " + DLITable.getTableName()); //调用ImportJob对象的submit接口提交导入作业 importJob.submit(); //调用ImportJob对象的getStatus接口查询导入作业状态 JobStatus status = importJob.getStatus(); System.out.println("Job id: " + importJob.getJobId() + ", Status : " + status.getName()); } 在提交导入作业前,可选择设置导入数据的格式,如样例所示,调用ImportJob对象的setStorageType接口设置数据存储类型为csv,数据的具体格式通过调用ImportJob对象的setCsvFormatInfo接口进行设置。 在提交导入作业前,可选择设置导入数据的分区并配置是否是overwrite写入,分区信息可以调用ImportJob对象的setPartitionSpec接口设置,如:importJob.setPartitionSpec(new PartitionSpec("part1=value1,part2=value2")),也可以在创建ImportJob对象的时候直接通过参数的形式创建 。导入作业默认是追加写,如果需要覆盖写,则可以调用ImportJob对象的setOverWrite接口设置,如:importJob.setOverWrite(Boolean.TRUE)。 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
  • AK/SK认证方式样例代码 代码样例 1 2 3 4 5 6 7 8 def init_aksk_dli_client(): auth_mode = 'aksk' region = 'xxx' project_id = 'xxxx' ak = System.getenv("xxx_SDK_AK")//访问密钥ID。 sk = System.getenv("xxx_SDK_SK")//与访问密钥ID结合使用的密钥。 dli_client = DliClient(auth_mode=auth_mode, region=region, project_id=project_id,ak=ak, sk=sk) return dli_client 参数说明与获取方式 参数说明 ak:账号 Access Key sk:账号 Secret Access Key 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全。 本示例以ak和sk保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量xxx_SDK_AK和xxx_SDK_SK。 regionName :所属区域名称 projectId :项目ID 通过以下方式可获取AK/SK,项目ID及对应的region信息。 登录管理控制台。 鼠标指向界面右上角的登录用户名,在下拉列表中单击“我的凭证”。 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
  • Token认证方式样例代码 代码样例 1 2 3 4 5 6 7 8 9 def init_token_dli_client(): auth_mode = 'token' region = 'xxx' project_id = 'xxxx' account = 'xxx account' user = 'xxxx' password = 'xxxx' dli_client = DliClient(auth_mode=auth_mode, region=region, project_id=project_id,account=account, user=user, password=password) return dli_client 参数说明 参数获取方式请参考获取帐号、IAM用户、项目、用户组、区域、委托的名称和ID。 domainname:帐号名。 username:用户名 password:用户名密码 regionname:所属区域名称 project_id:项目ID 认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。 可以通过set方式修改endpoint,即dliInfo.setServerEndpoint(endpoint)。
  • 删除批处理作业 DLI提供删除批处理作业的接口。您可以使用该接口删除批处理作业。示例代码如下: 1 2 3 4 5 6 7 private static void deleteBatchJob(DLIClient client) throws DLIException { //提交Spark批处理运行作业的Id String batchId = "0aae0dc5-f009-4b9b-a8c3-28fbee399fa6"; // 调用BatchJob对象的delBatch接口取消批处理作业 MessageInfo messageInfo = client.delBatchJob(batchId); System.out.println(messageInfo.getMsg()); }
  • 查询批处理作业状态 DLI提供查询批处理作业状态的接口。您可以使用该接口查询批处理作业当前的状态信息。示例代码如下: private static void getStateBatchJob(DLIClient client) throws DLIException { BatchJob batchJob = null; SparkJobInfo jobInfo = new SparkJobInfo(); jobInfo.setClusterName("queueName"); jobInfo.setFile("xxx.jar"); jobInfo.setClassName("your.class.name"); batchJob = new BatchJob(client.getCluster("queueName"), jobInfo); batchJob.asyncSubmit(); SparkJobStatus sparkJobStatus=batchJob.getStatus(); System.out.println(sparkJobStatus); }
  • 提交批处理作业 DLI提供执行批处理作业的接口。您可以使用该接口执行批处理作业。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private static void runBatchJob(Cluster cluster) throws DLIException { SparkJobInfo jobInfo = new SparkJobInfo(); jobInfo.setClassName("your.class.name"); jobInfo.setFile("xxx.jar"); jobInfo.setCluster_name("queueName"); // 调用BatchJob对象的asyncSubmit接口提交批处理作业 BatchJob job = new BatchJob(cluster, jobInfo); job.asyncSubmit(); while (true) { SparkJobStatus jobStatus = job.getStatus(); if (SparkJobStatus.SUC CES S.equals(jobStatus)) { System.out.println("Job finished"); return; } if (SparkJobStatus.DEAD.equals(jobStatus)) { throw new DLIException("The batch has already exited"); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } Cluster为用户自建的队列。 传参不能为JSON格式。 对应批处理作业提交提供两个接口: 异步 asyncSubmit,提交后直接返回,不等待 同步 submit,提交后会一直等待作业执行结束
  • 查询所有表 DLI提供查询表的接口。您可以使用该接口查询数据库下的所有表。示例代码如下: 1 2 3 4 5 6 7 8 9 def list_all_tbls(dli_client, db_name): try: tbls = dli_client.list_tables(db_name, with_detail=True) except DliException as e: print(e) return for tbl in tbls: print(tbl.name) 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 描述表信息 您可以使用该接口获取表的元数据描述信息。示例代码如下: def get_table_schema(dli_client, db_name, tbl_name): try: table_info = dli_client.get_table_schema(db_name, tbl_name) print(table_info) except DliException as e: print(e) return
  • 创建DLI表 DLI提供创建DLI表的接口。您可以使用该接口创建数据存储在DLI内部的表。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def create_dli_tbl(dli_client, db_name, tbl_name): cols = [ Column('col_1', 'string'), Column('col_2', 'string'), Column('col_3', 'smallint'), Column('col_4', 'int'), Column('col_5', 'bigint'), Column('col_6', 'double'), Column('col_7', 'decimal(10,0)'), Column('col_8', 'boolean'), Column('col_9', 'date'), Column('col_10', 'timestamp') ] sort_cols = ['col_1'] tbl_schema = TableSchema(tbl_name, cols, sort_cols) try: table = dli_client.create_dli_table(db_name, tbl_schema) except DliException as e: print(e) return print(table) 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 创建OBS表 DLI提供创建OBS表的接口。您可以使用该接口创建数据存储在OBS的表。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def create_obs_tbl(dli_client, db_name, tbl_name): cols = [ Column('col_1', 'string'), Column('col_2', 'string'), Column('col_3', 'smallint'), Column('col_4', 'int'), Column('col_5', 'bigint'), Column('col_6', 'double'), Column('col_7', 'decimal(10,0)'), Column('col_8', 'boolean'), Column('col_9', 'date'), Column('col_10', 'timestamp') ] tbl_schema = TableSchema(tbl_name, cols) try: table = dli_client.create_obs_table(db_name, tbl_schema, 'obs://bucket/obj', 'csv') except DliException as e: print(e) return print(table) 创建OBS表需要指定OBS路径,且该路径需要提前创建。 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 删除表 DLI提供删除表的接口。您可以使用该接口删除数据库下的所有表。示例代码如下: 1 2 3 4 5 6 7 8 def delete_tbls(dli_client, db_name): try: tbls = dli_client.list_tables(db_name) for tbl in tbls: dli_client.delete_table(db_name, tbl.name) except DliException as e: print(e) return 表删除后,将不可恢复,请谨慎操作。 完整样例代码和依赖包说明请参考:Python SDK概述。
共100000条