云服务器内容精选

  • 请求示例 提交SQL作业,该作业执行的数据库为db1、队列为default,并为该作业设置标签workspace=space1;jobName=name1。 { "currentdb": "db1", "sql": "desc table1", "queue_name": "default", "conf": [ "dli.sql.shuffle.partitions = 200" ], "tags": [ { "key": "workspace", "value": "space1" }, { "key": "jobName", "value": "name1" } ] }
  • 响应示例 { "is_success": true, "message": "", "job_id": "8ecb0777-9c70-4529-9935-29ea0946039c", "job_type": "DDL", "job_mode":"sync", "schema": [ { "col_name": "string" }, { "data_type": "string" }, { "comment": "string" } ], "rows": [ [ "c1", "int", null ], [ "c2", "string", null ] ] }
  • 请求消息 表2 请求参数 参数名称 是否必选 参数类型 说明 sql 是 String 待执行的SQL语句。 currentdb 否 String SQL语句执行所在的数据库。当创建新数据库时,不需要提供此参数。 current_catalog 否 String 待提交作业的表的默认catalog。不填时默认使用 DLI catalog。 queue_name 否 String 待提交作业的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。 conf 否 Array of Strings 用户以“key/value”的形式设置用于此作业的配置参数。目前支持的配置项请参考表3。 tags 否 Array of Objects 作业的标签。具体请参考表4。 engine_type 否 String 选择执行作业的引擎类型。 表3 conf参数说明 参数名称 默认值 描述 spark.sql.files.maxRecordsPerFile 0 要写入单个文件的最大记录数。如果该值为零或为负,则没有限制。 spark.sql.autoBroadcastJoinThreshold 209715200 配置执行连接时显示所有工作节点的表的最大字节大小。通过将此值设置为“-1”,可以禁用显示。 说明: 当前仅支持运行命令ANALYZE TABLE COMPUTE statistics noscan的配置单元元存储表,和直接根据数据文件计算统计信息的基于文件的数据源表。 spark.sql.shuffle.partitions 200 为连接或聚合过滤数据时使用的默认分区数。 spark.sql.dynamicPartitionOverwrite.enabled false 当前配置设置为“false”时,DLI在覆盖写之前,会删除所有符合条件的分区。例如,分区表中有一个“2021-01”的分区,当使用INSERT OVERWRITE语句向表中写入“2021-02”这个分区的数据时,会把“2021-01”的分区数据也覆盖掉。 当前配置设置为“true”时,DLI不会提前删除分区,而是在运行时覆盖那些有数据写入的分区。 spark.sql.files.maxPartitionBytes 134217728 读取文件时要打包到单个分区中的最大字节数。 spark.sql.badRecordsPath - Bad Records的路径。 spark.sql.legacy.correlated.scalar.query.enabled false 该参数设置为true: 当子查询中数据不重复的情况下,执行关联子查询,不需要对子查询的结果去重。 当子查询中数据重复的情况下,执行关联子查询,会提示异常,必须对子查询的结果做去重处理,比如max(),min()。 该参数设置为false: 不管子查询中数据重复与否,执行关联子查询时,都需要对子查询的结果去重,比如max(),min(),否则提示异常。 dli.jobs.sql.resubmit.enable null 通过设置该参数可以控制在driver故障、队列重启时Spark SQL作业是否重新提交。 false:禁用作业重试,所有类型的命令都不重新提交,一旦driver故障,作业将标记为失败(FAILED)。 true:启用作业重试,即在driver故障时,所有类型的作业都将重新提交。 注意: 如果配置为true,在执行INSERT等幂等类型的操作时(例如insert into,load data、update),可能会导致数据一致性问题。即driver故障后作业重试,导致driver故障前已插入的数据被重复写入。 spark.sql.optimizer.dynamicPartitionPruning.enabled true 该配置项用于启用或禁用动态分区修剪。在执行SQL查询时,动态分区修剪可以帮助减少需要扫描的数据量,提高查询性能。 配置为true时,代表启用动态分区修剪,SQL会在查询中自动检测并删除那些不满足WHERE子句条件的分区,适用于在处理具有大量分区的表时。 如果SQL查询中包含大量的嵌套left join操作,并且表有大量的动态分区时,这可能会导致在数据解析时消耗大量的内存资源,导致Driver节点的内存不足,并触发频繁的Full GC。 在这种情况下,可以配置该参数为false即禁用动态分区修剪优化,有助于减少内存使用,避免内存溢出和频繁的Full GC。 但禁用此优化可能会降低查询性能,禁用后Spark将不会自动修剪掉那些不满足条件的分区。 表4 tags参数 参数名称 是否必选 参数类型 说明 key 是 String 标签的键。 说明: 标签的键的最大长度为128个字符,标签的键可以包含任意语种字母、数字、空格和_ . : =+-@ ,但首尾不能含有空格,不能以_sys_开头。 value 是 String 说明: 标签值的最大长度为255个字符,标签的值可以包含任意语种字母、数字、空格和_ . : =+-@ ,但首尾不能含有空格。
  • 响应消息 表5 响应参数 参数名称 是否必选 参数类型 说明 is_success 是 Boolean 请求发送是否成功。“true”表示请求发送成功。 message 是 String 系统提示信息,执行成功时,信息可能为空。 job_id 是 String 此SQL语句将生成并提交一个新作业,返回此作业的ID,可用于获取作业状态和作业结果。 job_type 是 String 作业类型。 DDL DCL IMPORT EXPORT QUERY INSERT schema 否 Array of Map 当语句类型为DDL时,返回其结果的列名称及类型。 rows 否 Array of objects 当语句类型为DDL,且dli.sql.sqlasync.enabled=false时,直接返回其执行结果。但是最多只能返回1000行。 如果超过1000行,请通过异步方式获取结果。即,提交作业时配置 xxxx = true, 然后从DLI配置的作业桶中获取结果。结果在作业桶上的路径可以通过ShowSqlJobStatus接口返回值中的result_path来获取。结果的全量数据会自动导出到作业桶。 job_mode 否 String 作业执行模式: async:异步 sync:同步
  • 功能介绍 该API用于通过执行SQL语句的方式向队列提交作业。 作业包含以下类型:DDL、DCL、IMPORT、QUERY和INSERT。其中,IMPORT与导入数据(废弃)的功能一致,区别仅在于实现方式不同。 另外,用户可使用其他API来对作业进行查询和管理。具体操作有: 查询作业状态 查询作业详细信息 查询作业结果-方式二(废弃) 导出查询结果 查询所有作业 取消作业(推荐) 该API当响应消息中“job_type”为“DCL”时,为同步操作。
  • 响应消息 表2 响应参数 参数名称 参数类型 说明 is_success Boolean 执行请求是否成功。“true”表示请求执行成功。 message String 系统提示信息,执行成功时,该值为空。 job_id String 作业ID。可通过提交SQL作业(推荐)获取。 job_type String 作业类型。包含DDL、DCL、IMPORT、EXPORT、QUERY、INSERT、DATA_MIGRATION、UPDATE、DELETE、RESTART_QUEUE、SCALE_QUEUE。 job_mode String 作业执行模式: async:异步 sync:同步 queue_name String 队列名称,用于显示作业是在该队列中提交的。 owner String 提交作业的用户。 start_time Long 作业开始的时间。是单位为“毫秒”的时间戳。 duration Long 作业运行时长,单位毫秒。 status String 此作业的当前状态,包含运行中(RUNNING)、规格变更中(SCALING)、提交中(LAUNCHING)、已完成(FINISHED)、已失败(FAILED)、已取消(CANCELLED)。 input_row_count Long Insert作业执行过程中扫描记录条数。 bad_row_count Long Insert作业执行过程中扫描到的错误记录数。 input_size Long 作业执行过程中扫描文件的大小,单位字节。 result_count Integer 当前作业返回的结果总条数或insert作业插入的总条数。 database_name String 记录其操作的表所在的数据库名称。类型为IMPORT、EXPORT和QUERY的作业才有“database_name”属性。 table_name String 记录其操作的表名称。类型为IMPORT、EXPORT和QUERY的作业才有“table_name”属性。 detail String 相关列信息的Json字符串。 statement String 作业执行的SQL语句。 tags Array of objects 作业的标签。具体请参考表3。 user_conf String SQL查询的相关列信息的Json字符串。 result_format String 作业结果的存储格式,当前只支持csv。 result_path String 作业结果的OBS路径。 表3 tags参数 参数名称 是否必选 参数类型 说明 key 是 String 标签的键。 value 是 String 标签的值。
  • 响应示例 { "is_success": true, "message": "", "job_id": "208b08d4-0dc2-4dd7-8879-ddd4c020d7aa", "job_type": "QUERY", "job_mode":"async", "queue_name": "default", "owner": "test", "start_time": 1509335108918, "duration": 2523, "status": "FINISHED", "input_size": 22, "result_count": 4, "database_name":"dbtest", "table_name":"tbtest", "detail": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}", "statement": "select * from t1" }
  • 响应消息 表3 响应参数 参数 是否必选 参数类型 说明 is_success 是 Boolean 当“job_type”为“DCL”时,为请求执行是否成功。“true”表示请求执行成功。 message 是 String 系统提示信息,执行成功时,信息可能为空。 job_id 是 String 此SQL语句将生成并提交一个新作业,返回此作业的ID,可用于获取作业状态和作业结果。 job_type 是 String 作业类型。 DDL DCL IMPORT EXPORT QUERY INSERT schema 否 Array of Map 当语句类型为DDL时,返回其结果的列名称及类型。 rows 否 Array of objects 当语句类型为DDL时,直接返回其执行结果。
  • 示例 请求样例: { "currentdb": "db1", "sql": "desc table1", "conf": [ "dli.sql.shuffle.partitions = 200" ] } 成功响应样例: { "is_success": true, "message": "", "job_id": "8ecb0777-9c70-4529-9935-29ea0946039c", "job_type": "DDL", "schema": [ { "col_name": "string" }, { "data_type": "string" }, { "comment": "string" } ], "rows": [ [ "c1", "int", null ], [ "c2", "string", null ] ] } 调用API出错后,将不会返回上述结果,而是返回错误码和错误信息,详细介绍请参见错误码。
  • 请求消息 表2 请求参数 参数 是否必选 参数类型 说明 sql 是 String 待执行的SQL语句。 currentdb 否 String SQL语句执行所在的数据库。当创建新数据库时,不需要提供此参数。 conf 否 Array of objects 用户定义适用于此作业的配置参数。目前支持的配置项: dli.sql.join.preferSortMergeJoin(是否优先使用SortMergeJoin) dli.sql.autoBroadcastJoinThreshold(自动使用BroadcastJoin的数据量阈值) dli.sql.caseSensitive(sql语句是否大小写敏感) dli.sql.shuffle.partitions(指定Shuffle过程中Partition的个数) dli.sql.cbo.enabled(是否打开CBO优化策略) dli.sql.cbo.joinReorder.enabled(开启CBO优化时,是否允许重新调整join的顺序)
  • 功能介绍 该API用于通过执行SQL语句的方式向队列提交作业。 当前接口已废弃,不推荐使用。 作业包含以下类型:DDL、DCL、IMPORT、EXPORT、QUERY和INSERT。其中,IMPORT和EXPORT分别与导入数据(废弃)和与导出数据(废弃)的功能一致,区别仅在于实现方式不同。 另外,用户可使用其他API来对作业进行查询和管理。具体操作有: 查询作业状态 查询作业详细信息 查询作业结果-方式一(废弃) 导出查询结果 查询所有作业 取消作业(废弃) 该API当响应消息中“job_type”为“DCL”时,为同步操作。 本章节介绍的API已过时,推荐使用提交SQL作业(推荐)介绍的API。
  • 删除数据库 DLI提供删除数据库的接口。您可以使用该接口删除数据库。示例代码如下: 1 2 3 4 5 6 7 8 //调用Database对象的deleteDatabase接口删除数据库, //其中Database对象通过调用对象DLIClient的getDatabase(String databaseName)接口获得. private static void deletedatabase(Database database) throws DLIException { String dbName = "databasename"; database=client.getDatabase(dbName); database.deleteDatabase(); System.out.println("delete db " + dbName); } 含表的数据库不能直接删除,请先删除数据库的表再删除数据库。 数据库删除后,将不可恢复,请谨慎操作。
  • 创建数据库 DLI提供创建数据库的接口。您可以使用该接口创建数据库,示例代码如下: 1 2 3 4 5 6 7 private static Database createDatabase(DLIClient client) throws DLIException { //通过调用DLIClient对象的createDatabase方法创建数据库 String dbName = "databasename"; Database database = client.createDatabase(dbName); System.out.println("create database:" + database); return database; } “default”为内置数据库,不能创建名为“default”的数据库。
  • OBS表如何映射为DLI的分区表? 该示例将car_info数据,以day字段为分区字段,parquet为编码格式(目前仅支持parquet格式),转储数据到OBS。更多内容请参考《 数据湖探索 Flink SQL语法参考》。 1 2 3 4 5 6 7 8 9 10 11 12 13 create sink stream car_infos ( carId string, carOwner string, average_speed double, day string ) partitioned by (day) with ( type = "filesystem", file.path = "obs://obs-sink/car_infos", encode = "parquet", ak = "{{myAk}}", sk = "{{mySk}}" ); 数据最终在OBS中的存储目录结构为:obs://obs-sink/car_infos/day=xx/part-x-x。 数据生成后,可通过如下SQL语句建立OBS分区表,用于后续批处理: 创建OBS分区表。 1 2 3 4 5 6 7 8 create table car_infos ( carId string, carOwner string, average_speed double ) partitioned by (day string) stored as parquet location 'obs://obs-sink/car-infos'; 从关联OBS路径中恢复分区信息。 1 alter table car_infos recover partitions; 父主题: Flink SQL作业相关问题