华为云用户手册

  • 打印作业结果 使用说明 请根据业务实际情况处理相应的每一行数据,此处仅作样例。 示例代码 1 2 3 4 5 6 7 8 9 def print_result(obs_reader): """ 请根据业务实际情况处理相应的每一行数据,此处仅作样例。 """ count = 0 for record in obs_reader: count += 1 print(record) logger.info("total records: %d", count)
  • to_json_string 使用说明 请根据业务实际情况构造相应的每一行数据,此处仅作样例。 示例代码 1 2 3 4 5 6 7 8 9 10 11 12 def to_json_string(row, schema): json_obj = {} for i, column in enumerate(schema): if column.is_partition_column: continue if column.type == 'binary': json_obj[column.name] = base64.b64encode(row.columns[i]).decode('utf-8') elif column.type.startswith('decimal'): json_obj[column.name] = float(row.columns[i]) else: json_obj[column.name] = row.columns[i] return json.dumps(json_obj) + "\n"
  • 查询指定作业的结果 使用说明 关键API: 查询作业状态API 关键SDK:dli.job.SqlJob.get_result. 使用本方法的前提是需要开启结果写作业桶特性,否则作业运行时会抛出异常。 可通过查询作业状态API响应体中的 result_path 来判断是否已开启作业结果写作业桶特性。待作业运行结束后,如果 result_path 以 obs:// 开头,则已开启作业结果写作业桶特性,否则未开启。 示例代码 1 2 3 4 5 6 7 8 9 10 def query_data_by_jobid(dli_client, job_id): """ :param dli.dli_client.DliClient dli_client: DLI Client. :param str job_id: Query类型作业的job id :return: """ from dli.job import SqlJob sql_job = SqlJob(job_id=job_id, job_type="QUERY", client=dli_client) dli_client._cycle_check_sql_job(sql_job=sql_job) print_result(sql_job.get_result())
  • 写入数据到OBS writeTmpData 使用说明 此方法纯粹为直接调用OBS写数据相关API,与DLI API完全解耦,本示例提供了一个写Json的实现,即文件在OBS上的保存格式为Json。 用户可依据业务需求自定义实现,比如,用户可将文件保存为csv。 示例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def write_tmp_data(schema, upload_data_path, dli_info, total_records): """ 此方法纯粹为直接调用OBS写数据相关API,与DLI API完全解耦,本示例提供了一个写Json的实现,即文件在OBS上的保存格式为Json。 用户可依据业务需求自定义实现,比如,用户可将文件保存为csv。 :param list schema: 数据的结构 schema :param str upload_data_path: 该OBS临时目录用于保存用户的业务数据 :param dli.dli_client.DliClient.dli_info dli_info: 初始化DliClient时传入的认证信息。 :param int total_records: 模拟数据行数。通过该参数配置插入的数据行数。此处仅做展示,用户可根据实际业务需求修改。 :return: """ obs_client = ObsClient(access_key_id=dli_info.ak, secret_access_key=dli_info.sk, is_secure=True, server=dli_info.obs) bucket_name = get_bucket_name(upload_data_path) object_prefix = get_object_prefix(upload_data_path) datas = "" try: for i in range(total_records): row = Row(schema=schema, columns=get_record()) datas += to_json_string(row, schema) object_key = object_prefix + "/tmp_data.json" obs_client.putObject(bucket_name, object_key, datas) logger.info("Uploaded data to OBS bucket '%s' with object key '%s'", bucket_name, object_key) finally: obs_client.close()
  • 查询作业结果 相关链接: SELECT查询语句 关键SDK:dli.dli_client.DliClient.execute_sql 关键SDK:dli.job.SqlJob.get_result. 需要开启结果写作业桶特性,否则会抛出异常。 可通过查询作业状态API响应体中的 result_path 来判断是否已开启作业结果写作业桶特性。 待作业运行结束后,如果result_path 以 obs:// 开头,则已开启作业结果写作业桶特性,否则未开启。 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.create_sql_job。详见 CreateSqlJob。 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_status。详见 ShowSqlJobStatus 1 2 3 4 5 6 7 8 9 10 def query_data(dli_client, select_sql, queue_name): """ :param dli.dli_client.DliClient dli_client: DLI Client. :param str select_sql: SQL statement :param str queue_name: Queue name for the SQL execute :return: str """ sql_job = dli_client.execute_sql(sql=select_sql, queue_name=queue_name) print_result(sql_job.get_result()) return sql_job.job_id
  • 操作前准备 获取AK/SK,项目ID及对应的Region信息。 管理控制台。 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
  • 导入数据 相关链接: 导入数据 原生数据类型 复杂数据类型 关键SDK: dli.dli_client.DliClient.execute_sql 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.create_sql_job。详见 CreateSqlJob。 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_status。详见 ShowSqlJobStatus 示例代码: 1 2 3 4 5 6 7 8 9 10 def load_data(dli_client, upload_data_path, queue_name): # 1. 写入数据到OBS临时目录。请根据业务实际情况做调整,此处仅作样例。 # 注:此步骤纯粹为直接调用OBS写数据相关API,与DLI完全解耦,本示例仅提供了一个写Json的实现,即文件在OBS上的保存格式为Json。 # 用户可依据业务需求自定义实现,比如,用户可将文件保存为csv。 write_tmp_data(get_schema(), upload_data_path, dli_client.dli_info, 1) # 2. 将第一步中写到OBS上的数据导入到DLI。 # 注:此处的data_type需要根据第一步中的文件类型来确定,本示例中是JSON。如用户使用其它格式,则需要修改成相应的 data_type loadSql = f"LOAD DATA INPATH '{upload_data_path}' INTO TABLE demo_db.demo_tbl OPTIONS(data_type 'json')" dli_client.execute_sql(sql=loadSql, queue_name=queue_name) # 提交SQL作业,并循环检查作业状态直到作业结束
  • 样例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 def main(): your_access_key = os.getenv("HUAWEICLOUD_SDK_AK") your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK") kwargs = { 'region': 'region_name', 'project_id': 'your_project_id', 'ak': your_access_key, 'sk': your_secret_key } from dli.dli_client import DliClient dli_client = DliClient(**kwargs) try: # 步骤一:创建数据库、表。 queue_name = 'your_sql_queue_name' prepare(dli_client, queue_name) # 步骤二:导入数据到表中。 # 整体实现过程/原理可分为以下3步: # 1. 用OBS的API把数据上传到 “obs_path_to_write_tmp_data”。可在OBS中配置生命周期策略,定期删除这些临时数据。 # 2. 向DLI提交执行Load Data语句,从而把OBS的数据导入到DLI。 # Load Data语法详见导入数据。 # 3. 循环检查作业运行状态,直到作业结束。 obs_path_to_write_tmp_data = f"obs://your_obs_bucket_name/your/path/{uuid.uuid4()}" load_data(dli_client, obs_path_to_write_tmp_data, queue_name) # 步骤三:提交SQL语句,执行查询并读取结果。 select_sql = "SELECT * FROM demo_db.demo_tbl" job_id = query_data(dli_client, select_sql, queue_name) # 步骤三': 如有需要,用户也可以通过作业ID来获取结果。 query_data_by_jobid(dli_client, job_id) # 分页查询所有作业,用户可以使用该接口查询当前工程下的所有SQL作业信息 # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.list_sql_jobs, # 详见 ListSqlJobs list_sql_jobs(dli_client) # 其它场景: # 1. 如果用户想取消已经提交的SQL作业,可使用以下接口。 # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.cancel_sql_job # 详见 CancelSqlJob # 注:若作业已经执行结束或失败则无法取消。 # 2. 如果用户想对SQL语句做语法校验,可使用以下接口。 # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.check_sql # 详见 CheckSql # 注:本接口只能做语法校验,无法做语义校验。请使用Explain语句,提交到DLI执行,进行语义校验。 # 3. 如果用户想根据jobId获取某个已提交的SQL作业,并查看作业详情,可使用以下接口。 # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_detail # 详见 ShowSqlJobDetail # 4. 获取作业执行进度信息,如果作业正在执行,可以获取到子作业的信息,如果作业刚开始或者已经结束,则无法获取到子作业信息 # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_progress # 详见 ShowSqlJobProgress except DliException as e: # 请根据业务实际情况处理异常信息,此处仅作样例。 logger.error(e)
  • 查询批处理作业列表 功能介绍: 查询批处理作业列表。 相关链接: 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#listSparkJobs(ListSparkJobsRequest) 查询批处理作业列表 示例代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static void listSparkJob(DliClient client, String jobId) throws DLIException { ListSparkJobsResponse resp; try { resp = client.listSparkJobs(new ListSparkJobsRequest() // 您还可用.withXxx()方法指定其它条件来返回满足条件的SparkJobs,此处仅做样例。 // 详见 https://console.huaweicloud.com/apiexplorer/#/openapi/DLI/doc?api=ListSparkJobs 表2 .withJobId(jobId) .withQueueName("YourQueueName") .withStart(1234567L) // 可以指定作业开始时间 .withEnd(2345678L)); // 可以指定作业结束时间 } catch (Exception e) { throw new DLIException("Failed to list Spark jobs: ", e); } logger.info(String.format("List SparkJobs : %s", resp.toString())); // resp中的响应参数详见:https://console.huaweicloud.com/apiexplorer/#/openapi/DLI/doc?api=ListSparkJobs 表3和表4 }
  • 查询批处理作业状态 功能介绍: 用于执行Spark作业。 相关链接: 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#showSparkJobStatus(ShowSparkJobStatusRequest) 查询批处理作业状态 示例代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private static void checkRunning(DliClient client, String jobId) throws DLIException { while (true) { ShowSparkJobStatusResponse resp; try { resp = client.showSparkJobStatus(new ShowSparkJobStatusRequest().withBatchId(jobId)); } catch (Exception e) { throw new DLIException("Failed to get job status by id: " + jobId, e); } String status = resp.getState(); logger.info(String.format("SparkJob id %s status: %s", jobId, status)); if ("success".equals(status)) { return; } if ("dead".equals(status)) { throw new DLIException("Run job failed"); } try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new DLIException("Check job running interrupted."); } } }
  • 样例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private static final Logger logger = LoggerFactory.getLogger(SparkJobExample.class); public static void main(String[] args) { String yourAccessKey = System.getenv("HUAWEICLOUD_SDK_AK"); String yourSecretKey = System.getenv("HUAWEICLOUD_SDK_SK"); DliClient dliClient = DliClient.newBuilder() .withRegion(DliRegion.valueOf("RegionName")) .withCredential(new BasicCredentials() .withAk(yourAccessKey) .withSk(yourSecretKey) .withProjectId("YouProjectId")) .build(); try { // 步骤一:提交Spark作业到DLI执行。 String jobId = runSparkJob(dliClient, "YourQueueName"); // 步骤二:如果您想在当前线程等待作业执行结束,可循环检查状态,直到作业结束。 checkRunning(dliClient, jobId); // 步骤三:如果您想根据条件查询一个或多个特定作业,可执行以下方法。 // 此处仅作样例,除了jobId,您还可指定其它筛选条件。详见 https://console.huaweicloud.com/apiexplorer/#/openapi/DLI/doc?api=ListSparkJobs 表2 listSparkJob(dliClient, jobId); /* * 其它场景: * 1. 作业运行期间,如果您想取消作业,可调用接口取消批处理作业。具体操作请参考取消批处理作业。 * 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#cancelSparkJob(CancelSparkJobRequest), * 注:作业状态为“已成功”或者“已失败”的批处理作业无法取消。 * 2. 如果您想根据jobId查询某个特定作业的详情,可执行以下方法。 * 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#showSparkJob(ShowSparkJobRequest), * 详见ShowSparkJob */ } catch (DLIException e) { // 请根据业务实际情况处理异常信息,此处仅作样例。 } }
  • 操作前准备 获取AK/SK,项目ID及对应的Region信息。 管理控制台。 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
  • 操作前准备 获取AK/SK,项目ID及对应的Region信息。 管理控制台。 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
  • 创建Flink Jar作业 功能介绍: 用于创建Flink Jar作业。 相关链接: 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#createFlinkJarJob(com.huaweicloud.sdk.dli.v1.model.CreateFlinkJarJobRequest) 新建Flink Jar作业 创建 Flink Jar作业。作业状态将变成 “草稿” 示例代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private static Long createFlinkJarJob(DliClient client, String queueName) { // 请根据业务实际情况设置相应的参数,此处仅作样例。 CreateFlinkJarJobResponse resp = client.createFlinkJarJob(new CreateFlinkJarJobRequest() .withBody(new CreateFlinkJarJobRequestBody() .withName("demo_flink_jar") // 自定义作业名称。长度限制:1-57个字符。 .withDesc("YourJobDescription") // 自定义作业描述。长度限制:0-512个字符 .withQueueName(queueName) // 队列名称。长度限制:0-128个字符 .withFeature("basic") // 作业特性。表示用户作业使用的Flink镜像类型。basic:表示使用DLI提供的基础Flink镜像。 .withFlinkVersion("1.12") // Flink版本。当用户设置“feature”为“basic”时,该参数生效 .withObsBucket("YourObsBucketName") // OBS桶名。用于保存 日志 和 checkpoint数据 .withLogEnabled(true) // 开启作业的日志上传到用户的OBS功能 .withEntrypoint("obs://YourObsBucketName/your/flink/job.jar") // 用户已上传到OBS的jar包,用户自定义作业主类所在的jar包。 .withMainClass("YourClassFullName") // 作业入口类,比如:org.apache.flink.examples.JavaQueueStream .withEntrypointArgs("YourAppArg1 YourAppAgr2") // 作业入口类参数,多个参数之间空格分隔。如不需要,删除此行 .withDependencyJars(Arrays.asList("obs://YourObsBucketName/your/dependency1.jar", "obs://YourObsBucketName/your/dependency2.jar")) // 用户已上传到OBS的jar包,用户自定义作业的其他依赖包。如不需要,删除此行 .withDependencyJars(Arrays.asList("obs://YourObsBucketName/your/dependency1.csv", "obs://YourObsBucketName/your/dependency2.json")) // 用户已上传到OBS的文件,用户自定义作业的依赖文件。如不需要,删除此行 )); return resp.getJob().getJobId(); }
  • 查询作业状态 功能介绍: 查询Flink SQL作业状态。 相关链接: 关键SDK API: com.huaweicloud.sdk.dli.v1.DliClient#showFlinkJob(com.huaweicloud.sdk.dli.v1.model.ShowFlinkJobRequest) 查询Flink作业详情。 如果想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。 示例代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private static void checkRunning(DliClient client, Long jobId) throws DLIException { while (true) { ShowFlinkJobResponse resp; try { resp = client.showFlinkJob(new ShowFlinkJobRequest().withJobId(jobId)); } catch (Exception e) { throw new DLIException("Failed to get Flink jar job status by id: " + jobId, e); } String status = resp.getJobDetail().getStatus(); logger.info(String.format("FlinkJarJob id %s status: %s", jobId, status)); if ("job_running".equals(status)) { return; } if ("job_submit_fail".equals(status) || "job_running_exception".equals(status)) { throw new DLIException("Run Flink jar job failed: " + resp); } try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new DLIException("Check job running interrupted."); } } }
  • 创建Flink SQL作业 功能介绍: 用于创建Flink SQL作业。 相关链接: 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#createFlinkSqlJob(com.huaweicloud.sdk.dli.v1.model.CreateFlinkSqlJobRequest) 新建Flink SQL作业。 创建 Flink SQL作业,此时作业状态将变成 “草稿”。 示例代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private static Long createFlinkSqlJob(DliClient client, String queueName) { // 请根据业务实际情况设置相应的参数,此处仅作样例。 CreateFlinkSqlJobResponse resp = client.createFlinkSqlJob(new CreateFlinkSqlJobRequest() .withBody(new CreateFlinkSqlJobRequestBody() .withName("demo_flink_sql") // 自定义作业名称。长度限制:1-57个字符 .withDesc("YourJobDescription") // 自定义作业描述。长度限制:0-512个字符 .withSqlBody("create table orders(\n" + " name string,\n" + " num INT\n" + ") with (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1', \n" + " 'fields.name.kind' = 'random', \n" + " 'fields.name.length' = '5' \n" + ");\n" + "\n" + "CREATE TABLE sink_table (\n" + " name string,\n" + " num INT\n" + ") WITH (\n" + " 'connector' = 'print'\n" + ");\n" + "\n" + "INSERT into sink_table SELECT * from orders;") // 自定义 Stream SQL语句,至少包含source, query, sink三个部分。长度限制:1024*1024个字符。 // 本SQL示例:自动生成随机source数据,并打印到控制台。 .withQueueName(queueName) // 队列名称。长度限制:0-128个字符 .withRunMode("exclusive_cluster") // 作业运行模式。只支持 exclusive_cluster 独享模式。 .withLogEnabled(true) // 开启作业的日志上传到用户的OBS功能 .withObsBucket("YourObsBucketName") // OBS桶名。用于保存 日志 和 checkpoint数据 .withJobType("flink_opensource_sql_job") // 作业类型。建议选择: "flink_opensource_sql_job" .withFlinkVersion("1.12") // 指定Flink版本 )); return resp.getJob().getJobId(); }
  • 查询作业状态 功能介绍: 查询Flink SQL作业状态。 相关链接: 关键SDK API: com.huaweicloud.sdk.dli.v1.DliClient#showFlinkJob(com.huaweicloud.sdk.dli.v1.model.ShowFlinkJobRequest)} 查询Flink作业详情 如果想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。 示例代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private static void checkRunning(DliClient client, Long jobId) throws DLIException { while (true) { ShowFlinkJobResponse resp; try { resp = client.showFlinkJob(new ShowFlinkJobRequest().withJobId(jobId)); } catch (Exception e) { throw new DLIException("Failed to get Flink sql job status by id: " + jobId, e); } String status = resp.getJobDetail().getStatus(); logger.info(String.format("FlinkSqlJob id %s status: %s", jobId, status)); if ("job_running".equals(status)) { return; } if ("job_submit_fail".equals(status) || "job_running_exception".equals(status)) { throw new DLIException("Run Flink sql job failed: " + resp); } try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new DLIException("Check job running interrupted."); } } }
  • 操作前准备 获取AK/SK,项目ID及对应的Region信息。 管理控制台。 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
  • 导出数据 DLI提供导出数据的接口。您可以使用该接口将DLI表中的数据导出到OBS中。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 def export_data(dli_client, db_name, tbl_name, queue_name): try: job_id, status = dli_client.export_table(tbl_name, db_name, 'obs://bucket/obj', queue_name=queue_name) except DliException as e: print(e) return print(job_id) print(status) 在提交导出作业前,可选设置数据格式、压缩类型、导出模式等,导出格式只支持csv格式。 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
  • 查询所有作业 DLI提供查询所有作业的接口。您可以使用该接口执行查询当前工程下的所有作业的信息并获取查询结果。示例代码如下: 1 2 3 4 5 6 7 8 def list_all_sql_jobs(dli_client): try: sql_jobs = dli_client.list_sql_jobs() except DliException as e: print(e) return for sql_job in sql_jobs: print(sql_job) 该SDK接口不支持sql_pattern,即通过指定sql片段作为作业过滤条件进行查询。 如果需要则可以通过查询所有作业API接口指定该参数进行查询。
  • 查询SQL类型作业 您可以使用该接口查询当前工程下的所有SQL类型作业的信息并获取查询结果。示例代码如下: def list_sql_jobs(dli_client): try: sql_jobs = dli_client.list_sql_jobs() except DliException as e: print(e) return
  • 提交作业 DLI提供查询作业的接口。您可以使用该接口执行查询并获取查询结果。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def run_sql(dli_client, db_name, queue_name): # execute SQL try: sql_job = dli_client.execute_sql('select * from tbl_dli_for_test', db_name, queue_name=queue_name) result_set = sql_job.get_result(queue_name=queue_name) except DliException as e: print(e) return if result_set.row_count == 0: return for row in result_set: print(row) # export the query result to obs try: status = sql_job.export_result('obs://bucket/obj', queue_name=queue_name) except DliException as e: print(e) return print(status)
  • 取消作业 DLI提供取消作业的接口。您可以使用该接口取消已经提交的作业,若作业已经执行结束或失败则无法取消。示例代码如下: 1 2 3 4 5 6 def cancel_sql(dli_client, job_id): try: dli_client.cancel_sql(job_id) except DliException as e: print(e) return
  • 导入数据 DLI提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表中。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def import_data(dli_client, db_name, tbl_name, queue_name): options = { "with_column_header": True, "delimiter": ",", "quote_char": "\"", "escape_char": "\\", "date_format": "yyyy/MM/dd", "timestamp_format": "yyyy/MM/dd hh:mm:ss" } try: job_id, status = \ dli_client.import_table(tbl_name, db_name, 'obs://bucket/obj/data.csv', 'csv', queue_name=queue_name, options=options) except DliException as e: print(e) return print(job_id) print(status) 在提交导入作业前,可选择通过data_type参数设置导入数据的类型,例如将data_type设置为csv。csv数据的具体格式通可过options参数设置,例如:csv的分隔符,转义符等。 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
  • 描述表信息 您可以使用该接口获取表的元数据描述信息。示例代码如下: def get_table_schema(dli_client, db_name, tbl_name): try: table_info = dli_client.get_table_schema(db_name, tbl_name) print(table_info) except DliException as e: print(e) return
  • 创建OBS表 DLI提供创建OBS表的接口。您可以使用该接口创建数据存储在OBS的表。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def create_obs_tbl(dli_client, db_name, tbl_name): cols = [ Column('col_1', 'string'), Column('col_2', 'string'), Column('col_3', 'smallint'), Column('col_4', 'int'), Column('col_5', 'bigint'), Column('col_6', 'double'), Column('col_7', 'decimal(10,0)'), Column('col_8', 'boolean'), Column('col_9', 'date'), Column('col_10', 'timestamp') ] tbl_schema = TableSchema(tbl_name, cols) try: table = dli_client.create_obs_table(db_name, tbl_schema, 'obs://bucket/obj', 'csv') except DliException as e: print(e) return print(table) 创建OBS表需要指定OBS路径,且该路径需要提前创建。 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 删除表 DLI提供删除表的接口。您可以使用该接口删除数据库下的所有表。示例代码如下: 1 2 3 4 5 6 7 8 def delete_tbls(dli_client, db_name): try: tbls = dli_client.list_tables(db_name) for tbl in tbls: dli_client.delete_table(db_name, tbl.name) except DliException as e: print(e) return 表删除后,将不可恢复,请谨慎操作。 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 查询所有表 DLI提供查询表的接口。您可以使用该接口查询数据库下的所有表。示例代码如下: 1 2 3 4 5 6 7 8 9 def list_all_tbls(dli_client, db_name): try: tbls = dli_client.list_tables(db_name, with_detail=True) except DliException as e: print(e) return for tbl in tbls: print(tbl.name) 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 创建DLI表 DLI提供创建DLI表的接口。您可以使用该接口创建数据存储在DLI内部的表。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def create_dli_tbl(dli_client, db_name, tbl_name): cols = [ Column('col_1', 'string'), Column('col_2', 'string'), Column('col_3', 'smallint'), Column('col_4', 'int'), Column('col_5', 'bigint'), Column('col_6', 'double'), Column('col_7', 'decimal(10,0)'), Column('col_8', 'boolean'), Column('col_9', 'date'), Column('col_10', 'timestamp') ] sort_cols = ['col_1'] tbl_schema = TableSchema(tbl_name, cols, sort_cols) try: table = dli_client.create_dli_table(db_name, tbl_schema) except DliException as e: print(e) return print(table) 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 查询所有数据库 DLI提供查询数据库列表接口。您可以使用该接口查询当前已创建的数据库列表。示例代码如下: 1 2 3 4 5 6 7 8 9 def list_all_dbs(dli_client): try: dbs = dli_client.list_databases() except DliException as e: print(e) return for db in dbs: print(db) 完整样例代码和依赖包说明请参考:Python SDK概述。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全