云服务器内容精选
-
参考信息 被广播的表执行超时,导致任务结束。 默认情况下,BroadCastJoin只允许被广播的表计算5分钟,超过5分钟该任务会出现超时异常,而这个时候被广播的表的broadcast任务依然在执行,造成资源浪费。 这种情况下,有两种方式处理: 调整“spark.sql.broadcastTimeout”的数值,加大超时的时间限制。 降低“spark.sql.autoBroadcastJoinThreshold”的数值,不使用BroadCastJoin的优化。
-
解决方法 应用无法访问到SparkUI的IP:PORT。可能有以下原因: 可能原因一:集群节点与客户端节点网络不通。 解决方法: 查看客户端节点“/etc/hosts”文件中是否配置集群节点映射,在客户端节点执行命令: ping sparkui的IP 如果ping不同,检查映射配置与网络设置。 可能原因二:客户端节点防火墙未关闭。 解决方法: 执行如下命令可查看是否关闭: systemctl status firewalld(不同的操作系统查询命令不一致,此命令以CentOS为例) 如下图所示:dead表示关闭。 防火墙开则影响通信,执行如下命令关闭防火墙: service firewalld stop(不同的操作系统查询命令不一致,此命令以CentOS为例) 可能原因三:端口被占用,每一个Spark任务都会占用一个SparkUI端口,默认为22600,如果被占用则依次递增端口重试。但是有个默认重试次数,为16次。16次重试都失败后,会放弃该任务的运行。 查看端口是否被占用: ssh -v -p port username@ip 如果输出“Connection established”,则表示连接成功,端口已被占用。 Spark UI端口范围由配置文件spark-defaults.conf中的参数“spark.random.port.min”和“spark.random.port.max”决定,如果该范围端口都已被占用,则 导致无端口可用从而连接失败。 解决方法:调节重连次数spark.port.maxRetries=50,并且调节executor随机端口范围spark.random.port.max+100 可能原因四:客户端Spark配置参数错误。 解决方法: 在客户端节点执行命令cat spark-env.sh,查看SPARK_LOCAL_HOSTNAME,是否为本机IP。 该问题容易出现在从其他节点直接复制客户端时,配置参数未修改。 需修改SPARK_LOCAL_HOSTNAME为本机IP。 注:如果集群使用EIP通信,则需要设置以下参数。 spark-default.conf中添加spark.driver.host = EIP(客户端节点弹性公网IP) spark-default.conf中添加spark.driver.bindAddress=本地IP spark-env.sh中修改SPARK_LOCAL_HOSTNAME=EIP(客户端节点弹性公网IP) 可能原因五:代码问题。 解决方法: Spark在启动任务时会在客户端创建sparkDriverEnv并绑定DRIVER_BIND_ADDRESS,该逻辑并没有走到服务端,所以该问题产生的原因也是客户端节点操作系统环境问题导致sparkDriver获取不到对应的主机IP。 可以尝试执行export SPARK_LOCAL_HOSTNAME=172.0.0.1或者设置spark.driver.bindAddress=127.0.0.1,使提交任务driver端可以加载到loopbackAddress,从而规避问题。
-
问题 安装集群外客户端或使用集群外客户端时,有时会出现连接Spark任务端口失败的问题。 异常信息:Failed to bind SparkUi Cannot assign requested address: Service ‘sparkDriver’ failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service ‘sparkDriver’ (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
-
场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下要求: 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
-
在对性能要求比较高的场景下,可以使用Kryo优化序列化性能 Spark提供了两种序列化实现: org.apache.spark.serializer.KryoSerializer:性能好,兼容性差 org.apache.spark.serializer.JavaSerializer:性能一般,兼容性好 使用:conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 为什么不默认使用Kryo序列化? Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介 绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
-
RDD多次使用时,建议将RDD持久化 RDD在默认情况下的存储级别是StorageLevel.NONE,即既不存磁盘也不放在内存中,如果某个RDD需要多次使用,可以考虑将该RDD持久化,方法如下: 调用spark.RDD中的cache()、persist()、persist(newLevel:StorageLevel)函数均可将RDD持久化,cache()和persist()都是将RDD的存储级别设置为StorageLevel.MEMORY_ONLY,persist(newLevel:StorageLevel)可以为RDD设置其他存储级别,但是要求调用该方法之前RDD的存储级别为StorageLevel.NONE或者与newLevel相同,也就是说,RDD的存储级别一旦设置为StorageLevel.NONE之外的级别,则无法改变。 如果想要将RDD去持久化,那么可以调用unpersist(blocking:Boolean = true),该函数功能如下: 将该RDD从持久化列表中移除,RDD对应的数据进入可回收状态; 将RDD的存储级别重新设置为StorageLevel.NONE。
-
在业务情况允许的情况下使用高性能算子 使用reduceByKey/aggregateByKey替代groupByKey。 所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。 map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。 使用mapPartitions替代普通map。 mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。 但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重! 使用filter之后进行coalesce操作。 通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即 可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。 使用repartitionAndSortWithinPartitions替代repartition与sort类操作。 repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在 repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions 算子。因为该算子 可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。 使用foreachPartitions替代foreach。 原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数 据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写 MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。
-
优化shuffle并行度,提升Spark加工效率 所谓的shuffle并发度如下图所示: 集群默认是200,作业可以单独设置。如果发现瓶颈stage(执行时间长),且分配给当前作业的核数大于当前的并发数,说明并发度不足。通过以下配置优化。 场景 配置项 集群默认值 调整后 Jar作业 spark.default.parallelism 200 按实际作业可用资源2倍设置 SQL作业 spark.sql.shuffle.partitions 200 按实际作业可用资源2倍设置 hudi入库作业 hoodie.upsert.shuffle.parallelism 200 非bucket表使用,按实际作业可用资源2倍设置 动态资源调度情况下(spark.dynamicAllocation.enabled= true)时,资源按照spark.dynamicAllocation.maxExecutors评估。
-
初始化Hudi表时,可以使用BulkInsert方式快速写入数据 示例: set hoodie.combine.before.insert=true; // 入库前去重,如果数据没有重复 该参数无需设置 set hoodie.datasource.write.operation = bulk_insert; // 指定写入方式为bulk insert方式。 set hoodie.bulkinsert.shuffle.parallelism = 4; // 指定bulk_insert写入时的并行度,等于写入完成后保存的分区parquet文件数 insert into dsrTable select * from srcTabble
-
Spark加工Hudi表时其他参数优化 设置spark.sql.enableToString=false,降低Spark解析复杂SQL时候内存使用,提升解析效率。 设置spark.speculation=false,关闭推测执行,开启该参数会带来额外的cpu消耗,同时Hudi不支持启动该参数,启用该参数写Hudi有概率导致文件损坏。 配置项 集群默认值 调整后 --conf spark.sql.enableToString true false --conf spark.speculation false false
-
优化Spark Shuffle参数提升Hudi写入效率 开启spark.shuffle.readHostLocalDisk=true,本地磁盘读取shuffle数据,减少网络传输的开销。 开启spark.io.encryption.enabled=false,关闭shuffle过程写加密磁盘,提升shuffle效率。 开启spark.shuffle.service.enabled=true,启动shuffle服务,提升任务shuffle的稳定性。 配置项 集群默认值 调整后 --conf spark.shuffle.readHostLocalDisk false true --conf spark.io.encryption.enabled true false --conf spark.shuffle.service.enabled false true
-
调整Spark调度参数优化OBS场景下Spark调度时延 开启对于OBS存储,可以关闭Spark的本地性进行优化,尽可能提升Spark调度效率 配置项 集群默认值 调整后 --conf spark.locality.wait 3s 0s --conf spark.locality.wait.process 3s 0s --conf spark.locality.wait.node 3s 0s --conf spark.locality.wait.rack 3s 0s
-
参考信息 被广播的表执行超时,导致任务结束。 默认情况下,BroadCastJoin只允许被广播的表计算5分钟,超过5分钟该任务会出现超时异常,而这个时候被广播的表的broadcast任务依然在执行,造成资源浪费。 这种情况下,有两种方式处理: 调整“spark.sql.broadcastTimeout”的数值,加大超时的时间限制。 降低“spark.sql.autoBroadcastJoinThreshold”的数值,不使用BroadCastJoin的优化。
-
修改 DLI Livy工具配置文件 上传指定的DLI Livy工具jar资源包到OBS桶路径下。 登录OBS控制台,在指定的OBS桶下创建一个存放Livy工具jar包的资源目录。例如:“obs://bucket/livy/jars/”。 进入3.a中DLI Livy工具所在E CS 服务器的安装目录,获取以下jar包,将获取的jar包上传到1.a创建的OBS桶资源目录下。 例如,当前Livy工具安装路径为“/opt/livy”,则当前需要上传的jar包名称如下: /opt/livy/rsc-jars/livy-api-0.7.2.0107.jar /opt/livy/rsc-jars/livy-rsc-0.7.2.0107.jar /opt/livy/repl_2.11-jars/livy-core_2.11-0.7.2.0107.jar /opt/livy/repl_2.11-jars/livy-repl_2.11-0.7.2.0107.jar 修改DLI Livy工具配置文件。 编辑修改配置文件“ /opt/livy/conf/livy-client.conf”。 vi /opt/livy/conf/livy-client.conf 添加如下内容,并根据注释修改配置项。 #当前ECS的私有IP地址,也可以使用ifconfig命令查询。 livy.rsc.launcher.address = X.X.X.X #当前ECS服务器放通的端口号 livy.rsc.launcher.port.range = 30000~32767 编辑修改配置文件“ /opt/livy/conf/livy.conf”。 vi /opt/livy/conf/livy.conf 添加如下内容。根据注释说明修改具体的配置项。 livy.server.port = 8998 livy.spark.master = yarn livy.server.contextLauncher.custom.class=org.apache.livy.rsc.DliContextLauncher livy.server.batch.custom.class=org.apache.livy.server.batch.DliBatchSession livy.server.interactive.custom.class=org.apache.livy.server.interactive.DliInteractiveSession livy.server.sparkApp.custom.class=org.apache.livy.utils.SparkDliApp livy.server.recovery.mode = recovery livy.server.recovery.state-store = filesystem #以下文件路径请根据情况修改 livy.server.recovery.state-store.url = file:///opt/livy/store/ livy.server.session.timeout-check = true livy.server.session.timeout = 1800s livy.server.session.state-retain.sec = 1800s livy.dli.spark.version = 2.3.2 livy.dli.spark.scala-version = 2.11 # 填入存储livy jar包资源的OBS桶路径。 livy.repl.jars = obs://bucket/livy/jars/livy-core_2.11-0.7.2.0107.jar, obs://bucket/livy/jars/livy-repl_2.11-0.7.2.0107.jar livy.rsc.jars = obs://bucket/livy/jars/livy-api-0.7.2.0107.jar, obs://bucket/livy/jars/livy-rsc-0.7.2.0107.jar 编辑修改配置文件“/opt/livy/conf/spark-defaults.conf”。 vi /opt/livy/conf/spark-defaults.conf 添加如下必选参数内容。配置项参数填写说明,详见表1。 # 以下参数均支持在提交作业时覆盖。 spark.yarn.isPython=true spark.pyspark.python=python3 # 当前参数值为生产环境web地址 spark.dli.user.uiBaseAddress=https://console.huaweicloud.com/dli/web # 队列所在的region。 spark.dli.user.regionName=XXXX # dli endpoint 地址。 spark.dli.user.dliEndPoint=XXXX # 用于指定队列,填写已创建DLI的队列名。 spark.dli.user.queueName=XXXX # 提交作业使用的access key。 spark.dli.user.access.key=XXXX # 提交作业使用的secret key。 spark.dli.user.secret.key=XXXX # 提交作业使用的projectId。 spark.dli.user.projectId=XXXX 表1 spark-defaults.conf必选参数说明 参数名 参数填写说明 spark.dli.user.regionName DLI队列所在的区 域名 。 从地区和终端节点获取,对应“区域”列就是regionName。 spark.dli.user.dliEndPoint DLI队列所在的终端节点。 从地区和终端节点获取,对应的“终端节点(Endpoint)”就是该参数取值。 spark.dli.user.queueName DLI队列名称。 spark.dli.user.access.key 对应用户的访问密钥。该用户需要有Spark作业相关权限,权限说明详见权限管理。 密钥获取方式请参考获取AK/SK。 spark.dli.user.secret.key spark.dli.user.projectId 参考获取项目ID获取项目ID。 以下参数为可选参数,请根据参数说明和实际情况配置。详细参数说明请参考Spark Configuration。 表2 spark-defaults.conf可选参数说明 Spark作业参数 对应Spark批处理参数 备注 spark.dli.user.file file 如果是对接notebook工具场景时不需要设置。 spark.dli.user.className class_name 如果是对接notebook工具场景时不需要设置。 spark.dli.user.scType sc_type 推荐使用livy原生配置。 spark.dli.user.args args 推荐使用livy原生配置。 spark.submit.pyFiles python_files 推荐使用livy原生配置。 spark.files files 推荐使用livy原生配置。 spark.dli.user.modules modules - spark.dli.user.image image 提交作业使用的 自定义镜像 ,仅容器集群支持该参数,默认不设置。 spark.dli.user.autoRecovery auto_recovery - spark.dli.user.maxRetryTimes max_retry_times - spark.dli.user.catalogName catalog_name 访问元数据时,需要将该参数配置为dli。
-
通过DLI Livy工具提交Spark作业到DLI 本示例演示通过curl命令使用DLI Livy工具将Spark作业提交到DLI。 将开发好的Spark作业程序jar包上传到OBS路径下。 例如,本示例上传“spark-examples_2.11-XXXX.jar”到“obs://bucket/path”路径下。 以root用户登录到安装DLI Livy工具的ECS服务器。 执行curl命令通过DLI Livy工具提交Spark作业请求到DLI。 ECS_IP为当前安装DLI Livy工具所在的弹性云服务器的私有IP地址。 curl --location --request POST 'http://ECS_IP:8998/batches' \ --header 'Content-Type: application/json' \ --data '{ "driverMemory": "3G", "driverCores": 1, "executorMemory": "2G", "executorCores": 1, "numExecutors": 1, "args": [ "1000" ], "file": "obs://bucket/path/spark-examples_2.11-XXXX.jar", "className": "org.apache.spark.examples.SparkPi", "conf": { "spark.dynamicAllocation.minExecutors": 1, "spark.executor.instances": 1, "spark.dynamicAllocation.initialExecutors": 1, "spark.dynamicAllocation.maxExecutors": 2 } }'
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格