云服务器内容精选

  • 操作场景 Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据以及给其他Executor提供shuffle数据。当Executor进程任务过重,导致触发GC(Garbage Collection)而不能为其他Executor提供shuffle数据时,会影响任务运行。 External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。
  • 操作场景 Spark是内存计算框架,计算过程中内存不够对Spark的执行效率影响很大。可以通过监控GC(Garbage Collection),评估内存中RDD的大小来判断内存是否变成性能瓶颈,并根据情况优化。 监控节点进程的GC情况(在客户端的conf/spark-default.conf配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" ),如果频繁出现Full GC,需要优化GC。把RDD做Cache操作,通过日志查看RDD在内存中的大小,如果数据太大,需要改变RDD的存储级别来优化。
  • 操作步骤 优化GC,调整老年代和新生代的大小和比例。在客户端的“conf/spark-default.conf”配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:-XX:NewRatio。如:" -XX:NewRatio=2",则新生代占整个堆空间的1/3,老年代占2/3。 开发Spark应用程序时,优化RDD的数据结构。 使用原始类型数组替代集合类,如可使用fastutil库。 避免嵌套结构。 Key尽量不要使用String。 开发Spark应用程序时,建议序列化RDD。 RDD做cache时默认是不序列化数据的,可以通过设置存储级别来序列化RDD减小内存。例如: testRDD.persist(StorageLevel.MEMORY_ONLY_SER)
  • 操作场景 对于Spark应用来说,资源是影响Spark应用执行效率的一个重要因素。当一个长期运行的服务(比如JD BCS erver),如果分配给它多个Executor,可是却没有任何任务分配给它,而此时有其他的应用却资源紧张,这就造成了很大的资源浪费和资源不合理的调度。 动态资源调度就是为了解决这种场景,根据当前应用任务的负载情况,实时的增减Executor个数,从而实现动态分配资源,使整个Spark系统更加健康。
  • 使用coalesce调整分片的数量 coalesce可以调整分片的数量。coalesce函数有两个参数: coalesce(numPartitions: Int, shuffle: Boolean = false) 当shuffle为true的时候,函数作用与repartition(numPartitions: Int)相同,会将数据通过Shuffle的方式重新分区;当shuffle为false的时候,则只是简单的将父RDD的多个partition合并到同一个task进行计算,shuffle为false时,如果numPartitions大于父RDD的切片数,那么分区不会重新调整。 遇到下列场景,可选择使用coalesce算子: 当之前的操作有很多filter时,使用coalesce减少空运行的任务数量。此时使用coalesce(numPartitions, false),numPartitions小于父RDD切片数。 当输入切片个数太大,导致程序无法正常运行时使用。 当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用coalesce(numPartitions, true)。
  • 示例 在执行spark wordcount计算中。1.6T数据,250个executor。 在默认参数下执行失败,出现Futures timed out和OOM错误。 因为数据量大,task数多,而wordcount每个task都比较小,完成速度快。当task数多时driver端相应的一些对象就变大了,而且每个task完成时executor和driver都要通信,这就会导致由于内存不足,进程之间通信断连等问题。 当把Driver的内存设置到4g时,应用成功跑完。 使用JDB CS erver执行TPC-DS测试套,默认参数配置下也报了很多错误:Executor Lost等。而当配置Driver内存为30g,executor核数为2,executor个数为125,executor内存为6g时,所有任务才执行成功。
  • 操作步骤 配置Driver内存。 Driver负责任务的调度,和Executor、AM之间的消息通信。当任务数变多,任务平行度增大时,Driver内存都需要相应增大。 您可以根据实际任务数量的多少,为Driver设置一个合适的内存。 将“spark-defaults.conf”中的“spark.driver.memory”配置项设置为合适大小。 在使用spark-submit命令时,添加“--driver-memory MEM”参数设置内存。 配置Executor个数。 每个Executor每个核同时能跑一个task,所以增加了Executor的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加Executor的个数,以提高运行效率。 将“spark-defaults.conf”中的“spark.executor.instance”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_INSTAN CES ”配置项设置为合适大小。 在使用spark-submit命令时,添加“--num-executors NUM”参数设置Executor个数。 配置Executor核数。 每个Executor多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用Executor的内存,所以要在内存和核数之间做好平衡。 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。 配置Executor内存。 Executor的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加;当一个任务较小运行较快时,就可以增大并发度减少内存。 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。
  • 操作场景 Spark on Yarn模式下,有Driver、ApplicationMaster、Executor三种进程。在任务调度和运行的过程中,Driver和Executor承担了很大的责任,而ApplicationMaster主要负责container的启停。 因而Driver和Executor的参数配置对Spark应用的执行有着很大的影响意义。用户可通过如下操作对Spark集群性能做优化。
  • 操作步骤 并行度可以通过如下三种方式来设置,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。 在会产生shuffle的操作函数内设置并行度参数,优先级最高。 testRDD.groupByKey(24) 在代码中配置“spark.default.parallelism”设置并行度,优先级次之。 val conf = new SparkConf() conf.set("spark.default.parallelism", 24) 在“$SPARK_HOME/conf/spark-defaults.conf”文件中配置“spark.default.parallelism”的值,优先级最低。 spark.default.parallelism 24
  • 配置描述 参数入口: 在应用提交时通过“--conf”设置这些参数,或者在客户端的“spark-defaults.conf”配置文件中调整如下参数。 表1 参数说明 参数 说明 默认值 spark.executor.memoryOverhead 用于指定每个executor的堆外内存大小(MB),增大该参数值,可以防止物理内存超限。该值是通过max(384,executor-memory*0.1)计算所得,最小值为384。 1024
  • 操作步骤 优化GC,调整老年代和新生代的大小和比例。在客户端的conf/spark-default.conf配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:-XX:NewRatio。如," -XX:NewRatio=2",则新生代占整个堆空间的1/3,老年代占2/3。 开发Spark应用程序时,优化RDD的数据结构。 使用原始类型数组替代集合类,如可使用fastutil库。 避免嵌套结构。 Key尽量不要使用String。 开发Spark应用程序时,建议序列化RDD。 RDD做cache时默认是不序列化数据的,可以通过设置存储级别来序列化RDD减小内存。例如: testRDD.persist(StorageLevel.MEMORY_ONLY_SER)
  • 操作场景 Spark是内存计算框架,计算过程中内存不够对Spark的执行效率影响很大。可以通过监控GC(Garbage Collection),评估内存中RDD的大小来判断内存是否变成性能瓶颈,并根据情况优化。 监控节点进程的GC情况(在客户端的conf/spark-default.conf配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" ),如果频繁出现Full GC,需要优化GC。把RDD做Cache操作,通过日志查看RDD在内存中的大小,如果数据太大,需要改变RDD的存储级别来优化。
  • 操作步骤 并行度可以通过如下三种方式来设置,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。 在会产生shuffle的操作函数内设置并行度参数,优先级最高。 testRDD.groupByKey(24) 在代码中配置“spark.default.parallelism”设置并行度,优先级次之。 val conf = new SparkConf() conf.set("spark.default.parallelism", 24) 在“$SPARK_HOME/conf/spark-defaults.conf”文件中配置“spark.default.parallelism”的值,优先级最低。 spark.default.parallelism 24
  • 操作场景 Spark支持两种方式的序列化 : Java原生序列化JavaSerializer Kryo序列化KryoSerializer 序列化对于Spark应用的性能来说,具有很大的影响。在特定的数据格式的情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,而对于一些Int之类的基本类型数据,性能的提升就几乎可以忽略。 KryoSerializer依赖Twitter的Chill库来实现,相对于JavaSerializer,主要的问题在于不是所有的Java Serializable对象都能支持,兼容性不好,所以需要手动注册类。 序列化功能用在两个地方:序列化任务和序列化数据。Spark任务序列化只支持JavaSerializer,数据序列化支持JavaSerializer和KryoSerializer。
  • 操作步骤 Spark程序运行时,在shuffle和RDD Cache等过程中,会有大量的数据需要序列化,默认使用JavaSerializer,通过配置让KryoSerializer作为数据序列化器来提升序列化性能。 在开发应用程序时,添加如下代码来使用KryoSerializer作为数据序列化器。 实现类注册器并手动注册类。 package com.etl.common; import com.esotericsoftware.kryo.Kryo; import org.apache.spark.serializer.KryoRegistrator; public class DemoRegistrator implements KryoRegistrator { @Override public void registerClasses(Kryo kryo) { //以下为示例类,请注册自定义的类 kryo.register(AggrateKey.class); kryo.register(AggrateValue.class); } } 您可以在Spark客户端对spark.kryo.registrationRequired参数进行配置,设置是否需要Kryo注册序列化。 当参数设置为true时,如果工程中存在未被序列化的类,则会发生异常。如果设置为false(默认值),Kryo会自动将未注册的类名写到对应的对象中。此操作会对系统性能造成影响。设置为true时,用户需手动注册类,针对未序列化的类,系统不会自动写入类名,而是发生异常,相对比false,其性能较好。 配置KryoSerializer作为数据序列化器和类注册器。 val conf = new SparkConf() conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryo.registrator", "com.etl.common.DemoRegistrator")