华为云用户手册

  • 操作场景 Spark on Yarn模式下,有Driver、ApplicationMaster、Executor三种进程。在任务调度和运行的过程中,Driver和Executor承担了很大的责任,而ApplicationMaster主要负责container的启停。 因而Driver和Executor的参数配置对Spark应用的执行有着很大的影响意义。用户可通过如下操作对Spark集群性能做优化。
  • 操作场景 对于Spark应用来说,资源是影响Spark应用执行效率的一个重要因素。当一个长期运行的服务(比如JD BCS erver),如果分配给它多个Executor,可是却没有任何任务分配给它,而此时有其他的应用却资源紧张,这就造成了很大的资源浪费和资源不合理的调度。 动态资源调度就是为了解决这种场景,根据当前应用任务的负载情况,实时的增减Executor个数,从而实现动态分配资源,使整个Spark系统更加健康。
  • 操作场景 Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据以及给其他Executor提供shuffle数据。当Executor进程任务过重,导致触发GC(Garbage Collection)而不能为其他Executor提供shuffle数据时,会影响任务运行。 External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。
  • 操作场景 Broadcast(广播)可以把数据集合分发到每一个节点上,Spark任务在执行过程中要使用这个数据集合时,就会在本地查找Broadcast过来的数据集合。如果不使用Broadcast,每次任务需要数据集合时,都会把数据序列化到任务里面,不但耗时,还使任务变得很大。 每个任务分片在执行中都需要同一份数据集合时,就可以把公共数据集Broadcast到每个节点,让每个节点在本地都保存一份。 大表和小表做join操作时可以把小表Broadcast到各个节点,从而就可以把join操作转变成普通的操作,减少了shuffle操作。
  • 操作步骤 并行度可以通过如下三种方式来设置,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。 在会产生shuffle的操作函数内设置并行度参数,优先级最高。 testRDD.groupByKey(24) 在代码中配置“spark.default.parallelism”设置并行度,优先级次之。 val conf = new SparkConf() conf.set("spark.default.parallelism", 24) 在“$SPARK_HOME/conf/spark-defaults.conf”文件中配置“spark.default.parallelism”的值,优先级最低。 spark.default.parallelism 24
  • 操作场景 Spark是内存计算框架,计算过程中内存不够对Spark的执行效率影响很大。可以通过监控GC(Garbage Collection),评估内存中RDD的大小来判断内存是否变成性能瓶颈,并根据情况优化。 监控节点进程的GC情况(在客户端的conf/spark-default.conf配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" ),如果频繁出现Full GC,需要优化GC。把RDD做Cache操作,通过日志查看RDD在内存中的大小,如果数据太大,需要改变RDD的存储级别来优化。
  • 操作步骤 优化GC,调整老年代和新生代的大小和比例。在客户端的“conf/spark-default.conf”配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:-XX:NewRatio。如:" -XX:NewRatio=2",则新生代占整个堆空间的1/3,老年代占2/3。 开发Spark应用程序时,优化RDD的数据结构。 使用原始类型数组替代集合类,如可使用fastutil库。 避免嵌套结构。 Key尽量不要使用String。 开发Spark应用程序时,建议序列化RDD。 RDD做cache时默认是不序列化数据的,可以通过设置存储级别来序列化RDD减小内存。例如: testRDD.persist(StorageLevel.MEMORY_ONLY_SER)
  • 操作步骤 Spark程序运行时,在shuffle和RDD Cache等过程中,会有大量的数据需要序列化,默认使用JavaSerializer,通过配置让KryoSerializer作为数据序列化器来提升序列化性能。 在开发应用程序时,添加如下代码来使用KryoSerializer作为数据序列化器。 实现类注册器并手动注册类。 package com.etl.common; import com.esotericsoftware.kryo.Kryo; import org.apache.spark.serializer.KryoRegistrator; public class DemoRegistrator implements KryoRegistrator { @Override public void registerClasses(Kryo kryo) { //以下为示例类,请注册自定义的类 kryo.register(AggrateKey.class); kryo.register(AggrateValue.class); } } 您可以在Spark客户端对spark.kryo.registrationRequired参数进行配置,设置是否需要Kryo注册序列化。 当参数设置为true时,如果工程中存在未被序列化的类,则会发生异常。如果设置为false(默认值),Kryo会自动将未注册的类名写到对应的对象中。此操作会对系统性能造成影响。设置为true时,用户需手动注册类,针对未序列化的类,系统不会自动写入类名,而是发生异常,相对比false,其性能较好。 配置KryoSerializer作为数据序列化器和类注册器。 val conf = new SparkConf() conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryo.registrator", "com.etl.common.DemoRegistrator")
  • 操作场景 Spark支持两种方式的序列化 : Java原生序列化JavaSerializer Kryo序列化KryoSerializer 序列化对于Spark应用的性能来说,具有很大的影响。在特定的数据格式的情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,而对于一些Int之类的基本类型数据,性能的提升就几乎可以忽略。 KryoSerializer依赖Twitter的Chill库来实现,相对于JavaSerializer,主要的问题在于不是所有的Java Serializable对象都能支持,兼容性不好,所以需要手动注册类。 序列化功能用在两个地方:序列化任务和序列化数据。Spark任务序列化只支持JavaSerializer,数据序列化支持JavaSerializer和KryoSerializer。
  • 工具介绍 在Hadoop大规模生产集群中,由于HDFS的元数据都保存在NameNode的内存中,集群规模受制于NameNode单点的内存限制。如果HDFS中有大量的小文件,会消耗NameNode大量内存,还会大幅降低读写性能,延长作业运行时间。因此,小文件问题是制约Hadoop集群规模扩展的关键问题。 本工具主要有如下两个功能: 扫描表中有多少低于用户设定阈值的小文件,返回该表目录中所有数据文件的平均大小。 对表文件提供合并功能,用户可设置合并后的平均文件大小。
  • 日志描述 日志存储路径: Executor运行日志:“${BIGDATA_DATA_HOME}/hadoop/data${i}/nm/containerlogs/application_${appid}/container_{$contid}” 运行中的任务日志存储在以上路径中,运行结束后会基于Yarn的配置确定是否汇聚到HDFS目录中,详情请参见Yarn常用配置参数。 其他日志:“/var/log/Bigdata/spark2x” 日志归档规则: 使用yarn-client或yarn-cluster模式提交任务时,Executor日志默认50MB滚动存储一次,最多保留10个文件,不压缩。 JobHistory2x日志默认100MB滚动存储一次,最多保留100个文件,压缩存储。 JDB CS erver2x日志默认100MB滚动存储一次,最多保留100个文件,压缩存储。 IndexServer2x日志默认100MB滚动存储一次,最多保留100个文件,压缩存储。 JDBCServer2x审计日志默认20MB滚动存储一次,最多保留20个文件,压缩存储。 日志大小和压缩文件保留个数可以在 FusionInsight Manager界面中配置。 表1 Spark2x日志列表 日志类型 日志文件名 描述 SparkResource2x日志 spark.log Spark2x服务初始化日志。 prestart.log prestart脚本日志。 cleanup.log 安装卸载实例时的清理日志。 spark-availability-check.log Spark2x服务健康检查日志。 spark-service-check.log Spark2x服务检查日志 JDBCServer2x日志 JDBCServer-start.log JDBCServer2x启动日志。 JDBCServer-stop.log JDBCServer2x停止日志。 JDBCServer.log JDBCServer2x运行时,Driver端日志。 jdbc-state-check.log JDBCServer2x健康检查日志。 jdbcserver-omm-pid***-gc.log.*.current JDBCServer2x进程gc日志。 spark-omm-org.apache.spark.sql.hive.thriftserver.HiveThriftProxyServer2-***.out* JDBCServer2x进程启动信息日志。如果进程停止,会打印jstack信息。 JobHistory2x日志 jobHistory-start.log JobHistory2x启动日志。 jobHistory-stop.log JobHistory2x停止日志。 JobHistory.log JobHistory2x运行过程日志。 jobhistory-omm-pid***-gc.log.*.current JobHistory2x进程gc日志。 spark-omm-org.apache.spark.deploy.history.HistoryServer-***.out* JobHistory2x进程启动信息日志。如果进程停止,会打印jstack信息。 IndexServer2x日志 IndexServer-start.log IndexServer2x启动日志。 IndexServer-stop.log IndexServer2x停止日志。 IndexServer.log IndexServer2x运行时,Driver端日志。 indexserver-state-check.log IndexServer2x健康检查日志。 indexserver-omm-pid***-gc.log.*.current IndexServer2x进程gc日志。 spark-omm-org.apache.spark.sql.hive.thriftserver.IndexServerProxy-***.out* IndexServer2x进程启动信息日志。如果进程停止,会打印jstack信息。 审计日志 jdbcserver-audit.log ranger-audit.log JDBCServer2x审计日志。
  • 配置场景 Spark SQL Adaptive Execution特性用于使Spark SQL在运行过程中,根据中间结果优化后续执行流程,提高整体执行效率。当前已实现的特性如下: 自动设置shuffle partition数。 在启用Adaptive Execution特性前,Spark SQL根据spark.sql.shuffle.partitions配置指定shuffle时的partition个数。此种方法在一个应用中执行多种SQL查询时缺乏灵活性,无法保证所有场景下的性能更优。开启Adaptive Execution后,Spark SQL将自动为每个shuffle过程动态设置partition个数,而不是使用通用配置,使每次shuffle过程自动使用最合理的partition数。 动态调整执行计划。 在启用Adaptive Execution特性前,Spark SQL根据RBO和CBO的优化结果创建执行计划,此种方法忽略了数据在运行过程中的结果集变化。比如基于某个大表创建的视图,与其他大表join时,即便视图的结果集很小,也无法将执行计划调整为BroadcastJoin。启用Adaptive Execution特性后,Spark SQL能够在运行过程中根据前面stage的运行结果动态调整后续的执行计划,从而获得更好的执行性能。 自动处理数据倾斜。 在执行SQL语句时,如果存在数据倾斜,可能导致单个executor内存溢出、任务执行缓慢等问题。启动Adaptive Execution特性后,Spark SQL能自动处理数据倾斜场景,对倾斜的分区,启动多个task进行处理,每个task读取部分shuffle输出文件,再对这部分任务的Join结果进行Union操作,以达到消除数据倾斜的效果。
  • 配置场景 Spark优化sql的执行,一般的优化规则都是启发式的优化规则,启发式的优化规则,仅仅根据逻辑计划本身的特点给出优化,没有考虑数据本身的特点,也就是未考虑算子本身的执行代价。Spark在2.2中引入了基于代价的优化规则(CBO)。CBO会收集表和列的统计信息,结合算子的输入数据集来估计每个算子的输出条数以及字节大小,这些就是执行一个算子的代价。 CBO会调整执行计划,来最小化端到端的查询时间,中心思路2点: 尽早过滤不相关的数据。 最小化每个算子的代价。 CBO优化过程分为2步: 收集统计信息。 根据输入的数据集估算特定算子的输出数据集。 表级别统计信息包括:记录条数;表数据文件的总大小。 列级别统计信息包括:唯一值个数;最大值;最小值;空值个数;平均长度;最大长度;直方图。 有了统计信息后,就可以估计算子的执行代价了。常见的算子包括过滤条件Filter算子和Join算子。 直方图为列统计值的一种,可以直观的描述列数据的分布情况,将列的数据从最小值到最大值划分为事先指定数量的槽位(bin),计算各个槽位的上下界的值,使得全部数据都确定槽位后,所有槽位中的数据数量相同(等高直方图)。有了数据的详细分布后,各个算子的代价估计能更加准确,优化效果更好。 该特性可以通过下面的配置项开启: spark.sql.statistics.histogram.enabled:指定是否开启直方图功能,默认为false。
  • 配置场景 ORC文件格式是一种Hadoop生态圈中的列式存储格式,它最初产生自Apache Hive,用于降低Hadoop数据存储空间和加速Hive查询速度。和Parquet文件格式类似,它并不是一个单纯的列式存储格式,仍然是首先根据行组分割整个表,在每一个行组内按列进行存储,并且文件中的数据尽可能的压缩来降低存储空间的消耗。矢量化读取ORC格式的数据能够大幅提升ORC数据读取性能。在Spark2.3版本中,SparkSQL支持矢量化读取ORC数据(这个特性在Hive的历史版本中已经得到支持)。矢量化读取ORC格式的数据能够获得比传统读取方式数倍的性能提升。 该特性可以通过下面的配置项开启: “spark.sql.orc.enableVectorizedReader”:指定是否支持矢量化方式读取ORC格式的数据,默认为true。 “spark.sql.codegen.wholeStage”:指定是否需要将多个操作的所有stage编译为一个java方法,默认为true。 “spark.sql.codegen.maxFields”:指定codegen的所有stage所支持的最大字段数(包括嵌套字段),默认为100。 “spark.sql.orc.impl”:指定使用Hive还是Spark SQL native作为SQL执行引擎来读取ORC数据,默认为hive。
  • 配置场景 当Spark2x Web UI中有一些不允许其他用户看到的数据时,用户可能想对UI进行安全防护。用户一旦登录,Spark2x 可以比较与这个用户相对应的视图ACLs来确认是否授权用户访问 UI。 Spark2x存在两种类型的Web UI,一种为运行中任务的Web UI,可以通过Yarn原生页面的应用链接或者REST接口访问。一种为已结束任务的Web UI,可以通过Spark2x JobHistory服务或者REST接口访问。 本章节仅支持安全模式(开启了Kerberos认证)集群。 运行中任务Web UI ACL配置。 运行中的任务,可通过服务端对如下参数进行配置。 “spark.admin.acls”:指定Web UI的管理员列表。 “spark.admin.acls.groups”:指定管理员组列表。 “spark.ui.view.acls”:指定yarn界面的访问者列表。 “spark.modify.acls.groups”:指定yarn界面的访问者组列表。 “spark.modify.acls”:指定Web UI的修改者列表。 “spark.ui.view.acls.groups”:指定Web UI的修改者组列表。 运行结束后Web UI ACL配置。 运行结束的任务通过客户端的参数“spark.history.ui.acls.enable”控制是否开启ACL访问权限。 如果开启了ACL控制,由客户端的“spark.admin.acls”和“spark.admin.acls.groups”配置指定Web UI的管理员列表和管理员组列表,由客户端的“spark.ui.view.acls”和“spark.modify.acls.groups”配置指定查看Web UI任务明细的访问者列表和组列表,由客户端的“spark.modify.acls”和“spark.ui.view.acls.groups”配置指定修改Web UI任务明细的访问者列表和组列表。
  • 配置描述 可以通过以下两种方式配置是否过滤掉分区表分区路径不存在的分区。 在Spark Driver端的“spark-defaults.conf”配置文件中进行设置。 表1 参数说明 参数 说明 默认值 spark.sql.hive.verifyPartitionPath 配置读取Hive分区表时,是否过滤掉分区表分区路径不存在的分区。 “true”:过滤掉分区路径不存在的分区; “false”:不进行过滤。 false 在spark-submit命令提交应用时,通过“--conf”参数配置是否过滤掉分区表分区路径不存在的分区。 示例: spark-submit --class org.apache.spark.examples.SparkPi --conf spark.sql.hive.verifyPartitionPath=true $SPARK_HOME/lib/spark-examples_*.jar
  • 配置描述 提供两种不同的数据汇聚功能配置选项,两者在Spark JDBCServer服务端的tunning选项中进行设置,设置完后需要重启JDBCServer。 表1 参数说明 参数 说明 默认值 spark.sql.bigdata.thriftServer.useHdfsCollect 是否将结果数据保存到HDFS中而不是内存中。 优点:由于查询结果保存在hdfs端,因此基本不会造成JDBCServer的OOM。 缺点:速度慢。 true:保存至HDFS中。 false:不使用该功能。 须知: spark.sql.bigdata.thriftServer.useHdfsCollect参数设置为true时,将结果数据保存到HDFS中,但JobHistory原生页面上Job的描述信息无法正常关联到对应的SQL语句,同时spark-beeline命令行中回显的Execution ID为null,为解决JDBCServer OOM问题,同时显示信息正确,建议选择spark.sql.userlocalFileCollect参数进行配置。 false spark.sql.uselocalFileCollect 是否将结果数据保存在本地磁盘中而不是内存里面。 优点:结果数据小数据量情况下和原生内存的方式相比性能损失可以忽略,大数据情况下(亿级数据)性能远比使用HDFS,以及原生内存方式好。 缺点:需要调优。大数据情况下建议JDBCServer driver端内存10G,executor端每个核心分配3G内存。 true:使用该功能。 false: 不使用该功能。 false spark.sql.collect.Hive 该参数在spark.sql.uselocalFileCollect开启的情况下生效。直接序列化的方式,还是间接序列化的方式保存结果数据到磁盘。 优点:针对分区数特别多的表查询结果汇聚性能优于直接使用结果数据保证在磁盘的方式。 缺点:和spark.sql.uselocalFileCollect开启时候的缺点一样。 true:使用该功能。 false:不使用该功能。 false spark.sql.collect.serialize 该参数在spark.sql.uselocalFileCollect, spark.sql.collect.Hive同时开启的情况下生效。 作用是进一步提升性能 java:采用java序列化方式收集数据。 kryo:采用kryo序列化方式收集数据,性能要比采用java好。 java 参数spark.sql.bigdata.thriftServer.useHdfsCollect和spark.sql.uselocalFileCollect不能同时设置为true。
  • 配置场景 Spark Streaming对接Kafka时,当Spark Streaming应用重启后,应用根据上一次读取的topic offset作为起始位置和当前topic最新的offset作为结束位置从Kafka上读取数据的。 Kafka服务的topic的leader异常后,如果Kafka的leader和follower的offset相差太大,用户重启Kafka服务,Kafka的follower和leader相互切换,则Kafka服务重启后,topic的offset变小。 如果Spark Streaming应用一直在运行,由于Kafka上topic的offset变小,会导致读取Kafka数据的起始位置比结束位置大,这样将无法从Kafka读取数据,应用报错。 如果在重启Kafka服务前,先停止Spark Streaming应用,等Kafka重启后,再重启Spark Streaming应用使应用从checkpoint恢复。此时,Spark Streaming应用会记录终止前读取到的offset位置,以此为基准读取后面的数据,而Kafka offset变小(例如从10万变成1万),Spark Streaming会等待Kafka leader的offset增长至10万之后才会去消费,导致新发送的offset在1万至10万之间的数据丢失。 针对上述背景,提供配置Streaming对接Kafka更高级别的可靠性。对接Kafka可靠性功能开启后,上述场景处理方式如下。 如果Spark Streaming应用在运行应用时Kafka上topic的offset变小,则会将Kafka上topic最新的offset作为读取Kafka数据的起始位置,继续读取后续的数据。 对于已经生成但未调度处理的任务,如果读取的Kafka offset区间大于Kafka上topic的最新offset,则该任务会运行失败。 如果任务失败过多,则会将executor加入黑名单,从而导致后续的任务无法部署运行。此时用户可以通过配置“spark.blacklist.enabled”参数关闭黑名单功能,黑名单功能默认为开启。 如果Kafka上topic的offset变小后,Spark Streaming应用进行重启恢复终止前未处理完的任务如果读取的Kafka offset区间大于Kafka上topic的最新offset,则该任务直接丢弃,不进行处理。 如果Streaming应用中使用了state函数,则不允许开启对接Kafka可靠性功能。
  • 配置场景 当Spark Streaming应用与Kafka对接,Spark Streaming应用异常终止并从checkpoint恢复重启后,对于进入Kafka数据的任务,系统默认优先处理应用终止前(A段时间)未完成的任务和应用终止到重启完成这段时间内(B段时间)进入Kafka数据生成的任务,最后再处理应用重启完成后(C段时间)进入Kafka数据生成的任务。并且对于B段时间进入Kafka的数据,Spark将按照终止时间(batch时间)生成相应个数的任务,其中第一个任务读取全部数据,其余任务可能不读取数据,造成任务处理压力不均匀。 如果A段时间的任务和B段时间任务处理得较慢,则会影响C段时间任务的处理。针对上述场景,Spark提供Kafka后进先出功能。 图1 Spark Streaming应用重启时间轴 开启此功能后,Spark将优先调度C段时间内的任务,如果存在多个C段任务,则按照任务产生的先后顺序调度执行,再执行A段时间和B段时间的任务。另外,对于B段时间进入Kafka的数据,Spark除了按照终止时间生成相应任务,还将这个期间进入Kafka的所有数据均匀分配到各个任务,避免任务处理压力不均匀。 约束条件: 目前该功能只适用于Spark Streaming中的Direct方式,且执行结果与上一个batch时间处理结果没有依赖关系(即无state操作,如updatestatebykey)。对多条数据输入流,需要相对独立无依赖的状态,否则可能导致数据切分后结果发生变化。 Kafka后进先出功能的开启要求应用只能对接Kafka输入源。 如果提交应用的同时开启Kafka后进先出和流控功能,对于B段时间进入Kafka的数据,将不启动流控功能,以确保读取这些数据的任务调度优先级最低。应用重新启动后C段时间的任务启用流控功能。
  • 配置描述 在Spark Driver端的“spark-defaults.conf”配置文件中进行设置。 表1 参数说明 参数 说明 默认值 spark.streaming.kafka.direct.lifo 配置是否开启Kafka后进先出功能。 false spark.streaming.kafka010.inputstream.class 获取解耦在FusionInsight侧的类。 org.apache.spark.streaming.kafka010.xxDirectKafkaInputDStream
  • 配置描述 在进程对应的JVM参数配置项中增加以下参数。 表1 参数描述 参数 描述 默认值 -Dlog4j.configuration.watch 进程JVM参数,设置成“true”用于打开动态设置日志级别功能。 未配置,即为false。 Driver、Executor、AM进程的JVM参数如表2所示。在Spark客户端的配置文件“spark-defaults.conf”中进行配置。Driver、Executor、AM进程的日志级别在对应的JVM参数中的“-Dlog4j.configuration”参数指定的log4j配置文件中设置。 表2 进程的JVM参数1 参数 说明 默认日志级别 spark.driver.extraJavaOptions Driver的JVM参数。 INFO spark.executor.extraJavaOptions Executor的JVM参数。 INFO spark.yarn.am.extraJavaOptions AM的JVM参数。 INFO JobHistory Server和JDBCServer的JVM参数如表3所示。在服务端配置文件“ENV_VARS”中进行配置。JobHistory Server和JDBCServer的日志级别在服务端配置文件“log4j.properties”中设置。 表3 进程的JVM参数2 参数 说明 默认日志级别 GC_OPTS JobHistory Server的JVM参数。 INFO SPARK_SUBMIT_OPTS JDBCServer的JVM参数。 INFO 示例: 为了动态修改Executor日志级别为DEBUG,在进程启动之前,修改“spark-defaults.conf”文件中的Executor的JVM参数“spark.executor.extraJavaOptions”,增加如下配置: -Dlog4j.configuration.watch=true 提交用户应用后,修改“spark.executor.extraJavaOptions”中“-Dlog4j.configuration”参数指定的log4j日志配置文件(例如:“-Dlog4j.configuration=file:${BIGDATA_HOME}/FusionInsight_Spark2x_xxx/install/FusionInsight-Spark2x-*/spark/conf/log4j-executor.properties”)中的日志级别为DEBUG,如下所示: log4j.rootCategory=DEBUG, sparklog DEBUG级别生效会有一定的时延。
  • 配置场景 在某些场景下,当任务已经启动后,用户想要修改日志级别以定位问题或者查看想要的信息。 用户可以在进程启动前,在进程的JVM参数中增加参数“-Dlog4j.configuration.watch=true”来打开动态设置日志级别的功能。进程启动后,就可以通过修改进程对应的log4j配置文件,来调整日志打印级别。 目前支持动态设置日志级别功能的有:Driver日志、Executor日志、AM日志、JobHistory日志、JDBCServer日志。 允许设置的日志级别是:FATAL,ERROR,WARN,INFO,DEBUG,TRACE和ALL。
  • 配置场景 当前版本对于parquet表的压缩格式分以下两种情况进行配置: 对于分区表,需要通过parquet本身的配置项“parquet.compression”设置parquet表的数据压缩格式。如在建表语句中设置tblproperties:"parquet.compression"="snappy"。 对于非分区表,需要通过“spark.sql.parquet.compression.codec”配置项来设置parquet类型的数据压缩格式。直接设置“parquet.compression”配置项是无效的,因为它会读取“spark.sql.parquet.compression.codec”配置项的值。当“spark.sql.parquet.compression.codec”未做设置时默认值为“snappy”,“parquet.compression”会读取该默认值。 因此,“spark.sql.parquet.compression.codec”配置项只适用于设置非分区表的parquet压缩格式。
  • 配置场景 SparkSQL在进行shuffle操作时默认的分块数为200。在数据量特别大的场景下,使用默认的分块数就会造成单个数据块过大。如果一个任务产生的单个shuffle数据块大于2G,该数据块在被fetch的时候还会报类似错误: Adjusted frame length exceeds 2147483647: 2717729270 - discarded 例如,SparkSQL运行TPCDS 500G的测试时,使用默认配置出现错误。所以当数据量较大时需要适当的调整该参数。
  • 配置场景 当前,在YARN-Client和YARN-Cluster模式下,两种模式的客户端存在冲突的配置,即当客户端为一种模式的配置时,会导致在另一种模式下提交任务失败。 为避免出现如上情况,添加表1中的配置项,避免两种模式下来回切换参数,提升软件易用性。 YARN-Cluster模式下,优先使用新增配置项的值,即服务端路径和参数。 YARN-Client模式下,直接使用原有的三个配置项的值。 原有的三个配置项为:“spark.driver.extraClassPath”、“spark.driver.extraJavaOptions”、“spark.driver.extraLibraryPath”。 不添加表1中配置项时,使用方式与原有方式一致,程序可正常执行,只是在不同模式下需切换配置。
  • 配置描述 为了使WebUI页面显示日志,需要将聚合日志进行解析和展现。Spark是通过Hadoop的JobHistoryServer来解析聚合日志的,所以您可以通过“spark.jobhistory.address”参数,指定JobHistoryServer页面地址,即可完成解析和展现。 参数入口: 在应用提交时通过“--conf”设置这些参数,或者在客户端的“spark-defaults.conf”配置文件中调整如下参数。 此功能依赖Hadoop中的JobHistoryServer服务,所以使用聚合日志之前需要保证JobHistoryServer服务已经运行正常。 如果参数值为空,“AggregatedLogs”页签仍然存在,但是无法通过logs链接查看日志。 只有当App已经running,HDFS上已经有该App的事件日志文件时才能查看到聚合的container日志。 正在运行的任务的日志,用户可以通过“Executors”页面的日志链接进行查看,任务结束后日志会汇聚到HDFS上,“Executors”页面的日志链接就会失效,此时用户可以通过“AggregatedLogs”页面的logs链接查看聚合日志。 表1 参数说明 参数 描述 默认值 spark.jobhistory.address JobHistoryServer页面的地址,格式:http(s)://ip:port/jobhistory。例如,将参数值设置为“https://10.92.115.1:26014/jobhistory”。 默认值为空,表示不能从WebUI查看container聚合日志。 修改参数后,需重启服务使得配置生效。 -
  • 配置场景 当Yarn配置“yarn.log-aggregation-enable”为“true”时,就开启了container日志聚合功能。日志聚合功能是指:当应用在Yarn上执行完成后,NodeManager将本节点中所有container的日志聚合到HDFS中,并删除本地日志。详情请参见配置Container日志聚合功能。 然而,开启container日志聚合功能之后,其日志聚合至HDFS目录中,只能通过获取HDFS文件来查看日志。开源Spark和Yarn服务不支持通过WebUI查看聚合后的日志。 因此,Spark在此基础上进行了功能增强。如图1所示,在HistoryServer页面添加“AggregatedLogs”页签,可以通过“logs”链接查看聚合的日志。 图1 聚合日志显示页面
  • 配置场景 当前Spark SQL执行一个查询时需要使用大量的内存,尤其是在做聚合(Aggregate)和关联(Join)操作时,此时如果内存有限的情况下就很容易出现OutOfMemoryError。有限内存下的稳定性就是确保在有限内存下依然能够正确执行相关的查询,而不出现OutOfMemoryError。 有限内存并不意味着内存无限小,它只是在内存不足于放下大于内存可用总量几倍的数据时,通过利用磁盘来做辅助从而确保查询依然稳定执行,但依然有一些数据是必须留在内存的,如在做涉及到Join的查询时,对于当前用于Join的相同key的数据还是需要放在内存中,如果该数据量较大而内存较小依然会出现OutOfMemoryError。 有限内存下的稳定性涉及到3个子功能: ExternalSort 外部排序功能,当执行排序时如果内存不足会将一部分数据溢出到磁盘中。 TungstenAggregate 新Hash聚合功能,默认对数据调用外部排序进行排序,然后再进行聚合,因此内存不足时在排序阶段会将数据溢出到磁盘,在聚合阶段因数据有序,在内存中只保留当前key的聚合结果,使用的内存较小。 SortMergeJoin、SortMergeOuterJoin 基于有序数据的等值连接。该功能默认对数据调用外部排序进行排序,然后再进行等值连接,因此内存不足时在排序阶段会将数据溢出到磁盘,在连接阶段因数据有序,在内存中只保留当前相同key的数据,使用的内存较小。
  • 配置描述 参数入口: 在应用提交时通过“--conf”设置这些参数,或者在客户端的“spark-defaults.conf”配置文件中调整如下参数。 表1 参数说明 参数 描述 默认值 spark.sql.tungsten.enabled 类型为Boolean。 当设置的值等于true时,表示开启tungsten功能,即逻辑计划等同于开启codegeneration,同时物理计划使用对应的tungsten执行计划。 当设置的值等于false时,表示关闭tungsten功能。 true spark.sql.codegen.wholeStage 类型为Boolean。 当设置的值等于true时,表示开启codegeneration功能,即运行时对于某些特定的查询将动态生成各逻辑计划代码。 当设置的值等于false时,表示关闭codegeneration功能,运行时使用当前已有静态代码。 true 开启ExternalSort除配置spark.sql.planner.externalSort=true外,还需配置spark.sql.unsafe.enabled=false或者spark.sql.codegen.wholeStage =false。 如果您需要开启TungstenAggregate,有如下几种方式: 将spark.sql.codegen.wholeStage和spark.sql.unsafe.enabled的值都设置为true(通过配置文件或命令行方式设置)。 如果spark.sql.codegen.wholeStage和spark.sql.unsafe.enabled都不为true或者其中一个不为true,只要spark.sql.tungsten.enabled的值设置为true时,TungstenAggregate会开启。
  • 配置描述 参数入口: 在应用提交时通过“--conf”设置这些参数,或者在客户端的“spark-defaults.conf”配置文件中调整如下参数。 表1 参数说明 参数 说明 默认值 spark.executor.memoryOverhead 用于指定每个executor的堆外内存大小(MB),增大该参数值,可以防止物理内存超限。该值是通过max(384,executor-memory*0.1)计算所得,最小值为384。 1024
共100000条