云服务器内容精选

  • 在业务情况允许的情况下使用高性能算子 使用reduceByKey/aggregateByKey替代groupByKey。 所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。 map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。 使用mapPartitions替代普通map。 mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。 但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重! 使用filter之后进行coalesce操作。 通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即 可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。 使用repartitionAndSortWithinPartitions替代repartition与sort类操作。 repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在 repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions 算子。因为该算子 可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。 使用foreachPartitions替代foreach。 原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数 据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写 MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。
  • 在对性能要求比较高的场景下,可以使用Kryo优化序列化性能 Spark提供了两种序列化实现: org.apache.spark.serializer.KryoSerializer:性能好,兼容性差 org.apache.spark.serializer.JavaSerializer:性能一般,兼容性好 使用:conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 为什么不默认使用Kryo序列化? Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介 绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
  • RDD多次使用时,建议将RDD持久化 RDD在默认情况下的存储级别是StorageLevel.NONE,即既不存磁盘也不放在内存中,如果某个RDD需要多次使用,可以考虑将该RDD持久化,方法如下: 调用spark.RDD中的cache()、persist()、persist(newLevel:StorageLevel)函数均可将RDD持久化,cache()和persist()都是将RDD的存储级别设置为StorageLevel.MEMORY_ONLY,persist(newLevel:StorageLevel)可以为RDD设置其他存储级别,但是要求调用该方法之前RDD的存储级别为StorageLevel.NONE或者与newLevel相同,也就是说,RDD的存储级别一旦设置为StorageLevel.NONE之外的级别,则无法改变。 如果想要将RDD去持久化,那么可以调用unpersist(blocking:Boolean = true),该函数功能如下: 将该RDD从持久化列表中移除,RDD对应的数据进入可回收状态; 将RDD的存储级别重新设置为StorageLevel.NONE。
  • Spark加工Hudi表时其他参数优化 设置spark.sql.enableToString=false,降低Spark解析复杂SQL时候内存使用,提升解析效率。 设置spark.speculation=false,关闭推测执行,开启该参数会带来额外的cpu消耗,同时Hudi不支持启动该参数,启用该参数写Hudi有概率导致文件损坏。 配置项 集群默认值 调整后 --conf spark.sql.enableToString true false --conf spark.speculation false false
  • 优化shuffle并行度,提升Spark加工效率 所谓的shuffle并发度如下图所示: 集群默认是200,作业可以单独设置。如果发现瓶颈stage(执行时间长),且分配给当前作业的核数大于当前的并发数,说明并发度不足。通过以下配置优化。 场景 配置项 集群默认值 调整后 Jar作业 spark.default.parallelism 200 按实际作业可用资源2倍设置 SQL作业 spark.sql.shuffle.partitions 200 按实际作业可用资源2倍设置 hudi入库作业 hoodie.upsert.shuffle.parallelism 200 非bucket表使用,按实际作业可用资源2倍设置 动态资源调度情况下(spark.dynamicAllocation.enabled= true)时,资源按照spark.dynamicAllocation.maxExecutors评估。
  • 初始化Hudi表时,可以使用BulkInsert方式快速写入数据 示例: set hoodie.combine.before.insert=true; // 入库前去重,如果数据没有重复 该参数无需设置 set hoodie.datasource.write.operation = bulk_insert; // 指定写入方式为bulk insert方式。 set hoodie.bulkinsert.shuffle.parallelism = 4; // 指定bulk_insert写入时的并行度,等于写入完成后保存的分区parquet文件数 insert into dsrTable select * from srcTabble
  • 优化Spark Shuffle参数提升Hudi写入效率 开启spark.shuffle.readHostLocalDisk=true,本地磁盘读取shuffle数据,减少网络传输的开销。 开启spark.io.encryption.enabled=false,关闭shuffle过程写加密磁盘,提升shuffle效率。 开启spark.shuffle.service.enabled=true,启动shuffle服务,提升任务shuffle的稳定性。 配置项 集群默认值 调整后 --conf spark.shuffle.readHostLocalDisk false true --conf spark.io.encryption.enabled true false --conf spark.shuffle.service.enabled false true
  • 调整Spark调度参数优化OBS场景下Spark调度时延 开启对于OBS存储,可以关闭Spark的本地性进行优化,尽可能提升Spark调度效率 配置项 集群默认值 调整后 --conf spark.locality.wait 3s 0s --conf spark.locality.wait.process 3s 0s --conf spark.locality.wait.node 3s 0s --conf spark.locality.wait.rack 3s 0s
  • 参考信息 被广播的表执行超时,导致任务结束。 默认情况下,BroadCastJoin只允许被广播的表计算5分钟,超过5分钟该任务会出现超时异常,而这个时候被广播的表的broadcast任务依然在执行,造成资源浪费。 这种情况下,有两种方式处理: 调整“spark.sql.broadcastTimeout”的数值,加大超时的时间限制。 降低“spark.sql.autoBroadcastJoinThreshold”的数值,不使用BroadCastJoin的优化。
  • 通过 DLI Livy工具提交Spark作业到DLI 本示例演示通过curl命令使用DLI Livy工具将Spark作业提交到DLI。 将开发好的Spark作业程序jar包上传到OBS路径下。 例如,本示例上传“spark-examples_2.11-XXXX.jar”到“obs://bucket/path”路径下。 以root用户登录到安装DLI Livy工具的E CS 服务器。 执行curl命令通过DLI Livy工具提交Spark作业请求到DLI。 ECS_IP为当前安装DLI Livy工具所在的弹性云服务器的私有IP地址。 curl --location --request POST 'http://ECS_IP:8998/batches' \ --header 'Content-Type: application/json' \ --data '{ "driverMemory": "3G", "driverCores": 1, "executorMemory": "2G", "executorCores": 1, "numExecutors": 1, "args": [ "1000" ], "file": "obs://bucket/path/spark-examples_2.11-XXXX.jar", "className": "org.apache.spark.examples.SparkPi", "conf": { "spark.dynamicAllocation.minExecutors": 1, "spark.executor.instances": 1, "spark.dynamicAllocation.initialExecutors": 1, "spark.dynamicAllocation.maxExecutors": 2 } }'
  • 修改DLI Livy工具配置文件 上传指定的DLI Livy工具jar资源包到OBS桶路径下。 登录OBS控制台,在指定的OBS桶下创建一个存放Livy工具jar包的资源目录。例如:“obs://bucket/livy/jars/”。 进入3.a中DLI Livy工具所在ECS服务器的安装目录,获取以下jar包,将获取的jar包上传到1.a创建的OBS桶资源目录下。 例如,当前Livy工具安装路径为“/opt/livy”,则当前需要上传的jar包名称如下: /opt/livy/rsc-jars/livy-api-0.7.2.0107.jar /opt/livy/rsc-jars/livy-rsc-0.7.2.0107.jar /opt/livy/repl_2.11-jars/livy-core_2.11-0.7.2.0107.jar /opt/livy/repl_2.11-jars/livy-repl_2.11-0.7.2.0107.jar 修改DLI Livy工具配置文件。 编辑修改配置文件“ /opt/livy/conf/livy-client.conf”。 vi /opt/livy/conf/livy-client.conf 添加如下内容,并根据注释修改配置项。 #当前ECS的私有IP地址,也可以使用ifconfig命令查询。 livy.rsc.launcher.address = X.X.X.X #当前ECS服务器放通的端口号 livy.rsc.launcher.port.range = 30000~32767 编辑修改配置文件“ /opt/livy/conf/livy.conf”。 vi /opt/livy/conf/livy.conf 添加如下内容。根据注释说明修改具体的配置项。 livy.server.port = 8998 livy.spark.master = yarn livy.server.contextLauncher.custom.class=org.apache.livy.rsc.DliContextLauncher livy.server.batch.custom.class=org.apache.livy.server.batch.DliBatchSession livy.server.interactive.custom.class=org.apache.livy.server.interactive.DliInteractiveSession livy.server.sparkApp.custom.class=org.apache.livy.utils.SparkDliApp livy.server.recovery.mode = recovery livy.server.recovery.state-store = filesystem #以下文件路径请根据情况修改 livy.server.recovery.state-store.url = file:///opt/livy/store/ livy.server.session.timeout-check = true livy.server.session.timeout = 1800s livy.server.session.state-retain.sec = 1800s livy.dli.spark.version = 2.3.2 livy.dli.spark.scala-version = 2.11 # 填入存储livy jar包资源的OBS桶路径。 livy.repl.jars = obs://bucket/livy/jars/livy-core_2.11-0.7.2.0107.jar, obs://bucket/livy/jars/livy-repl_2.11-0.7.2.0107.jar livy.rsc.jars = obs://bucket/livy/jars/livy-api-0.7.2.0107.jar, obs://bucket/livy/jars/livy-rsc-0.7.2.0107.jar 编辑修改配置文件“/opt/livy/conf/spark-defaults.conf”。 vi /opt/livy/conf/spark-defaults.conf 添加如下必选参数内容。配置项参数填写说明,详见表1。 # 以下参数均支持在提交作业时覆盖。 spark.yarn.isPython=true spark.pyspark.python=python3 # 当前参数值为生产环境web地址 spark.dli.user.uiBaseAddress=https://console.huaweicloud.com/dli/web # 队列所在的region。 spark.dli.user.regionName=XXXX # dli endpoint 地址。 spark.dli.user.dliEndPoint=XXXX # 用于指定队列,填写已创建DLI的队列名。 spark.dli.user.queueName=XXXX # 提交作业使用的access key。 spark.dli.user.access.key=XXXX # 提交作业使用的secret key。 spark.dli.user.secret.key=XXXX # 提交作业使用的projectId。 spark.dli.user.projectId=XXXX 表1 spark-defaults.conf必选参数说明 参数名 参数填写说明 spark.dli.user.regionName DLI队列所在的区 域名 。 从地区和终端节点获取,对应“区域”列就是regionName。 spark.dli.user.dliEndPoint DLI队列所在的终端节点。 从地区和终端节点获取,对应的“终端节点(Endpoint)”就是该参数取值。 spark.dli.user.queueName DLI队列名称。 spark.dli.user.access.key 对应用户的访问密钥。该用户需要有Spark作业相关权限,权限说明详见权限管理。 密钥获取方式请参考获取AK/SK。 spark.dli.user.secret.key spark.dli.user.projectId 参考获取项目ID获取项目ID。 以下参数为可选参数,请根据参数说明和实际情况配置。详细参数说明请参考Spark Configuration。 表2 spark-defaults.conf可选参数说明 Spark作业参数 对应Spark批处理参数 备注 spark.dli.user.file file 如果是对接notebook工具场景时不需要设置。 spark.dli.user.className class_name 如果是对接notebook工具场景时不需要设置。 spark.dli.user.scType sc_type 推荐使用livy原生配置。 spark.dli.user.args args 推荐使用livy原生配置。 spark.submit.pyFiles python_files 推荐使用livy原生配置。 spark.files files 推荐使用livy原生配置。 spark.dli.user.modules modules - spark.dli.user.image image 提交作业使用的 自定义镜像 ,仅容器集群支持该参数,默认不设置。 spark.dli.user.autoRecovery auto_recovery - spark.dli.user.maxRetryTimes max_retry_times - spark.dli.user.catalogName catalog_name 访问元数据时,需要将该参数配置为dli。
  • 准备工作 创建DLI队列。在“队列类型”中选择“通用队列”,即Spark作业的计算资源。具体请参考创建队列。 准备一个linux弹性 云服务器ECS ,用于安装DLI Livy。 ECS需要放通30000至32767端口、8998端口。具体操作请参考添加安全组规则。 ECS需安装Java JDK,JDK版本建议为1.8。配置Java环境变量JAVA_HOME。 查询弹性云服务器ECS详细信息,获取ECS的“私有IP地址”。 使用增强型跨源连接打通DLI队列和Livy实例所在的VPC网络。具体操作可以参考增强型跨源连接。
  • DLI Livy工具下载及安装 本次操作下载的DLI Livy版本为apache-livy-0.7.2.0107-bin.tar.gz,后续版本变化请根据实际情况修改。 单击下载链接,获取DLI Livy工具压缩包。 使用WinSCP工具,将获取的工具压缩包上传到准备好的ECS服务器目录下。 使用root用户登录ECS服务器,执行以下命令安装DLI Livy工具。 执行以下命令创建工具安装路径。 mkdir livy安装路径 例如新建路径/opt/livy:mkdir /opt/livy。后续操作步骤均默认以/opt/livy安装路径演示,请根据实际情况修改。 解压工具压缩包到安装路径。 tar --extract --file apache-livy-0.7.2.0107-bin.tar.gz --directory /opt/livy --strip-components 1 --no-same-owner 执行以下命令修改配置文件名称。 cd /opt/livy/conf mv livy-client.conf.template livy-client.conf mv livy.conf.template livy.conf mv livy-env.sh.template livy-env.sh mv log4j.properties.template log4j.properties mv spark-blacklist.conf.template spark-blacklist.conf touch spark-defaults.conf
  • 前提条件 配置前,请先购买OBS桶或并行文件系统。大数据场景推荐使用并行文件系统,并行文件系统(Parallel File System)是 对象存储服务 (Object Storage Service,OBS)提供的一种经过优化的高性能文件系统,提供毫秒级别访问时延,以及TB/s级别带宽和百万级别的IOPS,能够快速处理高性能计算(HPC)工作负载。 并行文件系统的详细介绍和使用说明,请参见《并行文件系统特性指南》。
  • 处理步骤 检查用户执行命令./bin/spark-submit --class cn.interf.Test --master yarn-client 客户端安装目录/Spark/spark1-1.0-SNAPSHOT.jar;,排查是否引入了非法字符。 如果是,修改非法字符,重新执行命令。 重新执行命令后,发生其他错误,查看该jar包的属主属组信息,发现全为root。 修改jar包的属主属组为omm:wheel,重新执行成功。