华为云用户手册

  • 使用客户端 在已安装客户端的节点,执行sudo su - omm命令切换用户。执行以下命令切换到客户端目录: cd /opt/client 执行以下命令配置环境变量: source bigdata_env 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。 kinit MRS 集群用户 例如,kinit admin。 启用Kerberos认证的MRS集群默认创建“admin”用户账号,用于集群管理员维护集群。 直接执行组件的客户端命令。 例如:使用HDFS客户端命令查看HDFS根目录文件,执行hdfs dfs -ls /。
  • 通过E CS 访问 FusionInsight Manager 登录MRS管理控制台。 在“现有集群”列表中,单击指定的集群名称。 记录集群的“可用区”、“虚拟私有云”、“集群管理页面”、“安全组”。 在管理控制台首页服务列表中选择“弹性云服务器”,进入ECS管理控制台,创建一个新的弹性云服务器。 弹性云服务器的“可用区”、“虚拟私有云”、“安全组”,需要和待访问集群的配置相同。 选择一个Windows系统的公共镜像。例如,选择一个标准镜像“Windows Server 2012 R2 Standard 64bit(40GB)”。 其他配置参数详细信息,请参见购买弹性云服务器。 如果ECS的安全组和Master节点的“默认安全组”不同,用户可以选择以下任一种方法修改配置: 将ECS的安全组修改为Master节点的默认安全组,请参见更改安全组。 在集群Master节点和Core节点的安全组添加两条安全组规则使ECS可以访问集群,“协议”需选择为“TCP”,“端口”需分别选择“28443”和“20009”。请参见创建安全组。 在EIP管理控制台,申请一个弹性IP地址,并与ECS绑定。 登录弹性云服务器。 登录ECS需要Windows系统的账号、密码,弹性IP地址以及配置安全组规则。具体请参见Windows云服务器登录方式。 在Windows的远程桌面中,打开浏览器访问Manager。 Manager访问地址为“集群管理页面”地址。访问时需要输入集群的用户名和密码,例如“admin”用户。 如果使用其他集群用户访问Manager,第一次访问时需要修改密码。新密码需要满足集群当前的用户密码复杂度策略。请咨询管理员。 默认情况下,在登录时输入5次错误密码将锁定用户,需等待5分钟自动解锁。 注销用户退出Manager时移动鼠标到右上角 ,然后单击“注销”。
  • 问题 ZooKeeper客户端刷新TGT失败,无法连接ZooKeeper。报错内容如下: Login: Could not renew TGT due to problem running shell command: '***/kinit -R'; exception was:org.apache.zookeeper.Shell$ExitCodeException: kinit: Ticket expired while renewing credentials
  • 回答 可能原因为IBM的JDK和普通JDK的jaas.conf文件格式不一样。 在使用IBM JDK时,建议使用如下jaas.conf文件模板,其中“useKeytab”中的文件路径必须以“file://”开头,后面为绝对路径。 Client { com.ibm.security.auth.module.Krb5LoginModule required useKeytab="file://D:/install/HbaseClientSample/conf/user.keytab" principal="hbaseuser1" credsType="both"; };
  • 回答 Linux的netcat命令没有与Zookeeper服务器安全通信的选项,所以当启用安全的netty配置时,它不能支持Zookeeper四个字母的命令。 为了避免这个问题,用户可以使用下面的Java API来执行四个字母的命令。 org.apache.zookeeper.client.FourLetterWordMain 例如: String[] args = new String[]{host, port, "stat"}; org.apache.zookeeper.client.FourLetterWordMain.main(args); netcat命令只能用于非安全的netty配置。
  • 回答 在单个父目录中创建大量的znode后,当客户端尝试在单个请求中获取所有子节点时,服务端将无法返回,因为结果将超出可存储在znode上的数据的最大长度。 为了避免这个问题,应该根据客户端应用的实际情况将“jute.maxbuffer”参数配置为一个更高的值。 “jute.maxbuffer”只能设置为Java系统属性,且没有zookeeper前缀。如果要将“jute.maxbuffer”的值设为X,在ZooKeeper客户端或服务端启动时传入以下系统属性:-Djute.maxbuffer=X。 例如,将参数值设置为4MB:-Djute.maxbuffer=0x400000。 表1 配置参数 参数 描述 默认值 jute.maxbuffer 指定可以存储在znode中的数据的最大长度。单位是Byte。默认值为0xfffff,即低于1MB。 说明: 如果更改此选项,则必须在所有服务器和客户端上设置该系统属性,否则将出现问题。 0xfffff
  • 回答 创建大量节点后,follower与leader同步时数据量大,在集群数据同步限定时间内不能完成同步过程,导致超时,各个ZooKeeper Server启动失败。 参考修改集群服务配置参数章节,进入ZooKeeper服务“全部配置”页面。不断尝试调大ZooKeeper配置文件“zoo.cfg”中的“syncLimit”和“initLimit”两参数值,直到ZooKeeperServer正常。 表1 参数说明 参数 描述 默认值 syncLimit follower与leader进行同步的时间间隔(时长为ticket时长的倍数)。如果在该时间范围内leader没响应,连接将不能被建立。 15 initLimit follower连接到leader并与leader同步的时间(时长为ticket时长的倍数)。 15 如果将参数“initLimit”和“syncLimit”的参数值均配置为“300”之后,ZooKeeper server仍然无法恢复,则需确认没有其他应用程序正在kill ZooKeeper。例如,参数值为“300”,ticket时长为2000毫秒,即同步限定时间为300*2000ms=600s。 可能存在以下场景,在ZooKeeper中创建的数据过大,需要大量时间与leader同步,并保存到硬盘。在这个过程中,如果ZooKeeper需要运行很长时间,则需确保没有其他监控应用程序kill ZooKeeper而判断其服务停止。
  • 日志级别 ZooKeeper中提供了如表2所示的日志级别。日志级别优先级从高到低分别是FATAL、ERROR、WARN、INFO、DEBUG。程序会打印高于或等于所设置级别的日志,设置的日志等级越高,打印出来的日志就越少。 表2 日志级别 级别 描述 FATAL FATAL表示当前事件处理出现严重错误信息,可能导致系统崩溃。 ERROR ERROR表示当前事件处理出现错误信息,系统运行出错。 WARN WARN表示当前事件处理存在异常信息,但认为是正常范围,不会导致系统出错。 INFO INFO表示系统及各事件正常运行状态信息。 DEBUG DEBUG表示系统及系统的调试信息。 如果您需要修改日志级别,请执行如下操作: 参考修改集群服务配置参数章节,进入ZooKeeper服务“全部配置”页面。 左边菜单栏中选择所需修改的角色所对应的日志菜单。 选择所需修改的日志级别。 单击“保存”,在弹出窗口中单击“确定”使配置生效。 配置完成后立即生效,不需要重启服务。
  • 操作场景 该操作指导用户对ZooKeeper的znode设置权限。 ZooKeeper通过访问控制列表(ACL)来对znode进行访问控制。ZooKeeper客户端为znode指定ACL,ZooKeeper服务器根据ACL列表判定某个请求znode的客户端是否有对应操作的权限。ACL设置涉及如下四个方面。 查看ZooKeeper中znode的ACL。 增加ZooKeeper中znode的ACL。 修改ZooKeeper中znode的ACL。 删除ZooKeeper中znode的ACL。 ZooKeeper的ACL权限说明: ZooKeeper目前支持create,delete,read,write,admin五种权限,且ZooKeeper对权限的控制是znode级别的,而且不继承,即对父znode设置权限,其子znode不继承父znode的权限。ZooKeeper中znode的默认权限为world:anyone:cdrwa,即任何用户都有所有权限。 ACL有三部分: 第一部分是认证类型,如world指所有认证类型,sasl是kerberos认证类型; 第二部分是账号,如anyone指的是任何人; 第三部分是权限,如cdrwa指的是拥有所有权限。 特别的,由于普通模式启动客户端不需要认证,sasl认证类型的ACL在普通模式下将不能使用。本文所有涉及sasl方式的鉴权操作均是在安全集群中进行。 表1 Zookeeper的五种ACL 权限说明 权限简称 权限详情 创建权限 create(c) 可以在当前znode下创建子znode 删除权限 delete(d) 删除当前的znode 读权限 read(r) 获取当前znode的数据,可以列出当前znode所有的子znodes 写权限 write(w) 向当前znode写数据,写入子znode 管理权限 admin(a) 设置当前znode的权限
  • ZooKeeper常用配置参数 参数入口: 请参考修改集群服务配置参数,进入ZooKeeper“全部配置”页面。在搜索框中输入参数名称。 表1 参数说明 配置参数 说明 默认值 skipACL 是否跳过ZooKeeper节点的权限检查。 no maxClientCnxns ZooKeeper的最大连接数,在连接数多的情况下,建议增加。 2000 LOG _LEVEL 日志级别,在调试的时候,可以改为DEBUG。 INFO acl.compare.shortName 当Znode的ACL权限认证类型为SASL时,是否仅使用principal的用户名部分进行ACL权限认证。 true synclimit Follower与leader进行同步的时间间隔(单位为tick)。如果在指定的时间内leader没响应,连接将不能被建立。 15 tickTime 一次tick的时间(毫秒),它是ZooKeeper使用的基本时间单位,心跳、超时的时间都由它来规定。 4000 ZooKeeper内部时间由参数ticktime和参数synclimit控制,如需调大ZooKeeper内部超时时间,需要调大客户端连接ZooKeeper的超时时间。 父主题: 使用ZooKeeper
  • 回答 通过集群将非ViewFS文件系统配置为ViewFS时,ViewFS中的文件夹的用户权限与默认NameService中的非ViewFS不同。因为目录权限不匹配,所以已提交的MR作业运行失败。 在集群中配置ViewFS的用户,需要检查并校验目录权限。在提交作业之前,应按照默认的NameService文件夹权限更改ViewFS文件夹权限。 下表列出了ViewFS中配置的目录的默认权限结构。如果配置的目录权限与下表不匹配,则必须相应地更改目录权限。 表1 ViewFS中配置的目录的默认权限结构 参数 描述 默认值 默认值及其父目录的默认权限 yarn.nodemanager.remote-app-log-dir 在默认文件系统上(通常是HDFS),指定NM应将日志聚合到哪个目录。 logs 777 yarn.nodemanager.remote-app-log-archive-dir 将日志归档的目录。 - 777 yarn.app.mapreduce.am.staging-dir 提交作业时使用的staging目录。 /tmp/hadoop-yarn/staging 777 mapreduce.jobhistory.intermediate-done-dir MapReduce作业记录历史文件的目录。 ${yarn.app.mapreduce.am.staging-dir}/history/done_intermediate 777 mapreduce.jobhistory.done-dir 由MR JobHistory Server管理的历史文件的目录。 ${yarn.app.mapreduce.am.staging-dir}/history/done 777
  • 回答 在某些情况下,已经观察到诊断消息可能无限增长。由于诊断消息存储在状态存储中,不建议允许诊断消息无限增长。因此,需要有一个属性参数用于设置诊断消息的最大大小。 如果您需要设置“yarn.app.attempt.diagnostics.limit.kc”参数值,具体操作参考修改集群服务配置参数,进入Yarn“全部配置”页面,在搜索框搜索以下参数。 表1 参数描述 参数 描述 默认值 yarn.app.attempt.diagnostics.limit.kc 定义每次应用连接的诊断消息的数据大小,以千字节为单位(字符数*1024)。当使用ZooKeeper来存储应用程序的行为状态时,需要限制诊断消息的大小,以防止YARN拖垮ZooKeeper。如果将“yarn.resourcemanager.state-store.max-completed-applications”设置为一个较大的数值,则需要减小该属性参数的值以限制存储的总数据大小。 64
  • 回答 如果应用程序没有设置标签表达式,那么该应用程序上新增的container/resource将使用其所在队列默认的标签表达式。如果队列没有默认的标签表达式,则将其标签表达设置为“default label”。 当应用程序(app1)提交到队列(Q1)上时,应用程序上新增的container/resource使用队列默认的标签表达式(“label1”)。如果app1正在运行时Q1被删除,则app1被移动到“lost_and_found”队列上。由于“lost_and_found”队列没有标签表达式,其标签表达式设置为“default label”,此时app1上新增的container/resource也将其标签表达式设置为“default label”。当app1被移回正常运行的队列(例如,Q2)时,如果Q2支持调用app1中的所有标签表达式(包含“label1”和“default label”),则app1能正常运行直到结束;如果Q2仅支持调用app1中的部分标签表达式(例如,仅支持调用“default label”),那么app1在运行时,拥有“label1”标签表达式的部分任务的资源请求将无法获得资源,从而被挂起,不能正常运行。 因此当把应用程序从“lost_and_found”队列移动到其他运行正常的队列上时,需要保证目标队列能够调用该应用程序的所有标签表达式。 建议不要删除正在运行应用程序的队列。
  • 回答 正常情况下 ,当一个application的单个task的attempt连续在一个节点上失败3次,那么该application的AppMaster就会将该节点加入黑名单,之后AppMaster就会通知调度器不要继续调度task到该节点,从而避免任务失败。 但是默认情况下,当集群中有33%的节点都被加入黑名单时,调度器会忽略黑名单节点。因此,该黑名单特性在小集群场景下容易失效。比如,集群只有3个节点,当1个节点出现故障,黑名单机制失效,不管task的attempt在同一个节点失败多少次,调度器仍然会将task继续调度到该节点,从而导致application因为task失败达到最大attempt次数(MapReduce默认4次)而失败。 规避手段: 在“客户端安装路径/Yarn/config/yarn-site.xml”文件中修改“yarn.resourcemanager.am-scheduling.node-blacklisting-disable-threshold”参数以百分比的形式配置忽略黑名单节点的阈值。建议根据集群规模,适当增大该参数的值,如3个节点的集群, 建议增大到50%。 Superior调度器的框架设计是基于时间的异步调度,当NodeManager故障后,ResourceManager无法快速的感知到NodeManager已经出了问题(默认10mins),因此在此期间,Superior调度器仍然会向该节点调度task,从而导致任务失败。
  • 回答 当nodeSelectPolicy为SEQUENCE,且第一个连接到RM的NM不可用时,RM会在“yarn.nm.liveness-monitor.expiry-interval-ms”属性中指定的周期内,一直尝试为同一个NM分配任务。 可以通过两种方式来避免上述问题: 使用其他的nodeSelectPolicy,如RANDOM。 参考修改集群服务配置参数,进入Yarn“全部配置”页面。在搜索框搜索以下参数,通过“yarn-site.xml”文件更改以下属性: “yarn.resourcemanager.am-scheduling.node-blacklisting-enabled” = “true”; “yarn.resourcemanager.am-scheduling.node-blacklisting-disable-threshold” = “0.5”。
  • 回答 在YARN中,当一个APP的节点被AM(ApplicationMaster)加入黑名单的数量达到一定比例(默认值为节点总数的33%)时,该AM会自动释放黑名单,从而不会出现由于所有可用节点都被加入黑名单而任务无法获取节点资源的现象。 在资源池场景下,假设该集群上有8个节点,通过NodeLabel特性将集群划分为两个资源池,pool A和pool B,其中pool B包含两个节点。用户提交了一个任务App1到pool B,由于HDFS空间不足,App1运行失败,导致pool B的两个节点都被App1的AM加入了黑名单,根据上述原则,2个节点小于8个节点的33%,所以YARN不会释放黑名单,使得App1一直无法得到资源而保持运行状态,后续即使被加入黑名单的节点恢复,App1也无法得到资源。 由于上述原则不适用于资源池场景,所以目前可通过调整客户端参数(路径为“客户端安装路径/Yarn/config/yarn-site.xml”)“yarn.resourcemanager.am-scheduling.node-blacklisting-disable-threshold”为:(nodes number of pool / total nodes )* 33%解决该问题。
  • 回答 这是RM的使用限制,应用程序运行过程中移动到别的队列,此时RM重启,RM并不会在状态存储中存储新队列的信息。 假设用户提交一个MR任务到叶子队列test11上。当任务运行时,删除叶子队列test11,这时提交队列自动变为lost_and_found队列(找不到队列的任务会被放入lost_and_found队列中),任务暂停运行。要启动该任务,用户将任务移动到叶子队列test21上。在将任务移动到叶子队列test21后,任务继续运行,此时RM重启,重启后显示提交队列为lost_and_found队列,而不是test21队列。 发生上述情况的原因是,任务未完成时,RM状态存储中存储的还是应用程序移动前的队列状态。唯一的解决办法就是等RM重启后,再次移动应用程序,将新的队列状态信息写入状态存储中。
  • 回答 NodeManager有重启恢复机制,详情请参见: MRS 3.2.0之前版本:https://hadoop.apache.org/docs/r3.1.1/hadoop-yarn/hadoop-yarn-site/NodeManager.html#NodeManager_Restart MRS 3.2.0及之后版本:https://hadoop.apache.org/docs/r3.3.1/hadoop-yarn/hadoop-yarn-site/NodeManager.html#NodeManager_Restart 可以参考修改集群服务配置参数,进入Yarn“全部配置”页面。需将NodeManager的“yarn.nodemanager.recovery.enabled”配置项为“true”后才生效,默认为“true”,这样在YARN重启的异常场景时会定时删除多余的本地日志,避免问题的出现。
  • 回答 HDFS_DELEGATION_TOKEN到期的异常是由于token没有更新或者超出了最大生命周期。 在token的最大生命周期内确保下面的参数值大于作业的运行时间。 “dfs.namenode.delegation.token.max-lifetime”=“604800000”(默认是一星期) 参考修改集群服务配置参数,进入HDFS“全部配置”页面,在搜索框搜索该参数。 建议在token的最大生命周期内参数值为多倍小时数。
  • 回答 在JobHistory界面中跳转到某个应用的原生页面时,JobHistory需要回放该应用的Event log,如果应用包含的事件日志较大,则回放时间较长,浏览器需要较长时间的等待。 当前浏览器访问JobHistory原生页面需经过httpd代理,代理的超时时间是10分钟,因此,如果JobHistory在10分钟内无法完成Event log的解析并返回,httpd会主动向浏览器返回Proxy Error信息。
  • 回答 当userB对tableB执行insert操作后,会在外表数据路径下生成新的数据文件,且文件属组是userB,当userA查询tableA时,会读取外表数据目录下的所有的文件,此时会因没有userB生成的文件的读取权限而查询失败。 实际上,不只是查询场景,还有其他场景也会出现问题。例如:inset overwrite操作将会把此目录下的其他表文件也一起复写。 由于Spark SQL当前的实现机制,如果对此种场景添加检查限制,会存在一致性问题和性能问题,因此未对此种场景添加限制,但是用户应避免此种用法,以避免此场景带来的各种问题。
  • 回答 JobHistory服务更新页面上的app时,会根据HDFS上的part文件大小变更与否判断是否刷新首页面的app显示信息。如果文件为第一次查看,则将当前文件大小与0作比较,如果大于0则读取该文件。 分组的情况下,如果执行的app没有job处于执行状态,则part文件为空,即JobHistory服务不会读取该文件,此app也不会显示在JobHistory页面上。但如果part文件大小之后有更新,JobHistory又会显示该app。
  • 问题 在History Server页面中访问某个Spark应用的页面时,发现访问时出错。 查看相应的HistoryServer日志后,发现有“FileNotFound”异常,相关日志如下所示: 2016-11-22 23:58:03,694 | WARN | [qtp55429210-232] | /history/application_1479662594976_0001/stages/stage/ | org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:628) java.io.FileNotFoundException: ${BIGDATA_HOME}/tmp/spark/jobHistoryTemp/blockmgr-5f1f6aca-2303-4290-9845-88fa94d78480/09/temp_shuffle_11f82aaf-e226-46dc-b1f0-002751557694 (No such file or directory)
  • 回答 打开FusionInsight Manager页面,看到Yarn服务的业务IP地址为192网段。 从Yarn的日志看到,Yarn读取的Spark Web UI地址为http://10.120.169.53:23011,是10网段的IP地址。由于192网段的IP和10网段的IP不能互通,所以导致访问Spark Web UI界面失败。 修改方案: 登录10.120.169.53客户端机器,修改/etc/hosts文件,将10.120.169.53更改为相对应的192网段的IP地址。再重新运行Spark应用,这时就可以打开Spark Web UI界面。
  • 回答 Kafka重启成功后应用会按照batch时间把2017/05/11 10:57:00~2017/05/11 10:58:00缺失的RDD补上(如图2所示),尽管UI界面上显示读取的数据个数为“0”,但实际上这部分数据在补的RDD中进行了处理,因此,不存在数据丢失。 Kafka重启时间段的数据处理机制如下。 Spark Streaming应用使用了state函数(例如:updateStateByKey),在Kafka重启成功后,Spark Streaming应用生成2017/05/11 10:58:00 batch任务时,会按照batch时间把2017/05/11 10:57:00~2017/05/11 10:58:00缺失的RDD补上(Kafka重启前Kafka上未读取完的数据,属于2017/05/11 10:57:00之前的batch),如图2所示。 图2 重启时间段缺失数据处理机制
  • 问题 在Spark Streaming应用执行过程中重启Kafka时,应用无法从Kafka获取topic offset,从而导致生成Job失败。如图1所示,其中2017/05/11 10:57:00~2017/05/11 10:58:00为Kafka重启时间段。2017/05/11 10:58:00重启成功后对应的“Input Size”的值显示为“0 records”。 图1 Web UI界面部分batch time对应Input Size为0 records
  • 回答 Streaming Context启动时,如果应用设置了checkpoint,则需要对应用中的DStream checkpoint对象进行序列化,序列化时会用到dstream.context。 dstream.context是Streaming Context启动时从output Streams反向查找所依赖的DStream,逐个设置context。如果Spark Streaming应用创建1个输入流,但该输入流无输出逻辑时,则不会给它设置context。所以在序列化时报“NullPointerException”。 解决办法:应用中如果有无输出逻辑的输入流,则在代码中删除该输入流,或添加该输入流的相关输出逻辑。
  • 问题 Spark Streaming应用创建1个输入流,但该输入流无输出逻辑。应用从checkpoint恢复启动失败,报错如下: 17/04/24 10:13:57 ERROR Utils: Exception encountered java.lang.NullPointerException at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:125) at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:123) at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:123) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195) at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:515) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:510) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:510) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195) at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:510) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:191) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:186) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:186) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195) at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:186 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:142) at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:142) at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:142) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1230) at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:143) at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:566) at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:612) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:611) at com.spark.test.kafka08LifoTwoInkfk$.main(kafka08LifoTwoInkfk.scala:21) at com.spark.test.kafka08LifoTwoInkfk.main(kafka08LifoTwoInkfk.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:772) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  • 回答 问题原因: ApplicationMaster进程中有1个Credential Refresh Thread会根据token renew周期 * 0.75的时间比例上传更新后的Credential文件到HDFS上。 Executor进程中有1个Credential Refresh Thread会根据token renew周期 *0.8的时间比例去HDFS上获取更新后的Credential文件,用来刷新UserGroupInformation中的token,避免token失效。 当Executor进程的Credential Refresh Thread发现当前时间已经超过Credential文件更新时间(即token renew周期 *0.8)时,会等待1分钟再去HDFS上面获取最新的Credential文件,以确保AM端已经将更新后的Credential文件放到HDFS上。 当“dfs.namenode.delegation.token.renew-interval”配置值小于60秒,Executor进程起来时发现当前时间已经超过Credential文件更新时间,等待1分钟再去HDFS上面获取最新的Credential文件,而此时token已经失效,task运行失败,然后在其他Executor上重试,由于重试时间都是在1分钟内完成,所以task在其他Executor上也运行失败,导致运行失败的Executor加入到黑名单,没有可用的Executor,应用退出。 修改方案: 在Spark使用场景下,需设置“dfs.namenode.delegation.token.renew-interval”大于80秒。“dfs.namenode.delegation.token.renew-interval”参数描述请参表1考。 表1 参数说明 参数 描述 默认值 dfs.namenode.delegation.token.renew-interval 该参数为服务器端参数,设置token renew的时间间隔,单位为毫秒。 86400000
  • 回答 在executor核数等于1的情况下,遵循以下规则对调优Spark Streaming运行参数有所帮助。 Spark任务处理速度和Kafka上partition个数有关,当partition个数小于给定executor个数时,实际使用的executor个数和partition个数相同,其余的将会被空闲。所以应该使得executor个数小于或者等于partition个数。 当Kafka上不同partition数据有倾斜时,数据较多的partition对应的executor将成为数据处理的瓶颈,所以在执行Producer程序时,数据平均发送到每个partition可以提升处理的速度。 在partition数据均匀分布的情况下,同时提高partition和executor个数,将会提升Spark处理速度(当partition个数和executor个数保持一致时,处理速度是最快的)。 在partition数据均匀分布的情况下,尽量保持partition个数是executor个数的整数倍,这样将会使资源得到合理利用。
共100000条