华为云用户手册

  • 问题现象 在 DataArts Studio 上运行 DLI SQL脚本,执行结果的运行日志显示语句执行失败,错误信息为: DLI.0999: RuntimeException: org.apache.hadoop.fs.obs.OBSIOException: initializing on obs://xxx.csv: status [-1] - request id [null] - error code [null] - error message [null] - trace :com.obs.services.exception.ObsException: OBS servcie Error Message. Request Error: ... Cause by: ObsException: com.obs.services.exception.ObsException: OBSs servcie Error Message. Request Error: java.net.UnknownHostException: xxx: Name or service not known
  • 问题现象 在 CDM 迁移数据到DLI,迁移作业提交后,在CDM作业迁移日志中查看作业执行失败,具体日志有如下报错信息: org.apache.sqoop.common.SqoopException: UQUERY_CONNECTOR_0001:Invoke DLI service api failed, failed reason is %s. at org.apache.sqoop.connector.uquery.intf.impl.UQueryWriter.close(UQueryWriter.java:42) at org.apache.sqoop.connector.uquery.processor.Dataconsumer.run(Dataconsumer.java:217) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
  • 问题描述 在Spark SQL作业中,使用LOAD DATA命令导入数据到DLI表中时报如下错误: error.DLI.0001: IllegalArgumentException: Buffer size too small. size = 262144 needed = 2272881 或者如下错误 error.DLI.0999: InvalidProtocolBufferException: EOF in compressed stream footer position: 3 length: 479 range: 0 offset: 3 limit: 479 range 0 = 0 to 479 while trying to read 143805 bytes
  • 问题现象 使用Flink作业输出流写入数据到了OBS中,通过该OBS文件路径创建的DLI表进行数据查询时,无法查询到数据。 例如,使用如下Flink结果表将数据写入到OBS的“obs://obs-sink/car_infos”路径下。 create sink stream car_infos_sink ( carId string, carOwner string, average_speed double, buyday string ) partitioned by (buyday) with ( type = "filesystem", file.path = "obs://obs-sink/car_infos", encode = "parquet", ak = "{{myAk}}", sk = "{{mySk}}" ); 通过该OBS文件路径创建DLI分区表,在DLI查询car_infos表数据时没有查询到数据。 create table car_infos ( carId string, carOwner string, average_speed double ) partitioned by (buyday string) stored as parquet location 'obs://obs-sink/car_infos';
  • 问题现象 Flink SQL作业创建表时,表名使用EL表达式,运行作业时报如下错误: DLI.0005: AnalysisException: t_user_message_input_#{date_format(date_sub(current_date(), 1), 'yyyymmddhhmmss')} is not a valid name for tables/databases. Valid names only contain alphabet characters, numbers and _.
  • 解决方案 需要将SQL中表名的“#”字符改成“$”即可。DLI中使用EL表达式的格式为:${expr}。 修改前: t_user_message_input_#{date_format(date_sub(current_date(), 1), 'yyyymmddhhmmss')} 修改后: t_user_message_input_${date_format(date_sub(current_date(), 1), 'yyyymmddhhmmss')} 修改后,Flink SQL作业能够正确解析表名,并根据EL表达式动态生成表名。
  • 解决方案 出现该问题时建议通过以下操作步骤解决。 登录DWS命令执行窗口,执行以下SQL命令,临时将所有non-active的连接释放掉。 SELECT PG_TERMINATE_BACKEND(pid) from pg_stat_activity WHERE state='idle'; 检查应用程序是否未主动释放连接,导致连接残留。建议优化代码,合理释放连接。 在 GaussDB (DWS) 控制台设置会话闲置超时时长session_timeout,在闲置会话超过所设定的时间后服务端将主动关闭连接。 session_timeout默认值为600秒,设置为0表示关闭超时限制,一般不建议设置为0。 session_timeout设置方法如下: 登录GaussDB(DWS) 管理控制台。 在左侧导航栏中,单击“集群管理”。 在集群列表中找到所需要的集群,单击集群名称,进入集群“基本信息”页面。 单击“参数修改”页签,修改参数“session_timeout”,然后单击“保存”。 在“修改预览”窗口,确认修改无误后,单击“保存”。 更多问题处理步骤,请参考DWS数据库连接问题。
  • Spark作业运行大批量数据时上报作业运行超时异常错误 当Spark作业运行大批量数据时,如果出现作业运行超时异常错误,通常是由于作业的资源配置不足、数据倾斜、网络问题或任务过多导致的。 解决方案: 设置并发数:通过设置合适的并发数,可以启动多任务并行运行,从而提高作业的处理能力。 例如访问DWS大批量数据库数据时设置并发数,启动多任务的方式运行,避免作业运行超时。 具体并发设置可以参考对接DWS样例代码中的partitionColumn和numPartitions相关字段和案例描述。 调整Spark作业的Executor数量,分配更多的资源用于Spark作业的运行。 父主题: Spark作业运维类
  • 解决方案 为了解决因IP地址不足导致的Flink作业提交失败问题,可以在作业优化参数中添加以下配置: 启用 HostNetwork。 kubernetes.dli.hostnetwork.enabled=true kubernetes.hostnetwork.enabled=true 调整Leader Election 配置: high-availability.kubernetes.leader-election.renew-deadline=15s high-availability.kubernetes.leader-election.lease-duration=15s
  • 应用场景 本文列举了一些常见的从checkpoint恢复的场景供您参考,如表1所示。 更多场景你可以使用从checkpoint恢复的原则结合实际情况进行判断。 表1 从checkpoint恢复的常见场景 场景 是否支持恢复 说明 调整或者增加并行数 不支持 该操作修改了作业的并行数,即修改了作业的运行逻辑。 修改Flink SQL语句、Flink Jar作业等操作 不支持 该操作修改了作业对资源的算法逻辑。 例如原有的算法的语句是执行加减运算,当前需要恢复的状态将算法的语句修改成为乘除取余的运算,是无法从checkpoint直接恢复的。 修改“静态流图” 不支持 该操作修改了作业对资源的算法逻辑。 修改“单TM所占CU数”参数 支持 对计算资源的修改并没有影响到作业算法或算子的运行逻辑。 作业运行异常或物理停电 支持 当对作业参数未作出修改。
  • DLI是否支持导入其他租户共享OBS桶的数据? DLI支持将同一个租户下子账户共享OBS桶中的数据导入,但是租户级别共享OBS桶中的数据无法导入。 DLI不支持导入其他租户共享的OBS桶中的数据,主要是为了确保数据的安全性和数据隔离。 对于需要跨租户共享和分析数据的场景,建议先将数据脱敏后上传到OBS桶中,再进行数据分析,分析完成后及时删除OBS桶中的临时数据,以确保数据安全 父主题: DLI产品咨询类
  • 为什么DLI增强型跨源连接要创建对等连接? DLI增强型跨源连接创建对等连接的主要原因是为了实现DLI与不同VPC中的数据源之间的网络连通。 当DLI需要访问外部数据源,而这些数据源位于不同的VPC中时,由于网络隔离,DLI默认无法直接读取这些数据源的数据。通过创建增强型跨源连接,可以采用对等连接的方式打通DLI与数据源的VPC网络,从而实现数据的互通和跨源分析。 增强型跨源连接的优势: 网络连通性:直接打通DLI与目的数据源的VPC网络实现数据互通。 支持多种数据源:支持DLI与多种数据源的网络连通,例如DWS,RDS, CSS ,D CS 等数据源。 父主题: 增强型跨源连接类
  • 怎样管理在DLI上运行的作业 管理大量的DLI作业时您可以采用以下方案: 作业分组: 将几万个作业根据不同的类型分组,不同类型的作业通过不同的队列运行。 创建 IAM 子用户 或者创建IAM子用户,将不同类型的作业通过不同的用户执行。 具体请参考《 数据湖探索 用户指南》。 此外DLI还提供了作业管理功能,包括编辑、启动、停止、删除作业,以及导出和导入作业。您可以利用这些功能来定期维护和管理作业。 父主题: DLI产品咨询类
  • 怎样获取DLI作业样例(Demo) 为了方便用户更好地使用DLI,DLI服务提供了供作业开发的Demo样例,您可以通过DLI样例代码获取。 该样例代码的目录内容介绍如下: dli-flink-demo:开发Flink作业时的样例代码参考。例如,样例代码实现读取Kafka源表数据写入到HDFS、DWS、Hive等结果表中的功能。 dli-spark-demo:开发Spark作业时的样例代码参考。具体如下: “dli-spark-demo-obs”读取和写入OBS数据。 “dli-spark-demo-redis”读取和写入Redis数据。 dli-pyspark-demo:使用Python语言开发Spark作业。例如,样例包中的样例代码实现创建Redis表的功能。 父主题: DLI产品咨询类
  • 解决方案 在SQL语句中配置发送失败重试:connector.properties.retries=5 create table kafka_sink( car_type string , car_name string , primary key (union_id) not enforced ) with ( "connector.type" = "upsert-kafka", "connector.version" = "0.11", "connector.properties.bootstrap.servers" = "xxxx:9092", "connector.topic" = "kafka_car_topic ", "connector.sink.ignore-retraction" = "true", "connector.properties.retries" = "5", "format.type" = "json" );
  • DLI中的Spark组件与 MRS 中的Spark组件有什么区别? DLI和MRS都支持Spark组件,但在服务模式、接口方式、应用场景和性能特性上存在一些差异。 DLI服务的Spark组件是全托管式服务,用户对Spark组件不感知,仅仅可以使用该服务,且接口为封装式接口。 DLI的这种模式减轻了运维负担,可以更专注于数据处理和分析任务本身。 具体请参考《 数据湖 探索用户指南》。 MRS服务Spark组件的是建立在客户的购买MRS服务所分配的虚机上,用户可以根据实际需求调整及优化Spark服务,支持各种接口调用。 MRS的这种模式提供了更高的自由度和定制性,适合有大数据处理经验的用户使用。 具体请参考《 MapReduce服务 开发指南》。 父主题: DLI产品咨询类
  • 问题现象 跨源连接创建对等连接失败,报错信息如下: Failed to get subnet 2c2bd2ed-7296-4c64-9b60-ca25b5eee8fe. Response code : 404, message : {"code":"VPC.0202","message":"Query resource by id 2c2bd2ed-7296-4c64-9b60-ca25b5eee8fe fail.the subnet could not be found."}
  • 关联OBS桶中嵌套的JSON格式数据如何创建表 如果需要关联OBS桶中嵌套的JSON格式数据,可以使用异步模式创建表。 以下是一个示例的建表语句,展示了如何使用 JSON 格式选项来指定 OBS 中的路径: create table tb1 using json options(path 'obs://....') using json:指定使用 JSON 格式。 options:用于设置表的选项。 path:指定OBS中JSON文件的路径。 父主题: SQL作业开发类
  • Spark如何将数据写入到DLI表中 使用Spark将数据写入到DLI表中,主要设置如下参数: fs.obs.access.key fs.obs.secret.key fs.obs.impl fs.obs.endpoint 示例如下: import logging from operator import add from pyspark import SparkContext logging.basicConfig(format='%(message)s', level=logging.INFO) #import local file test_file_name = "D://test-data_1.txt" out_file_name = "D://test-data_result_1" sc = SparkContext("local","wordcount app") sc._jsc.hadoopConfiguration().set("fs.obs.access.key", "myak") sc._jsc.hadoopConfiguration().set("fs.obs.secret.key", "mysk") sc._jsc.hadoopConfiguration().set("fs.obs.impl", "org.apache.hadoop.fs.obs.OBSFileSystem") sc._jsc.hadoopConfiguration().set("fs.obs.endpoint", "myendpoint") # red: text_file rdd object text_file = sc.textFile(test_file_name) # counts counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) # write counts.saveAsTextFile(out_file_name) 父主题: Spark作业开发类
  • 性能调优 rocksdb状态调优 topN排序、窗口聚合计算以及流流join等都涉及大量的状态操作,因而如果发现这类算子存在性能瓶颈,可以尝试优化状态操作的性能。主要可以尝试通过如下方式优化: 增加状态操作内存,降低磁盘IO 增加单slot cu资源数 配置优化参数: taskmanager.memory.managed.fraction=xx state.backend.rocksdb.block.cache-size=xx state.backend.rocksdb.writebuffer.size=xx 开启微批模式,避免状态频繁操作 配置参数: table.exec.mini-batch.enabled=true table.exec.mini-batch.allow-latency=xx table.exec.mini-batch.size=xx 使用超高IO本地盘规格机型,加速磁盘操作 group agg单点及数据倾斜调优 按天聚合计算或者group by key不均衡场景下,group聚合计算存在单点或者数据倾斜问题,此时,可以通过将聚合计算拆分成Local-Global进行优化。配置方式为设置调优参数: table.optimizer.aggphase-strategy=TWO_PHASE count distinct优化 在count distinct关联key比较稀疏场景下,即使使用Local-Global,单点问题依然非常严重,此时可以通过配置以下调优参数进行分桶拆分优化: table.optimizer.distinct-agg.split.enabled=true table.optimizer.distinct-agg.split.bucket-num=xx 使用filter替换case when: 例如: COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone')THEN user_id ELSE NULL END) AS app_uv 可调整为 COUNT(DISTINCT user_id) FILTER(WHERE flag IN ('android', 'iphone')) AS app_uv 维表join优化 维表join根据左表进入的每条记录join关联键,先在缓存中匹配,如果匹配不到,则从远程拉取。因而,可以通过如下方式优化: 增加JVM内存并增加缓存记录条数 维表设置索引,加快查询速度
  • 性能分析 由于Flink的反压机制,流作业在存在性能问题的情况下,会导致数据源消费速率跟不上生产速率,从而引起Kafka消费组的积压。在这种情况下,可以通过算子的反压和时延,确定算子的性能瓶颈点。 作业最后一个算子(Sink)反压正常(绿色),前面算子反压高(红色) 该场景说明性能瓶颈点在sink,此时需要根据具体数据源具体优化,比如对于JDBC数据源,可以通过调整写出批次(connector.write.flush.max-rows)、JDBC参数重写(rewriteBatchedStatements=true)等进行优化。 作业非倒数第二个算子反压高(红色) 该场景说明性能瓶颈点在Vertex2算子,可以通过查看该算子描述,确认该算子具体功能,以进行下一步优化。 所有算子反压都正常(绿色),但存在数据堆积 该场景说明性能瓶颈点在Source,主要是受数据读取速度影响,此时可以通过增加Kafka分区数并增加source并发解决。 作业一个算子反压高(红色),而其后续的多个并行算子都不存在反压(绿色) 该场景说明性能瓶颈在Vertex2或者Vertex3,为了进一步确定具体瓶颈点算子,可以在FlinkUI页面开启inPoolUsage监控。如果某个算子并发对应的inPoolUsage长时间为100%,则该算子大概率为性能瓶颈点,需分析该算子以进行下一步优化。 图3 inPoolUsage监控
  • Flink作业运行异常,如何定位 在“Flink作业”管理页面,对应作业“操作”列单击“编辑”按钮,在作业运行界面确认作业是否勾选“保存作业日志”参数。 图1 保存作业日志 是,则执行3。 否,则运行日志不会转储OBS桶,需要先执行2保存作业运行日志。 在作业运行界面勾选“保存作业日志”,在“OBS桶”参数选择存储运行日志的OBS桶。单击“启动”重新运行作业。作业重新运行完成后再执行3及后续步骤。 在Flink作业列表单击对应作业名称,进入作业详情页面,选择“运行日志”页签。 单击OBS桶,获取对应作业的完整运行日志。 图2 查看运行日志 下载最新“jobmanager.log”文件,搜索“RUNNING to FAILED”关键字,通过上下文的错误栈,确认失败原因。 如果“jobmanager.log”文件中的信息不足以定位,可以在运行日志中找到对应的“taskmanager.log”日志,搜索“RUNNING to FAILED”关键字,确认失败原因。 父主题: Flink作业性能调优类
  • Flink作业如何保存作业日志? 在创建Flink SQL作业或者Flink Jar作业时,可以在作业编辑页面,勾选“保存作业日志”参数,将作业运行时的日志信息保存到OBS。 勾选“保存作业日志”参数后,需配置“OBS桶”参数,选择OBS桶用于保存用户作业日志信息。如果选择的OBS桶是未授权状态,需要单击“OBS授权”。 日志信息的保存路径为:“桶名/jobs/logs/作业id开头的目录”。其中,“桶名”可以自定义。“/jobs/logs/作业id开头的目录”为固定格式。 在作业列表中,单击对应的作业名称,然后在“运行日志”页签,可以单击页面提供的OBS链接跳转至对应的路径下。 关于如何创建Flink SQL作业或者Flink Jar作业,请参考《数据湖探索用户指南》。 父主题: Flink作业咨询类
  • JOIN数据倾斜解决方案 登录数据湖探索管理控制台,选择“SQL作业”,在要修改的作业所在行的“操作”列,单击“编辑”进入SQL编辑器界面。 在SQL编辑器界面,单击“设置”,在“配置项”尝试添加以下几个Spark参数进行解决。 参数项如下,冒号前是配置项,冒号后是配置项的值。 spark.sql.enableToString:false spark.sql.adaptive.join.enabled:true spark.sql.adaptive.enabled:true spark.sql.adaptive.skewedJoin.enabled:true spark.sql.adaptive.enableToString:false spark.sql.adaptive.skewedPartitionMaxSplits:10 spark.sql.adaptive.skewedPartitionMaxSplits表示倾斜拆分力度,可不加,默认为5,最大为10。 单击“执行”重新运行作业,查看优化效果。
  • Group By数据倾斜解决方案 取部分数据执行select count(*) as sum,Key from tbl group by Key order by sum desc查询具体是哪些key引起的数据倾斜。 然后对于倾斜Key单独做处理,加盐让其先将他分为多个task分别统计,最后再对分开统计结果进行结合统计。 例如:如下SQL示例,假设已知倾斜key为'Key01',导致单个task处理大量数据,做如下处理: SELECT a.Key, SUM(a.sum) AS Cnt FROM ( SELECT Key, count(*) AS sum FROM tbl GROUP BY Key, CASE WHEN KEY = 'Key01' THEN floor(random () * 200) ELSE 0 END ) a GROUP BY a.Key;
  • 操作步骤 该示例将car_info数据,以day字段为分区字段,parquet为编码格式,转储数据到OBS。更多内容请参考《数据湖探索Flink SQL语法参考》。 1 2 3 4 5 6 7 8 9 10 11 12 13 create sink stream car_infos ( carId string, carOwner string, average_speed double, day string ) partitioned by (day) with ( type = "filesystem", file.path = "obs://obs-sink/car_infos", encode = "parquet", ak = "{{myAk}}", sk = "{{mySk}}" ); 数据最终在OBS中的存储目录结构为:obs://obs-sink/car_infos/day=xx/part-x-x。 数据生成后,可通过如下SQL语句建立OBS分区表,用于后续批处理: 创建OBS分区表。 1 2 3 4 5 6 7 8 create table car_infos ( carId string, carOwner string, average_speed double ) partitioned by (day string) stored as parquet location 'obs://obs-sink/car-infos'; 从关联OBS路径中恢复分区信息。 1 alter table car_infos recover partitions;
  • 如何合并小文件 使用SQL过程中,生成的小文件过多时,会导致作业执行时间过长,且查询对应表时耗时增大,建议对小文件进行合并。 设置配置项。 spark.sql.shuffle.partitions = 分区数量(即此场景下最终生成的文件数量) 执行SQL。 INSERT OVERWRITE TABLE tablename select * FROM tablename distribute by rand() 父主题: SQL作业开发类
  • DLI Flink作业支持哪些数据格式和数据源? DLI Flink作业支持如下数据格式: Avro,Avro_merge,BLOB,CSV,EMAIL,JSON,ORC,Parquet,XML。 DLI Flink作业支持如下数据源: CloudTable HBase,CloudTable OpenTSDB,CSS Elasticsearch,DCS,DDS,DIS,DMS,DWS,EdgeHub,MRS HBase,MRS Kafka,开源Kafka,文件系统,OBS,RDS, SMN 表1 数据格式和支持的输入输出流 数据格式 支持的输入流 支持的输出流 Avro - OBS输出流 Avro_merge - OBS输出流 BLOB DIS输入流 MRS Kafka输入流 开源Kafka输入流 - CSV DIS输入流 OBS输入流 开源Kafka输入流 DIS输出流 OBS输出流 DWS输出流(通过OBS方式转储) 开源Kafka输出流 文件系统输出流 EMAIL DIS输入流 - JSON DIS输入流 OBS输入流 MRS Kafka输入流 开源Kafka输入流 DIS输出流 OBS输出流 MRS Kafka输出流 开源Kafka输出流 ORC - OBS输出流 DWS输出流(通过OBS方式转储) Parquet - OBS输出流 文件系统输出流 XML DIS输入流 - 父主题: Flink作业咨询类
  • 跨源访问MRS HBase,连接超时,日志未打印错误怎么办? 用户在跨源连接中没有添加集群主机信息,导致KRB认证失败,故连接超时,日志也未打印错误。 建议您重新配置主机信息后再重试访问MRS HBase。 在“增强型跨源”页面,单击该连接“操作”列中的“修改主机信息”,在弹出的对话框中,填写主机信息。 格式:“IP 主机名/ 域名 ”,多条信息之间以换行分隔。 MRS主机信息获取,详细请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。 父主题: 增强型跨源连接类
  • 执行查询语句报错:The current account does not have permission to perform this operation,the current account was restricted. Restricted for no budget. 该提示信息说明您可能因账户欠费获余额不足导致操作受限。 解决方案: 检查账户状态。 请先确认是否欠费,如有欠费请充值。 重新登录账户。 如果充值后仍然提示相同的错误,请退出账号后重新登录。 父主题: SQL作业运维类
共100000条