华为云用户手册

  • 回答 动态分区表插入数据的最后一步是读取shuffle文件的数据,再写入到表对应的分区文件中。 当大面积shuffle文件损坏后,会引起大批量task失败,然后进行job重试。重试前Spark会将写表分区文件的句柄关闭,大批量task关闭句柄时HDFS无法及时处理。在task进行下一次重试时,句柄在NameNode端未被及时释放,即会发生"Failed to CREATE_FILE"异常。 这种现象仅会在大面积shuffle文件损坏时发生,出现异常后task会重试,重试耗时在毫秒级,影响较小,可以忽略不计。
  • 回答 造成该现象的原因是NodeManager重启。使用ExternalShuffle的时候,Spark将借用NodeManager传输Shuffle数据,因此NodeManager的内存将成为瓶颈。 在当前版本的 FusionInsight 中,NodeManager的默认内存只有1G,在数据量比较大(1T以上)的Spark任务下,内存严重不足,消息响应缓慢,导致FusionInsight健康检查认为NodeManager进程退出,强制重启NodeManager,导致上述问题产生。 解决方法: 调整NodeManager的内存,数据量比较大(1T以上)的情况下,NodeManager的内存至少在4G以上。
  • 问题 Spark执行应用时上报如下类似错误并导致应用结束。 2016-04-20 10:42:00,557 | ERROR | [shuffle-server-2] | Connection to 10-91-8-208/10.18.0.115:57959 has been quiet for 180000 ms while there are outstanding requests. Assuming connection is dead; please adju st spark.network.timeout if this is wrong. | org.apache.spark.network.server.TransportChannelHandler.userEventTriggered(TransportChannelHandler.java:128) 2016-04-20 10:42:00,558 | ERROR | [shuffle-server-2] | Still have 1 requests outstanding when connection from 10-91-8-208/10.18.0.115:57959 is closed | org.apache.spark.network.client.TransportResponseHandl er.channelUnregistered(TransportResponseHandler.java:102) 2016-04-20 10:42:00,562 | WARN | [yarn-scheduler-ask-am-thread-pool-160] | Error sending message [message = DoShuffleClean(application_1459995017785_0108,319)] in 1 attempts | org.apache.spark.Logging$clas s.logWarning(Logging.scala:92) java.io.IOException: Connection from 10-91-8-208/10.18.0.115:57959 closed at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104) at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at java.lang.Thread.run(Thread.java:745) 2016-04-20 10:42:00,573 | INFO | [dispatcher-event-loop-14] | Starting task 177.0 in stage 1492.0 (TID 1996351, linux-254, PRO CES S_LOCAL, 2106 bytes) | org.apache.spark.Logging$class.logInfo(Logging.scala: 59) 2016-04-20 10:42:00,574 | INFO | [task-result-getter-0] | Finished task 85.0 in stage 1492.0 (TID 1996259) in 191336 ms on linux-254 (106/3000) | org.apache.spark.Logging$class.logInfo(Logging.scala:59) 2016-04-20 10:42:00,811 | ERROR | [Yarn application state monitor] | Yarn application has already exited with state FINISHED! | org.apache.spark.Logging$class.logError(Logging.scala:75)
  • 问题 Spark应用执行过程中,当driver连接RM失败时,会报下面的错误,且较长时间不退出。 16/04/23 15:31:44 INFO RetryInvocationHandler: Exception while invoking getApplicationReport of class ApplicationClientProtocolPBClientImpl over 37 after 1 fail over attempts. Trying to fail over after sleeping for 44160ms. java.net.ConnectException: Call From vm1/192.168.39.30 to vm1:8032 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
  • 回答 在Spark中有个定期线程,通过连接RM监测AM的状态。由于连接RM超时,就会报上面的错误,且一直重试。RM中对重试次数有限制,默认是30次,每次间隔默认为30秒左右,每次重试时都会报上面的错误。超过次数后,driver才会退出。 RM中关于重试相关的配置项如表1所示。 表1 参数说明 参数 描述 默认值 yarn.resourcemanager.connect.max-wait.ms 连接RM的等待时间最大值。 900000 yarn.resourcemanager.connect.retry-interval.ms 重试连接RM的时间频率。 30000 重试次数=yarn.resourcemanager.connect.max-wait.ms/yarn.resourcemanager.connect.retry-interval.ms,即重试次数=连接RM的等待时间最大值/重试连接RM的时间频率。 在Spark客户端机器中,通过修改“conf/yarn-site.xml”文件,添加并配置“yarn.resourcemanager.connect.max-wait.ms”和“yarn.resourcemanager.connect.retry-interval.ms”,这样可以更改重试次数,Spark应用可以提早退出。
  • 回答 停止应用,在Spark的配置文件“spark-defaults.conf”中将配置项“spark.event.listener.logEnable”配置为“true”。并把配置项“spark.eventQueue.size”配置为1000W。如果需要控制打印频率(默认为1000毫秒打印1条日志),请根据需要修改配置项“spark.event.listener.logRate”,该配置项的单位为毫秒。 启动应用,可以发现如下的日志信息(消费者速率、生产者速率、当前队列中的消息数量和队列中消息数量的最大值)。 INFO LiveListenerBus: [SparkListenerBus]:16044 events are consumed in 5000 ms. INFO LiveListenerBus: [SparkListenerBus]:51381 events are produced in 5000 ms, eventQueue still has 86417 events, MaxSize: 171764. 用户可以根据日志信息【队列中消息数量的最大值MaxSize】,在配置文件“spark-defaults.conf”中将配置项“spark.eventQueue.size”配置成合适的队列大小。比如【队列中消息数量的最大值】为250000,那么配置合适的队列大小为300000。
  • 问题 当Driver日志中出现如下的日志时,表示事件队列溢出了。当事件队列溢出时如何配置事件队列的大小? 普通应用 Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler. Spark Streaming应用 Dropping StreamingListenerEvent because no remaining room in event queue. This likely means one of the StreamingListeners is too slow and cannot keep up with the rate at which events are being started by the scheduler.
  • 回答 在运行应用程序时,使用Executor参数“--executor-cores 4”,单进程中并行度高导致IO非常繁忙,以至于任务运行缓慢。 16/02/26 10:04:53 INFO TaskSetManager: Finished task 2139.0 in stage 1.0 (TID 151149) in 376455 ms on 10-196-115-2 (694/153378) 单个任务运行时间超过6分钟,从而导致连接超时问题,最终使得任务失败。 将参数中的核数设置为1,“--executor-cores 1”,任务正常完成,单个任务处理时间在合理范围之内(15秒左右)。 16/02/29 02:24:46 INFO TaskSetManager: Finished task 59564.0 in stage 1.0 (TID 208574) in 15088 ms on 10-196-115-6 (59515/153378) 因此,处理这类网络超时任务,可以减少单个Executor的核数来规避该类问题。
  • 问题 在380节点的大集群上,运行29T数据量的HiBench测试套中ScalaSort测试用例,使用以下关键配置(--executor-cores 4)出现如下异常: org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.114.12:23242 at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:306) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:217) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:102) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301) at org.apache.spark.rdd.RDD.iterator(RDD.scala:265) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301) at org.apache.spark.rdd.RDD.iterator(RDD.scala:265) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301) at org.apache.spark.rdd.RDD.iterator(RDD.scala:265) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301) at org.apache.spark.rdd.RDD.iterator(RDD.scala:265) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Failed to connect to /192.168.114.12:23242 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:91) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) ... 3 more Caused by: java.net.ConnectException: Connection timed out: /192.168.114.12:23242 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ... 1 more
  • 回答 使用yarn application -kill applicationID命令后Spark只会停掉任务对应的SparkContext,而不是退出当前进程。如果当前进程中存在其他常驻的线程(类似spark-shell需要不断检测命令输入,Spark Streaming不断在从数据源读取数据),SparkContext被停止并不会终止整个进程。 如果需要退出Driver进程,建议使用kill -9 pid命令手动退出当前Driver。
  • 回答 在yarn-client模式下,Spark的Driver和ApplicationMaster作为两个独立的进程在运行。当Driver完成任务退出时,会通知ApplicationMaster向ResourceManager注销自身,即调用unregister方法。 由于是远程调用,则存在发生网络故障的可能性。当发生网络故障时,ApplicationMaster会使用Yarn客户端的重试机制进行重试。在达到最大重试次数之前网络恢复正常,则ApplicationMaster会正常退出。 如果超过重试次数和重试时长,则ApplicationMaster注销失败,ResourceManager会认为ApplicationMaster异常退出并尝试重新启动ApplicationMaster。新启动的ApplicationMaster在尝试连接已经退出的Driver失败后,会在ResourceManager页面上标记此次Application为FAILED状态。
  • 操作步骤 一个简单的流处理系统由以下三部分组件组成:数据源 + 接收器 + 处理器。数据源为Kafka,接收器为Streaming中的Kafka数据源接收器,处理器为Streaming。 对Streaming调优,就必须使该三个部件的性能都更优化。 数据源调优 在实际的应用场景中,数据源为了保证数据的容错性,会将数据保存在本地磁盘中,而Streaming的计算结果全部在内存中完成,数据源很有可能成为流式系统的最大瓶颈点。 对Kafka的性能调优,有以下几个点: 使用Kafka-0.8.2以后版本,可以使用异步模式的新Producer接口。 配置多个Broker的目录,设置多个IO线程,配置Topic合理的Partition个数。 详情请参见Kafka开源文档中的“性能调优”部分:http://kafka.apache.org/documentation.html。 接收器调优 Streaming中已有多种数据源的接收器,例如Kafka、Flume、MQTT、ZeroMQ等,其中Kafka的接收器类型最多,也是最成熟一套接收器。 Kafka包括三种模式的接收器API: KafkaReceiver:直接接收Kafka数据,进程异常后,可能出现数据丢失。 ReliableKafkaReceiver:通过ZooKeeper记录接收数据位移。 DirectKafka:直接通过RDD读取Kafka每个Partition中的数据,数据高可靠。 从实现上来看,DirectKafka的性能更优,实际测试上来看,DirectKafka也确实比其他两个API性能好了不少。因此推荐使用DirectKafka的API实现接收器。 数据接收器作为一个Kafka的消费者,对于它的配置优化,请参见Kafka开源文档:http://kafka.apache.org/documentation.html。 处理器调优 Spark Streaming的底层由Spark执行,因此大部分对于Spark的调优措施,都可以应用在Spark Streaming之中,例如: 数据序列化 配置内存 设置并行度 使用External Shuffle Service提升性能 在做Spark Streaming的性能优化时需注意一点,越追求性能上的优化,Spark Streaming整体的可靠性会越差。例如: “spark.streaming.receiver.writeAheadLog.enable”配置为“false”的时候,会明显减少磁盘的操作,提高性能,但由于缺少WAL机制,会出现异常恢复时,数据丢失。 因此,在调优Spark Streaming的时候,这些保证数据可靠性的配置项,在生产环境中是不能关闭的。 日志归档调优 参数“spark.eventLog.group.size”用来设置一个应用的JobHistory日志按照指定job个数分组,每个分组会单独创建一个文件记录日志,从而避免应用长期运行时形成单个过大日志造成JobHistory无法读取的问题,设置为“0”时表示不分组。 大部分Spark Streaming任务属于小型job,而且产生速度较快,会导致频繁的分组,产生大量日志小文件消耗磁盘I/O。建议增大此值,例如改为“1000”或更大值。
  • 操作步骤 要使用CBO优化,可以按照以下步骤进行优化。 需要先执行特定的SQL语句来收集所需的表和列的统计信息。 SQL命令如下(根据具体情况选择需要执行的SQL命令): 生成表级别统计信息(扫表): ANALYZE TABLE src COMPUTE STATIS TICS 生成sizeInBytes和rowCount。 使用ANALYZE语句收集统计信息时,无法计算非HDFS数据源的表的文件大小。 生成表级别统计信息(不扫表): ANALYZE TABLE src COMPUTE STATISTI CS NOSCAN 只生成sizeInBytes,如果原来已经生成过sizeInBytes和rowCount,而本次生成的sizeInBytes和原来的大小一样,则保留rowCount(如果存在),否则清除rowCount。 生成列级别统计信息: ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS a, b, c 生成列统计信息,为保证一致性,会同步更新表统计信息。目前不支持复杂数据类型(如Seq, Map等)和HiveStringType的统计信息生成。 显示统计信息: DESC FORMATTED src 在Statistics中会显示“xxx bytes, xxx rows”分别表示表级别的统计信息。也可以通过如下命令显示列统计信息: DESC FORMATTED src a 使用限制:当前统计信息收集不支持针对分区表的分区级别的统计信息。 在Spark客户端的“spark-defaults.conf”配置文件中进行表1设置。 表1 参数介绍 参数 描述 默认值 spark.sql.cbo.enabled CBO总开关。 true表示打开, false表示关闭。 要使用该功能,需确保相关表和列的统计信息已经生成。 false spark.sql.cbo.joinReorder.enabled 使用CBO来自动调整连续的inner join的顺序。 true:表示打开 false:表示关闭 要使用该功能,需确保相关表和列的统计信息已经生成,且CBO总开关打开。 false spark.sql.cbo.joinReorder.dp.threshold 使用CBO来自动调整连续inner join的表的个数阈值。 如果超出该阈值,则不会调整join顺序。 12
  • 操作场景 将datasource表的分区消息存储到Metastore中,并在Metastore中对分区消息进行处理。 优化datasource表,支持对表中分区执行增加、删除和修改等语法,从而增加与Hive的兼容性。 支持在查询语句中,把分区裁剪并下压到Metastore上,从而过滤掉不匹配的分区。 示例如下: select count(*) from table where partCol=1; //partCol列为分区列 此时,在物理计划中执行TableScan操作时,只处理分区(partCol=1)对应的数据。
  • 操作步骤 要启动Datasource表优化,在Spark客户端的“spark-defaults.conf”配置文件中进行设置。 表1 参数介绍 参数 描述 默认值 spark.sql.hive.manageFilesourcePartitions 是否启用Metastore分区管理(包括数据源表和转换的Hive表)。 true:启用Metastore分区管理,即数据源表存储分区在Hive中,并在查询语句中使用Metastore修剪分区。 false:不启用Metastore分区管理。 true spark.sql.hive.metastorePartitionPruning 是否支持将predicate下压到Hive Metastore中。 true:支持,目前仅支持Hive表的predicate下压。 false:不支持 true spark.sql.hive.filesourcePartitionFileCacheSize 启用内存中分区文件元数据的缓存大小。 所有表共享一个可以使用指定的num字节进行文件元数据的缓存。 只有当“spark.sql.hive.manageFilesourcePartitions”配置为“true”时,该配置项才会生效。 250 * 1024 * 1024 spark.sql.hive.convertMetastoreOrc 设置ORC表的处理方式: false:Spark SQL使用Hive SerDe处理ORC表。 true:Spark SQL使用Spark内置的机制处理ORC表。 true
  • 操作步骤 要启动小文件优化,在Spark客户端的“spark-defaults.conf”配置文件中进行设置。 表1 参数介绍 参数 描述 默认值 spark.sql.files.maxPartitionBytes 在读取文件时,将单个分区打包的最大字节数。 单位:byte。 134217728(即128M) spark.files.openCostInBytes 打开文件的预估成本, 按照同一时间能够扫描的字节数来测量。当一个分区写入多个文件时使用。高估更好,这样小文件分区将比大文件分区更先被调度。 4M
  • 操作场景 Spark SQL表中,经常会存在很多小文件(大小远小于HDFS的块大小),每个小文件默认对应Spark中的一个Partition,即一个Task。在有很多小文件时,Spark会启动很多Task,此时当SQL逻辑中存在Shuffle操作时,会大大增加hash分桶数,严重影响系统性能。 针对小文件很多的场景,DataSource在创建RDD时,先将Table中的split生成PartitionedFile,再将这些PartitionedFile进行合并。即将多个PartitionedFile组成一个partition,从而减少partition数量,避免在Shuffle操作时生成过多的hash分桶,如图1所示。 图1 小文件合并
  • 操作场景 SparkSQL在往动态分区表中插入数据时,分区数越多,单个Task生成的HDFS文件越多,则元数据占用的内存也越多。这就导致程序GC(Gabage Collection)严重,甚至发生OOM(Out of Memory)。 经测试证明:10240个Task,2000个分区,在执行HDFS文件从临时目录rename到目标目录动作前,FileStatus元数据大小约29G。为避免以上问题,可修改SQL语句对数据进行重分区,以减少HDFS文件个数。
  • 操作步骤 在动态分区语句中加入distribute by,by值为分区字段。 示例如下: insert into table store_returns partition (sr_returned_date_sk) select sr_return_time_sk,sr_item_sk,sr_customer_sk,sr_cdemo_sk,sr_hdemo_sk,sr_addr_sk,sr_store_sk,sr_reason_sk,sr_ticket_number,sr_return_quantity,sr_return_amt,sr_return_tax,sr_return_amt_inc_tax,sr_fee,sr_return_ship_cost,sr_refunded_cash,sr_reversed_charge,sr_store_credit,sr_net_loss,sr_returned_date_sk from ${SOURCE}.store_returns distribute by sr_returned_date_sk;
  • 操作步骤 设置JD BCS erver的公平调度策略。 Spark默认使用FIFO(First In First Out)的调度策略,但对于多并发的场景,使用FIFO策略容易导致短任务执行失败。因此在多并发的场景下,需要使用公平调度策略,防止任务执行失败。 在Spark中设置公平调度,具体请参考http://archive.apache.org/dist/spark/docs/3.1.1/job-scheduling.html#scheduling-within-an-application。 在JDBC客户端中设置公平调度。 在BeeLine命令行客户端或者JDBC自定义代码中,执行如下语句,其中PoolName是公平调度的某一个调度池。 SET spark.sql.thriftserver.scheduler.pool=PoolName; 执行相应的SQL命令,Spark任务将会在上面的调度池中运行。 设置BroadCastHashJoin的超时时间。 BroadCastHashJoin有超时参数,一旦超过预设的时间,该查询任务直接失败,在多并发场景下,由于计算任务抢占资源,可能会导致BroadCastHashJoin的Spark任务无法执行,导致超时出现。因此需要在JDBCServer的“spark-defaults.conf”配置文件中调整超时时间。 表1 参数描述 参数 描述 默认值 spark.sql.broadcastTimeout BroadcastHashJoin中广播表的超时时间,当任务并发数较高的时候,可以调高该参数值。 -1(数值类型,实际为五分钟)
  • 操作步骤 可对INSERT...SELECT操作做如下的调优操作。 如果建的是Hive表,将存储类型设为Parquet,从而减少执行INSERT...SELECT语句的时间。 建议使用spark-sql或者在Beeline/JDBCServer模式下使用spark用户来执行INSERT...SELECT操作,避免执行更改文件owner的操作,从而减少执行INSERT...SELECT语句的时间。 在Beeline/JDBCServer模式下,executor的用户跟driver是一致的,driver是JDBCServer服务的一部分,是由spark用户启动的,因此其用户也是spark用户,且当前无法实现在运行时将Beeline端的用户透传到executor,因此使用非spark用户时需要对文件进行更改owner为Beeline端的用户,即实际用户。 如果查询的数据是大量的小文件将会产生大量map操作,从而导致输出存在大量的小文件,在执行重命名文件操作时将会耗费较多时间,此时可以通过设置“spark.sql.files.maxPartitionBytes”与“spark.files.openCostInBytes”来设置一个partiton读取的最大字节,在一个partition中合并多个小文件来减少输出文件数及执行重命名文件操作的时间,从而减少执行INSERT...SELECT语句的时间。 上述优化操作并不能解决全部的性能问题,对于以下场景仍然需要较多时间: 对于动态分区表,如果其分区数非常多,那么也需要执行较长的时间。
  • 配置描述 要启动小文件优化,在Spark客户端的“spark-defaults.conf”配置文件中进行设置。 表1 参数说明 参数 描述 默认值 spark.sql.files.maxPartitionBytes 在读取文件时,将单个分区打包的最大字节数。 单位:byte。 134217728(即128M) spark.files.openCostInBytes 打开文件的预估成本, 按照同一时间能够扫描的字节数来测量。当一个分区写入多个文件时使用。高估更好,这样小文件分区将比大文件分区更先被调度。 4M
  • 配置场景 Spark SQL的表中,经常会存在很多小文件(大小远小于HDFS块大小),每个小文件默认对应Spark中的一个Partition,也就是一个Task。在很多小文件场景下,Spark会起很多Task。当SQL逻辑中存在Shuffle操作时,会大大增加hash分桶数,严重影响性能。 在小文件场景下,您可以通过如下配置手动指定每个Task的数据量(Split Size),确保不会产生过多的Task,提高性能。 当SQL逻辑中不包含Shuffle操作时,设置此配置项,不会有明显的性能提升。
  • 配置描述 在Spark Driver端的“spark-defaults.conf”配置文件中添加如下表格中的参数。 表1 参数说明 参数 描述 默认值 spark.sql.adaptive.enabled 自适应执行特性的总开关。 注意:AQE特性与DPP(动态分区裁剪)特性同时开启时,SparkSQL任务执行中会优先执行DPP特性,从而使得AQE特性不生效。集群中DPP特性是默认开启的,因此开启AQE特性的同时,需要将DPP特性关闭。 false spark.sql.optimizer.dynamicPartitionPruning.enabled 动态分区裁剪功能的开关。 true spark.sql.adaptive.skewJoin.enabled 当此配置为true且spark.sql.adaptive.enabled设置为true时,启用运行时自动处理join运算中的数据倾斜功能。 true spark.sql.adaptive.skewJoin.skewedPartitionFactor 此配置为一个倍数因子,用于判定分区是否为数据倾斜分区。单个分区被判定为数据倾斜分区的条件为:当一个分区的数据大小超过除此分区外其他所有分区大小的中值与该配置的乘积,并且大小超过spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes配置值时,此分区被判定为数据倾斜分区。 5 spark.sql.adaptive.skewjoin.skewedPartitionThresholdInBytes 分区大小(单位:字节)大于该阈值且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor与分区中值的乘积,则认为该分区存在倾斜。理想情况下,此配置应大于spark.sql.adaptive.advisoryPartitionSizeInBytes。 256MB spark.sql.adaptive.shuffle.targetPostShuffleInputSize 每个task处理的shuffle数据的最小数据量。单位:Byte。 67108864
  • 配置场景 在Spark SQL多表Join的场景下,会存在关联键严重倾斜的情况,导致Hash分桶后,部分桶中的数据远高于其他分桶。最终导致部分Task过重,运行很慢;其他Task过轻,运行很快。一方面,数据量大Task运行慢,使得计算性能低;另一方面,数据量少的Task在运行完成后,导致很多CPU空闲,造成CPU资源浪费。 通过如下配置项可开启自动进行数据倾斜处理功能,通过将Hash分桶后数据量很大的、且超过数据倾斜阈值的分桶拆散,变成多个task处理一个桶的数据机制,提高CPU资源利用率,提高系统性能。 未产生倾斜的数据,将采用原有方式进行分桶并运行。 使用约束: 只支持两表Join的场景。 不支持FULL OUTER JOIN的数据倾斜处理。 示例:执行下面SQL语句,a表倾斜或b表倾斜都无法触发该优化。 select aid FROM a FULL OUTER JOIN b ON aid=bid; 不支持LEFT OUTER JOIN的右表倾斜处理。 示例:执行下面SQL语句,b表倾斜无法触发该优化。 select aid FROM a LEFT OUTER JOIN b ON aid=bid; 不支持RIGHT OUTER JOIN的左表倾斜处理。 示例:执行下面SQL语句,a表倾斜无法触发该优化。 select aid FROM a RIGHT OUTER JOIN b ON aid=bid;
  • 参考信息 被广播的表执行超时,导致任务结束。 默认情况下,BroadCastJoin只允许被广播的表计算5分钟,超过5分钟该任务会出现超时异常,而这个时候被广播的表的broadcast任务依然在执行,造成资源浪费。 这种情况下,有两种方式处理: 调整“spark.sql.broadcastTimeout”的数值,加大超时的时间限制。 降低“spark.sql.autoBroadcastJoinThreshold”的数值,不使用BroadCastJoin的优化。
  • 使用coalesce调整分片的数量 coalesce可以调整分片的数量。coalesce函数有两个参数: coalesce(numPartitions: Int, shuffle: Boolean = false) 当shuffle为true的时候,函数作用与repartition(numPartitions: Int)相同,会将数据通过Shuffle的方式重新分区;当shuffle为false的时候,则只是简单的将父RDD的多个partition合并到同一个task进行计算,shuffle为false时,如果numPartitions大于父RDD的切片数,那么分区不会重新调整。 遇到下列场景,可选择使用coalesce算子: 当之前的操作有很多filter时,使用coalesce减少空运行的任务数量。此时使用coalesce(numPartitions, false),numPartitions小于父RDD切片数。 当输入切片个数太大,导致程序无法正常运行时使用。 当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用coalesce(numPartitions, true)。
  • 操作步骤 配置Driver内存。 Driver负责任务的调度,和Executor、AM之间的消息通信。当任务数变多,任务平行度增大时,Driver内存都需要相应增大。 您可以根据实际任务数量的多少,为Driver设置一个合适的内存。 将“spark-defaults.conf”中的“spark.driver.memory”配置项设置为合适大小。 在使用spark-submit命令时,添加“--driver-memory MEM”参数设置内存。 配置Executor个数。 每个Executor每个核同时能跑一个task,所以增加了Executor的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加Executor的个数,以提高运行效率。 将“spark-defaults.conf”中的“spark.executor.instance”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_INSTANCES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--num-executors NUM”参数设置Executor个数。 配置Executor核数。 每个Executor多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用Executor的内存,所以要在内存和核数之间做好平衡。 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。 配置Executor内存。 Executor的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加;当一个任务较小运行较快时,就可以增大并发度减少内存。 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。
  • 示例 在执行spark wordcount计算中。1.6T数据,250个executor。 在默认参数下执行失败,出现Futures timed out和OOM错误。 因为数据量大,task数多,而wordcount每个task都比较小,完成速度快。当task数多时driver端相应的一些对象就变大了,而且每个task完成时executor和driver都要通信,这就会导致由于内存不足,进程之间通信断连等问题。 当把Driver的内存设置到4g时,应用成功跑完。 使用JDBCServer执行TPC-DS测试套,默认参数配置下也报了很多错误:Executor Lost等。而当配置Driver内存为30g,executor核数为2,executor个数为125,executor内存为6g时,所有任务才执行成功。
  • 操作场景 Spark on Yarn模式下,有Driver、ApplicationMaster、Executor三种进程。在任务调度和运行的过程中,Driver和Executor承担了很大的责任,而ApplicationMaster主要负责container的启停。 因而Driver和Executor的参数配置对Spark应用的执行有着很大的影响意义。用户可通过如下操作对Spark集群性能做优化。
共100000条