云服务器内容精选

  • 配置参数 在Spark客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中进行设置,修改如下参数: 参数 说明 默认值 spark.sql.mergeSmallFiles.enabled 设置为true,Spark写入目标表时会判断是否写入了小文件,如果发现有小文件,则会启动合并小文件的job。 false spark.sql.mergeSmallFiles.threshold.avgSize 如果某个分区的平均文件大小小于该值,则启动小文件合并。 16MB spark.sql.mergeSmallFiles.maxSizePerTask 合并后的每个文件大小目标大小。 256MB spark.sql.mergeSmallFiles.moveParallelism 当不需要合并小文件后时,将临时文件移动到最终目录的并行度。 10000
  • 问题 Spark应用执行过程中,当driver连接RM失败时,会报下面的错误,且较长时间不退出。 16/04/23 15:31:44 INFO RetryInvocationHandler: Exception while invoking getApplicationReport of class ApplicationClientProtocolPBClientImpl over 37 after 1 fail over attempts. Trying to fail over after sleeping for 44160ms. java.net.ConnectException: Call From vm1/192.168.39.30 to vm1:8032 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
  • 回答 在Spark中有个定期线程,通过连接RM监测AM的状态。由于连接RM超时,就会报上面的错误,且一直重试。RM中对重试次数有限制,默认是30次,每次间隔默认为30秒左右,每次重试时都会报上面的错误。超过次数后,driver才会退出。 RM中关于重试相关的配置项如表1所示。 表1 参数说明 参数 描述 默认值 yarn.resourcemanager.connect.max-wait.ms 连接RM的等待时间最大值。 900000 yarn.resourcemanager.connect.retry-interval.ms 重试连接RM的时间频率。 30000 重试次数=yarn.resourcemanager.connect.max-wait.ms/yarn.resourcemanager.connect.retry-interval.ms,即重试次数=连接RM的等待时间最大值/重试连接RM的时间频率。 在Spark客户端机器中,通过修改“conf/yarn-site.xml”文件,添加并配置“yarn.resourcemanager.connect.max-wait.ms”和“yarn.resourcemanager.connect.retry-interval.ms”,这样可以更改重试次数,Spark应用可以提早退出。
  • 配置场景 Spark中见到的UI、EventLog、动态资源调度等功能都是通过事件传递实现的。事件有SparkListenerJobStart、SparkListenerJobEnd等,记录了每个重要的过程。 每个事件在发生后都会保存到一个队列中,Driver在创建SparkContext对象时,会启动一个线程循环的从该队列中依次拿出一个事件,然后发送给各个Listener,每个Listener感知到事件后就会做各自的处理。 因此当队列存放的速度大于获取的速度时,就会导致队列溢出,从而丢失了溢出的事件,影响了UI、EventLog、动态资源调度等功能。所以为了更灵活的使用,在这边添加一个配置项,用户可以根据Driver的内存大小设置合适的值。
  • 示例 在执行spark wordcount计算中。1.6T数据,250个executor。 在默认参数下执行失败,出现Futures timed out和OOM错误。 因为数据量大,task数多,而wordcount每个task都比较小,完成速度快。当task数多时driver端相应的一些对象就变大了,而且每个task完成时executor和driver都要通信,这就会导致由于内存不足,进程之间通信断连等问题。 当把Driver的内存设置到4g时,应用成功跑完。 使用JD BCS erver执行TPC-DS测试套,默认参数配置下也报了很多错误:Executor Lost等。而当配置Driver内存为30g,executor核数为2,executor个数为125,executor内存为6g时,所有任务才执行成功。
  • 操作步骤 配置Driver内存。 Driver负责任务的调度,和Executor、AM之间的消息通信。当任务数变多,任务平行度增大时,Driver内存都需要相应增大。 您可以根据实际任务数量的多少,为Driver设置一个合适的内存。 将“spark-defaults.conf”中的“spark.driver.memory”配置项设置为合适大小。 在使用spark-submit命令时,添加“--driver-memory MEM”参数设置内存。 配置Executor个数。 每个Executor每个核同时能跑一个task,所以增加了Executor的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加Executor的个数,以提高运行效率。 将“spark-defaults.conf”中的“spark.executor.instance”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_INSTAN CES ”配置项设置为合适大小。 在使用spark-submit命令时,添加“--num-executors NUM”参数设置Executor个数。 配置Executor核数。 每个Executor多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用Executor的内存,所以要在内存和核数之间做好平衡。 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。 配置Executor内存。 Executor的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加;当一个任务较小运行较快时,就可以增大并发度减少内存。 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。
  • 操作场景 Spark on Yarn模式下,有Driver、ApplicationMaster、Executor三种进程。在任务调度和运行的过程中,Driver和Executor承担了很大的责任,而ApplicationMaster主要负责container的启停。 因而Driver和Executor的参数配置对Spark应用的执行有着很大的影响意义。用户可通过如下操作对Spark集群性能做优化。
  • 配置描述 提供两种不同的数据汇聚功能配置选项,两者在Spark JDB CS erver服务端的tunning选项中进行设置,设置完后需要重启JDBCServer。 表1 参数说明 参数 说明 默认值 spark.sql.bigdata.thriftServer.useHdfsCollect 是否将结果数据保存到HDFS中而不是内存中。 优点:由于查询结果保存在hdfs端,因此基本不会造成JDBCServer的OOM。 缺点:速度慢。 true:保存至HDFS中 false:不使用该功能 须知: spark.sql.bigdata.thriftServer.useHdfsCollect参数设置为true时,将结果数据保存到HDFS中,但JobHistory原生页面上Job的描述信息无法正常关联到对应的SQL语句,同时spark-beeline命令行中回显的Execution ID为null,为解决JDBCServer OOM问题,同时显示信息正确,建议选择 spark.sql.userlocalFileCollect参数进行配置。 false spark.sql.uselocalFileCollect 是否将结果数据保存在本地磁盘中而不是内存里面。 优点:结果数据小数据量情况下和原生内存的方式相比性能损失可以忽略,大数据情况下(亿级数据)性能远比使用hdfs,以及原生内存方式好。 缺点:需要调优。大数据情况下建议JDBCServer driver端内存10G,executor端每个核心分配3G内存。 true:使用该功能 false: 不使用该功能 false spark.sql.collect.Hive 该参数在spark.sql.uselocalFileCollect开启的情况下生效。直接序列化的方式,还是间接序列化的方式保存结果数据到磁盘。 优点:针对分区数特别多的表查询结果汇聚性能优于直接使用结果数据保证在磁盘的方式。 缺点:和spark.sql.uselocalFileCollect开启时候的缺点一样。 true:使用该功能 false:不使用该功能 false spark.sql.collect.serialize 该参数在spark.sql.uselocalFileCollect, spark.sql.collect.Hive同时开启的情况下生效。 作用是进一步提升性能 java:采用java序列化方式收集数据。 kryo:采用kryo序列化方式收集数据,性能要比采用java好。 java 参数spark.sql.bigdata.thriftServer.useHdfsCollect和spark.sql.uselocalFileCollect不能同时设置为true。
  • 回答 停止应用,在Spark的配置文件“spark-defaults.conf”中将配置项“spark.event.listener.logEnable”配置为“true”。并把配置项“spark.eventQueue.size”配置为1000W。如果需要控制打印频率(默认为1000毫秒打印1条日志),请根据需要修改配置项“spark.event.listener.logRate”,该配置项的单位为毫秒。 启动应用,可以发现如下的日志信息(消费者速率、生产者速率、当前队列中的消息数量和队列中消息数量的最大值)。 INFO LiveListenerBus: [SparkListenerBus]:16044 events are consumed in 5000 ms. INFO LiveListenerBus: [SparkListenerBus]:51381 events are produced in 5000 ms, eventQueue still has 86417 events, MaxSize: 171764. 用户可以根据日志信息【队列中消息数量的最大值MaxSize】,在配置文件“spark-defaults.conf”中将配置项“spark.eventQueue.size”配置成合适的队列大小。比如【队列中消息数量的最大值】为250000,那么配置合适的队列大小为300000。
  • 问题 当Driver日志中出现如下的日志时,表示事件队列溢出了。当事件队列溢出时如何配置事件队列的大小? 普通应用 Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler. Spark Streaming应用 Dropping StreamingListenerEvent because no remaining room in event queue. This likely means one of the StreamingListeners is too slow and cannot keep up with the rate at which events are being started by the scheduler.
  • 问题 在380节点的大集群上,运行29T数据量的HiBench测试套中ScalaSort测试用例,使用以下关键配置(--executor-cores 4)出现如下异常: org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.114.12:23242 at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:306) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:217) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:102) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301) at org.apache.spark.rdd.RDD.iterator(RDD.scala:265) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301) at org.apache.spark.rdd.RDD.iterator(RDD.scala:265) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301) at org.apache.spark.rdd.RDD.iterator(RDD.scala:265) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301) at org.apache.spark.rdd.RDD.iterator(RDD.scala:265) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Failed to connect to /192.168.114.12:23242 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:91) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) ... 3 more Caused by: java.net.ConnectException: Connection timed out: /192.168.114.12:23242 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ... 1 more
  • 回答 在运行应用程序时,使用Executor参数“--executor-cores 4”,单进程中并行度高导致IO非常繁忙,以至于任务运行缓慢。 16/02/26 10:04:53 INFO TaskSetManager: Finished task 2139.0 in stage 1.0 (TID 151149) in 376455 ms on 10-196-115-2 (694/153378) 单个任务运行时间超过6分钟,从而导致连接超时问题,最终使得任务失败。 将参数中的核数设置为1,“--executor-cores 1”,任务正常完成,单个任务处理时间在合理范围之内(15秒左右)。 16/02/29 02:24:46 INFO TaskSetManager: Finished task 59564.0 in stage 1.0 (TID 208574) in 15088 ms on 10-196-115-6 (59515/153378) 因此,处理这类网络超时任务,可以减少单个Executor的核数来规避该类问题。
  • 操作场景 Spark是内存计算框架,计算过程中内存不够对Spark的执行效率影响很大。可以通过监控GC(Garbage Collection),评估内存中RDD的大小来判断内存是否变成性能瓶颈,并根据情况优化。 监控节点进程的GC情况(在客户端的conf/spark-default.conf配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" ),如果频繁出现Full GC,需要优化GC。把RDD做Cache操作,通过日志查看RDD在内存中的大小,如果数据太大,需要改变RDD的存储级别来优化。
  • 操作步骤 优化GC,调整老年代和新生代的大小和比例。在客户端的conf/spark-default.conf配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:-XX:NewRatio。如," -XX:NewRatio=2",则新生代占整个堆空间的1/3,老年代占2/3。 开发Spark应用程序时,优化RDD的数据结构。 使用原始类型数组替代集合类,如可使用fastutil库。 避免嵌套结构。 Key尽量不要使用String。 开发Spark应用程序时,建议序列化RDD。 RDD做cache时默认是不序列化数据的,可以通过设置存储级别来序列化RDD减小内存。例如: testRDD.persist(StorageLevel.MEMORY_ONLY_SER)
  • 操作场景 Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据以及给其他Executor提供shuffle数据。当Executor进程任务过重,导致触发GC(Garbage Collection)而不能为其他Executor提供shuffle数据时,会影响任务运行。 External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。