云服务器内容精选

  • 快速配置常用参数 其他参数在安装集群时已进行了适配,以下参数需要根据使用场景进行调整。以下参数除特别指出外,一般在Spark2x客户端的“spark-defaults.conf”文件中配置。 表1 快速配置常用参数 配置项 说明 默认值 spark.sql.parquet.compression.codec 对于非分区parquet表,设置其存储文件的压缩格式。 在JD BCS erver服务端的“spark-defaults.conf”配置文件中进行设置。 snappy spark.dynamicAllocation.enabled 是否使用动态资源调度,用于根据规模调整注册于该应用的executor的数量。目前仅在YARN模式下有效。 JDB CS erver默认值为true,client默认值为false。 false spark.executor.memory 每个Executor进程使用的内存数量,与JVM内存设置字符串的格式相同(例如:512m,2g)。 4G spark.sql.autoBroadcastJoinThreshold 当进行join操作时,配置广播的最大值。 当SQL语句中涉及的表中相应字段的大小小于该值时,进行广播。 配置为-1时,将不进行广播。 10485760 spark.yarn.queue JDBCServer服务所在的Yarn队列。 在JDBCServer服务端的“spark-defaults.conf”配置文件中进行设置。 default spark.driver.memory 大集群下推荐配置32~64g驱动程序进程使用的内存数量,即SparkContext初始化的进程(例如:512m, 2g)。 4G spark.yarn.security.credentials.hbase.enabled 是否打开获取HBase token的功能。如果需要Spark-on-HBase功能,并且配置了安全集群,参数值设置为“true”。否则设置为“false”。 false spark.serializer 用于串行化将通过网络发送或需要缓存的对象的类以序列化形式展现。 Java序列化的默认值适用于任何Serializable Java对象,但运行速度相当慢,所以建议使用org.apache.spark.serializer.KryoSerializer并配置Kryo序列化。可以是org.apache.spark.serializer.Serializer的任何子类。 org.apache.spark.serializer.JavaSerializer spark.executor.cores 每个执行者使用的内核个数。 在独立模式和Mesos粗粒度模式下设置此参数。当有足够多的内核时,允许应用程序在同样的worker上执行多个执行程序;否则,在每个worker上,每个应用程序只能运行一个执行程序。 1 spark.shuffle.service.enabled NodeManager中一个长期运行的辅助服务,用于提升Shuffle计算性能。 fasle spark.sql.adaptive.enabled 是否开启自适应执行框架。 false spark.executor.memoryOverhead 每个执行器要分配的堆内存量(单位为兆字节)。 这是占用虚拟机开销的内存,类似于内部字符串,其他内置开销等等。会随着执行器大小(通常为6-10%)而增长。 1GB spark.streaming.kafka.direct.lifo 配置是否开启Kafka后进先出功能。 false
  • 配置描述 为了使WebUI页面显示日志,需要将聚合日志进行解析和展现。Spark是通过Hadoop的JobHistoryServer来解析聚合日志的,所以您可以通过“spark.jobhistory.address”参数,指定JobHistoryServer页面地址,即可完成解析和展现。 参数入口: 在应用提交时通过“--conf”设置这些参数,或者在客户端的“spark-defaults.conf”配置文件中调整如下参数。 此功能依赖Hadoop中的JobHistoryServer服务,所以使用聚合日志之前需要保证JobHistoryServer服务已经运行正常。 如果参数值为空,“AggregatedLogs”页签仍然存在,但是无法通过logs链接查看日志。 只有当App已经running,HDFS上已经有该App的事件日志文件时才能查看到聚合的container日志。 正在运行的任务的日志,用户可以通过“Executors”页面的日志链接进行查看,任务结束后日志会汇聚到HDFS上,“Executors”页面的日志链接就会失效,此时用户可以通过“AggregatedLogs”页面的logs链接查看聚合日志。 表1 参数说明 参数 描述 默认值 spark.jobhistory.address JobHistoryServer页面的地址,格式:http(s)://ip:port/jobhistory。例如,将参数值设置为“https://10.92.115.1:26014/jobhistory”。 默认值为空,表示不能从WebUI查看container聚合日志。 修改参数后,需重启服务使得配置生效。 -
  • 配置场景 当Yarn配置“yarn.log-aggregation-enable”为“true”时,就开启了container日志聚合功能。日志聚合功能是指:当应用在Yarn上执行完成后,NodeManager将本节点中所有container的日志聚合到HDFS中,并删除本地日志。详情请参见配置Container日志聚合功能。 然而,开启container日志聚合功能之后,其日志聚合至HDFS目录中,只能通过获取HDFS文件来查看日志。开源Spark和Yarn服务不支持通过WebUI查看聚合后的日志。 因此,Spark在此基础上进行了功能增强。如图1所示,在HistoryServer页面添加“AggregatedLogs”页签,可以通过“logs”链接查看聚合的日志。 图1 聚合日志显示页面
  • 配置场景 当Spark开启事件日志模式,即设置“spark.eventLog.enabled”为“true”时,就会往配置的一个日志文件中写事件,记录程序的运行过程。当程序运行很久,job很多,task很多时就会造成日志文件很大,如JDBCServer、Spark Streaming程序。 而日志回滚功能是指在写事件日志时,将元数据事件(EnviromentUpdate,BlockManagerAdded,BlockManagerRemoved,UnpersistRDD,ExecutorAdded,ExecutorRemoved,MetricsUpdate,ApplicationStart,ApplicationEnd,LogStart)写入日志文件中,Job事件(StageSubmitted, StageCompleted, TaskResubmit, TaskStart,TaskEnd, TaskGettingResult, JobStart,JobEnd)按文件的大小进行决定是否写入新的日志文件。对于Spark SQL的应用,Job事件还包含ExecutionStart、ExecutionEnd。 Spark中有个HistoryServer服务,其UI页面就是通过读取解析这些日志文件获得的。在启动HistoryServer进程时,内存大小就已经定了。因此当日志文件很大时,加载解析这些文件就可能会造成内存不足,driver gc等问题。 所以为了在小内存模式下能加载较大日志文件,需要对大应用开启日志滚动功能。一般情况下,长时间运行的应用建议打开该功能。
  • 配置场景 当前Spark SQL执行一个查询时需要使用大量的内存,尤其是在做聚合(Aggregate)和关联(Join)操作时,此时如果内存有限的情况下就很容易出现OutOfMemoryError。有限内存下的稳定性就是确保在有限内存下依然能够正确执行相关的查询,而不出现OutOfMemoryError。 有限内存并不意味着内存无限小,它只是在内存不足于放下大于内存可用总量几倍的数据时,通过利用磁盘来做辅助从而确保查询依然稳定执行,但依然有一些数据是必须留在内存的,如在做涉及到Join的查询时,对于当前用于Join的相同key的数据还是需要放在内存中,如果该数据量较大而内存较小依然会出现OutOfMemoryError。 有限内存下的稳定性涉及到3个子功能: ExternalSort 外部排序功能,当执行排序时如果内存不足会将一部分数据溢出到磁盘中。 TungstenAggregate 新Hash聚合功能,默认对数据调用外部排序进行排序,然后再进行聚合,因此内存不足时在排序阶段会将数据溢出到磁盘,在聚合阶段因数据有序,在内存中只保留当前key的聚合结果,使用的内存较小。 SortMergeJoin、SortMergeOuterJoin 基于有序数据的等值连接。该功能默认对数据调用外部排序进行排序,然后再进行等值连接,因此内存不足时在排序阶段会将数据溢出到磁盘,在连接阶段因数据有序,在内存中只保留当前相同key的数据,使用的内存较小。
  • 配置描述 参数入口: 在应用提交时通过“--conf”设置这些参数,或者在客户端的“spark-defaults.conf”配置文件中调整如下参数。 表1 参数说明 参数 场景 描述 默认值 spark.sql.tungsten.enabled / 类型为Boolean。 当设置的值等于true时,表示开启tungsten功能,即逻辑计划等同于开启codegeneration,同时物理计划使用对应的tungsten执行计划。 当设置的值等于false时,表示关闭tungsten功能。 true spark.sql.codegen.wholeStage 类型为Boolean。 当设置的值等于true时,表示开启codegeneration功能,即运行时对于某些特定的查询将动态生成各逻辑计划代码。 当设置的值等于false时,表示关闭codegeneration功能,运行时使用当前已有静态代码。 true 开启ExternalSort除配置spark.sql.planner.externalSort=true外,还需配置spark.sql.unsafe.enabled=false或者spark.sql.codegen.wholeStage =false。 如果您需要开启TungstenAggregate,有如下几种方式: 将spark.sql.codegen.wholeStage 和spark.sql.unsafe.enabled的值都设置为true(通过配置文件或命令行方式设置)。 如果spark.sql.codegen.wholeStage 和spark.sql.unsafe.enabled都不为true或者其中一个不为true,只要spark.sql.tungsten.enabled的值设置为true时,TungstenAggregate会开启。
  • Compression 数据压缩是一个以CPU换内存的优化策略,因此当Spark内存严重不足的时候(由于内存计算的特质,这种情况非常常见),使用压缩可以大幅提高性能。目前Spark支持三种压缩算法:snappy,lz4,lzf。Snappy为默认压缩算法,并且调用native方法进行压缩与解压缩,在Yarn模式下需要注意堆外内存对Container进程的影响。 表27 参数说明 参数 描述 默认值 spark.io.compression.codec 用于压缩内部数据的codec,例如RDD分区、广播变量和shuffle输出。默认情况下,Spark支持三种压缩算法:lz4,lzf和snappy。可以使用完全合格的类名称指定算法,例如org.apache.spark.io.LZ4CompressionCodec、org.apache.spark.io.LZFCompressionCodec及org.apache.spark.io.SnappyCompressionCodec。 lz4 spark.io.compression.lz4.block.size 当使用LZ4压缩算法时LZ4压缩中使用的块大小(字节)。当使用LZ4时降低块大小同样也会降低shuffle内存使用。 32768 spark.io.compression.snappy.block.size 当使用Snappy压缩算法时Snappy压缩中使用的块大小(字节)。当使用Snappy时降低块大小同样也会降低shuffle内存使用。 32768 spark.shuffle.compress 是否压缩map任务输出文件。建议压缩。使用spark.io.compression.codec进行压缩。 true spark.shuffle.spill.compress 是否压缩在shuffle期间溢出的数据。使用spark.io.compression.codec进行压缩。 true spark.eventLog.compress 设置当spark.eventLog.enabled设置为true时是否压缩记录的事件。 false spark.broadcast.compress 在发送之前是否压缩广播变量。建议压缩。 true spark.rdd.compress 是否压缩序列化的RDD分区(例如StorageLevel.MEMORY_ONLY_SER的分区)。牺牲部分额外CPU的时间可以节省大量空间。 false
  • 安全性 Spark目前支持通过共享密钥认证。可以通过spark.authenticate配置参数配置认证。该参数控制Spark通信协议是否使用共享密钥执行认证。该认证是确保双边都有相同的共享密钥并被允许通信的基本握手。如果共享密钥不同,通信将不被允许。共享密钥通过如下方式创建: 对于YARN部署的Spark,将spark.authenticate配置为真会自动处理生成和分发共享密钥。每个应用程序会独占一个共享密钥。 对于其他类型部署的Spark,应该在每个节点上配置Spark参数spark.authenticate.secret。所有Master/Workers和应用程序都将使用该密钥。 表25 参数说明 参数 描述 默认值 spark.acls.enable 是否开启Spark acls。如果开启,它将检查用户是否有访问和修改job的权限。请注意这要求用户可以被识别。如果用户被识别为无效,检查将不被执行。UI可以使用过滤器认证和设置用户。 true spark.admin.acls 逗号分隔的有权限访问和修改所有Spark job的用户/管理员列表。如果在共享集群上运行并且工作时有 MRS 集群管理员或开发人员帮助调试,可以使用该列表。 admin spark.authenticate 是否Spark认证其内部连接。如果不是运行在YARN上,请参见spark.authenticate.secret。 true spark.authenticate.secret 设置Spark各组件之间验证的密钥。如果不是运行在YARN上且认证未开启,需要设置该项。 - spark.modify.acls 逗号分隔的有权限修改Spark job的用户列表。默认情况下只有开启Spark job的用户才有修改列表的权限(例如删除列表)。 - spark.ui.view.acls 逗号分隔的有权限访问Spark web ui的用户列表。默认情况下只有开启Spark job的用户才有访问权限。 -
  • TIMEOUT Spark默认配置能很好的处理中等数据规模的计算任务,但一旦数据量过大,会经常出现超时导致任务失败的场景。在大数据量场景下,需调大Spark中的超时参数。 表23 参数说明 参数 描述 默认值 spark.files.fetchTimeout 获取通过驱动程序的SparkContext.addFile()添加的文件时的通信超时(秒)。 60s spark.network.timeout 所有网络交互的默认超时(秒)。如未配置,则使用该配置代替spark.core.connection.ack.wait.timeout, spark.akka.timeout, spark.storage.blockManagerSlaveTimeoutMs或spark.shuffle.io.connectionTimeout。 360s spark.core.connection.ack.wait.timeout 连接时应答的超时时间(单位:秒)。为了避免由于GC带来的长时间等待,可以设置更大的值。 60
  • 加密 Spark支持Akka和HTTP(广播和文件服务器)协议的SSL,但WebUI和块转移服务仍不支持SSL。 SSL必须在每个节点上配置,并使用特殊协议为通信涉及到的每个组件进行配置。 表24 参数说明 参数 描述 默认值 spark.ssl.enabled 是否在所有被支持协议上开启SSL连接。 与spark.ssl.xxx类似的所有SSL设置指示了所有被支持协议的全局配置。为了覆盖特殊协议的全局配置,在协议指定的命名空间中必须重写属性。 使用“spark.ssl.YYY.XXX”设置覆盖由YYY指示的特殊协议的全局配置。目前YYY可以是基于Akka连接的akka或广播与文件服务器的fs。 false spark.ssl.enabledAlgorithms 以逗号分隔的密码列表。指定的密码必须被JVM支持。 - spark.ssl.keyPassword key-store的私人密钥密码。 - spark.ssl.keyStore key-store文件的路径。该路径可以绝对或相对于开启组件的目录。 - spark.ssl.keyStorePassword key-store的密码。 - spark.ssl.protocol 协议名。该协议必须被JVM支持。本页所有协议的参考表。 - spark.ssl.trustStore trust-store文件的路径。该路径可以绝对或相对于开启组件的目录。 - spark.ssl.trustStorePassword trust-store的密码。 -
  • 开启Spark进程间的认证机制 目前Spark进程间支持共享密钥方式的认证机制,通过配置spark.authenticate可以控制Spark在通信过程中是否做认证。这种认证方式只是通过简单的握手来确定通信双方享有共同的密钥。 在Spark客户端的“spark-defaults.conf”文件中配置如下参数。 表26 参数说明 参数 描述 默认值 spark.authenticate 在Spark on YARN模式下,将该参数配置成true即可。密钥的生成和分发过程是自动完成的,并且每个应用独占一个密钥。 true
  • PORT 表21 参数说明 参数 描述 默认值 spark.ui.port 应用仪表盘的端口,显示内存和工作负载数据。 JDBCServer2x:4040 SparkResource2x:0 spark.blockManager.port 所有BlockManager监测的端口。这些同时存在于Driver和Executor上。 随机端口范围 spark.driver.port Driver监测的端口,用于Driver与Executor进行通信。 随机端口范围
  • EventLog的周期清理 JobHistory上的Event log是随每次任务的提交而累积的,任务提交的次数多了之后会造成太多文件的存放。Spark提供了周期清理Evnet log的功能,用户可以通过配置开关和相应的清理周期参数来进行控制。 表17 参数说明 参数 描述 默认值 spark.history.fs.cleaner.enabled 是否打开清理功能。 true spark.history.fs.cleaner.interval 清理功能的检查周期。 1d spark.history.fs.cleaner.maxAge 日志的最长保留时间。 4d
  • Kryo Kryo是一个非常高效的Java序列化框架,Spark中也默认集成了该框架。几乎所有的Spark性能调优都离不开将Spark默认的序列化器转化为Kryo序列化器的过程。目前Kryo序列化只支持Spark数据层面的序列化,还不支持闭包的序列化。设置Kryo序列元,需要将配置项“spark.serializer”设置为“org.apache.spark.serializer.KryoSerializer”,同时也搭配设置以下的配置项,优化Kryo序列化的性能。 表18 参数说明 参数 描述 默认值 spark.kryo.classesToRegister 使用Kryo序列化时,需要注册到Kryo的类名,多个类之间用逗号分隔。 - spark.kryo.referenceTracking 当使用Kryo序列化数据时,是否跟踪对同一个对象的引用情况。适用于对象图有循环引用或同一对象有多个副本的情况。否则可以设置为关闭以提升性能。 true spark.kryo.registrationRequired 是否需要使用Kryo来注册对象。当设为“true”时,如果序列化一个未使用Kryo注册的对象则会发生异常。当设为“false”(默认值)时,Kryo会将未注册的类名称一同写到序列化对象中。该操作会带来大量性能开销,所以在用户还没有从注册队列中删除相应的类时应该开启该选项。 false spark.kryo.registrator 如果使用Kryo序列化,使用Kryo将该类注册至定制类。如果需要以定制方式注册类,例如指定一个自定义字段序列化器,可使用该属性。否则spark.kryo.classesToRegister会更简单。它应该设置为一个扩展KryoRegistrator的类。 - spark.kryoserializer.buffer.max Kryo序列化缓冲区允许的最大值,单位为兆字节。这个值必须大于尝试序列化的对象。当在Kryo中遇到“buffer limit exceeded”异常时可以适当增大该值。也可以通过配置项spark.kryoserializer.buffer.max配置。 64MB spark.kryoserializer.buffer Kryo序列化缓冲区的初始值,单位为兆字节。每个worker的每个核心都会有一个缓冲区。如果有需要,缓冲区会增大到spark.kryoserializer.buffer.max设置的值。也可以通过配置项spark.kryoserializer.buffer配置。 64KB
  • Broadcast Broadcast用于Spark进程间数据块的传输。Spark中无论Jar包、文件还是闭包以及返回的结果都会使用Broadcast。目前的Broadcast支持两种方式,Torrent与HTTP。前者将会把数据切成小片,分布到集群中,有需要时从远程获取;后者将文件存入到本地磁盘,有需要时通过HTTP方式将整个文件传输到远端。前者稳定性优于后者,因此Torrent为默认的Broadcast方式。 表19 参数说明 参数 描述 默认值 spark.broadcast.factory 使用的广播方式。 org.apache.spark.broadcast.TorrentBroadcastFactory spark.broadcast.blockSize TorrentBroadcastFactory的块大小。该值过大会降低广播时的并行度(速度变慢),过小可能会影响BlockManager的性能。 4096 spark.broadcast.compress 在发送广播变量之前是否压缩。建议压缩。 true