华为云用户手册

  • 问题 Spark Streaming应用创建1个输入流,但该输入流无输出逻辑。应用从checkpoint恢复启动失败,报错如下: 17/04/24 10:13:57 ERROR Utils: Exception encountered java.lang.NullPointerException at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:125) at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:123) at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:123) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195) at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:515) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:510) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:510) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195) at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:510) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:191) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:186) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:186) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195) at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:186 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:142) at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:142) at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:142) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1230) at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:143) at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:566) at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:612) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:611) at com.spark.test.kafka08LifoTwoInkfk$.main(kafka08LifoTwoInkfk.scala:21) at com.spark.test.kafka08LifoTwoInkfk.main(kafka08LifoTwoInkfk.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:772) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  • 回答 问题原因: ApplicationMaster进程中有1个Credential Refresh Thread会根据token renew周期 * 0.75的时间比例上传更新后的Credential文件到HDFS上。 Executor进程中有1个Credential Refresh Thread会根据token renew周期 *0.8的时间比例去HDFS上获取更新后的Credential文件,用来刷新UserGroupInformation中的token,避免token失效。 当Executor进程的Credential Refresh Thread发现当前时间已经超过Credential文件更新时间(即token renew周期 *0.8)时,会等待1分钟再去HDFS上面获取最新的Credential文件,以确保AM端已经将更新后的Credential文件放到HDFS上。 当“dfs.namenode.delegation.token.renew-interval”配置值小于60秒,Executor进程起来时发现当前时间已经超过Credential文件更新时间,等待1分钟再去HDFS上面获取最新的Credential文件,而此时token已经失效,task运行失败,然后在其他Executor上重试,由于重试时间都是在1分钟内完成,所以task在其他Executor上也运行失败,导致运行失败的Executor加入到黑名单,没有可用的Executor,应用退出。 修改方案: 在Spark使用场景下,需设置“dfs.namenode.delegation.token.renew-interval”大于80秒。“dfs.namenode.delegation.token.renew-interval”参数描述请参表1考。 表1 参数说明 参数 描述 默认值 dfs.namenode.delegation.token.renew-interval 该参数为服务器端参数,设置token renew的时间间隔,单位为毫秒。 86400000
  • 回答 在executor核数等于1的情况下,遵循以下规则对调优Spark Streaming运行参数有所帮助。 Spark任务处理速度和Kafka上partition个数有关,当partition个数小于给定executor个数时,实际使用的executor个数和partition个数相同,其余的将会被空闲。所以应该使得executor个数小于或者等于partition个数。 当Kafka上不同partition数据有倾斜时,数据较多的partition对应的executor将成为数据处理的瓶颈,所以在执行Producer程序时,数据平均发送到每个partition可以提升处理的速度。 在partition数据均匀分布的情况下,同时提高partition和executor个数,将会提升Spark处理速度(当partition个数和executor个数保持一致时,处理速度是最快的)。 在partition数据均匀分布的情况下,尽量保持partition个数是executor个数的整数倍,这样将会使资源得到合理利用。
  • 问题 运行一个Spark Streaming任务,确认有数据输入后,发现没有任何处理的结果。打开Web界面查看Spark Job执行情况,发现如下图所示:有两个Job一直在等待运行,但一直无法成功运行。 图1 Active Jobs 继续查看已经完成的Job,发现也只有两个,说明Spark Streaming都没有触发数据计算的任务(Spark Streaming默认有两个尝试运行的Job,就是图中两个) 图2 Completed Jobs
  • 回答 经过定位发现,导致这个问题的原因是:Spark Streaming的计算核数少于Receiver的个数,导致部分Receiver启动以后,系统已经没有资源去运行计算任务,导致第一个任务一直在等待,后续任务一直在排队。从现象上看,就是如问题中的图1中所示,会有两个任务一直在等待。 因此,当Web出现两个任务一直在等待的情况,首先检查Spark的核数是否大于Receiver的个数。 Receiver在Spark Streaming中是一个常驻的Spark Job,Receiver对于Spark是一个普通的任务,但它的生命周期和Spark Streaming任务相同,并且占用一个核的计算资源。 在调试和测试等经常使用默认配置的场景下,要时刻注意核数与Receiver个数的关系。
  • 回答 该应用程序中使用了DStream中的print算子来显示结果,该算子会调用RDD中的take算子来实现底层的计算。 Take算子会以Partition为单位多次触发计算。 在该问题中,由于Shuffle操作,导致take算子默认有两个Partition,Spark首先计算第一个Partition,但由于没有数据输入,导致获取结果不足10个,从而触发第二次计算,因此会出现RDD的DAG结构打印两次的现象。 在代码中将print算子修改为foreach(collect),该问题则不会出现。
  • 回答 原因分析: 这是由于Spark2x与Spark1.5存储DataSoure表信息的格式不一致导致的。Spark1.5会将schema信息分成多个part,使用path.park.0作为key进行存储,读取时再将各个part都读取出来,重新拼成完整的信息。而Spark2x直接使用相应的key获取对应的信息。这样在Spark2x中去读取Spark1.5创建的DataSource表时,就无法成功读取到key对应的信息,导致解析DataSource表信息失败。 而在处理Hive格式的表时,Spark2x与Spark1.5的存储方式一致,所以Spark2x可以直接读取Spark1.5创建的表,不存在上述问题。 规避措施: Spark2x可以通过创建外表的方式来创建一张指向Spark1.5表实际数据的表,这样可以实现在Spark2x中读取Spark1.5创建的DataSource表。同时,Spark1.5更新过数据后,Spark2x中访问也能感知到变化 ,反过来一样。这样即可实现Spark2x对Spark1.5创建的DataSource表的访问。
  • 问题 问题一: 用户没有drop function的权限,能够drop成功。具体场景如下: 在 FusionInsight Manager页面上添加user1用户,给予用户admin权限,执行下列操作: set role admin;add jar /home/smartcare-udf-0.0.1-SNAPSHOT.jar;create database db4;use db4;create function f11 as 'com.huaweixxx.smartcare.dac.hive.udf.UDFArrayGreaterEqual';create function f12 as 'com.huaweixxx.smartcare.dac.hive.udf.UDFArrayGreaterEqual'; 修改user1用户,取消admin权限,执行下列操作: drop functiondb4.f11; 结果显示drop成功,如图1所示。 图1 用户没有权限却drop成功结果 问题二: 用户drop function成功,show function的时候,function仍然存在。具体场景如下: 在FusionInsight Manager页面上添加user1用户,给予用户admin权限,进入spark-beeline执行下列操作: set role admin;create database db2;use db2;add jar /home/smartcare-udf-0.0.1-SNAPSHOT.jar;create function f11 as 'com.huaweixxx.smartcare.dac.hive.udf.UDFArrayGreaterEqual';create function f12 as 'com.huaweixxx.smartcare.dac.hive.udf.UDFArrayGreaterEqual'; 退出后再进入spark-beeline执行下列操作: set role admin;use db2;drop function db2.f11; 退出后再进入spark-beeline执行下列操作: use db2;show functions; 结果显示,被drop的function仍然存在,如图2所示。 图2 执行show functions操作后的结果
  • 回答 问题根因: 上述两个问题是由于多主实例模式或者多租户模式下,使用spark-beeline通过add jar的方式创建function,此function在各个JD BCS erver实例之间是不可见的。执行drop function时,如果该session连接的JDB CS erver实例不是创建function的JDBCServer实例,则在该session中找不到该function,而且hive默认将“hive.exec.drop.ignorenonexistent”设置为“true”,即当function不存在时,删除function操作不会报错,这样就表现出了用户没有drop function的权限,执行drop时却没有报错,让用户误以为drop成功;但重新起session时又连到创建function的JDBCServer上,因此执行show function,function仍然存在。该行为是hive的社区行为。 修改方案: 在执行drop function命令之前先执行add jar命令,则该function在有权限的情况下才能drop成功,且drop成功之后不会出现show function仍然存在的现象。
  • 回答 场景一: add jar语句只会将jar加载到当前连接的JDBCServer的jarClassLoader,不同JDBCServer不会共用。JDBCServer重启后会创建新的jarClassLoader,所以需要重新add jar。 添加jar包有两种方式:可以在启动spark-sql的时候添加jar包,如spark-sql --jars /opt/test/two_udfs.jar;也可在spark-sql启动后再添加jar包,如add jar /opt/test/two_udfs.jar。add jar所指定的路径可以是本地路径也可以是HDFS上的路径。
  • 回答 当前在默认配置下,在内存中保留的Job和Stage的UI数据个数为1000个。 当前大集群优化已增加将UI数据溢出到磁盘的优化,其溢出条件是每个Stage中的UI数据大小达到最小阈值5MB。如果每个Stage的task数较小,那么其UI数据大小可能达不到该阈值,从而导致该Stage的UI数据一直缓存在内存中,直到UI数据个数到达保留的上限值(当前默认值为1000个),旧的UI数据才会在内存中被清除。 因此,在将旧的UI数据从内存中清除之前,UI数据会占用大量内存,从而导致执行10T的TPCDS测试套时出现Driver内存不足的现象。 规避措施: 根据业务需要,配置合适的需要保留的Job和Stage的UI数据个数,即配置“spark.ui.retainedJobs”和“spark.ui.retainedStages”参数。详细信息请参考Spark常用配置参数中的表13。 如果需要保留的Job和Stage的UI数据个数较多,可通过配置“spark.driver.memory”参数,适当增大Driver的内存。详细信息请参考Spark常用配置参数中的表10。
  • 回答 在开启钨丝计划(即tungsten功能)后,Spark对于部分执行计划会使用codegen的方式来生成Java代码,但JDK编译时要求Java代码中的每个函数的长度不能超过64KB。当执行一个很复杂的SQL语句时,例如有多层语句嵌套,且单层语句中对字段有大量的逻辑处理(如多层嵌套的case when语句),这种情况下,通过codegen生成的Java代码中函数的大小就可能会超过64KB,从而导致编译失败。 规避措施: 当出现上述问题时,用户可以通过关闭钨丝计划,关闭使用codegen的方式来生成Java代码的功能,从而确保语句的正常执行。即在客户端的“spark-defaults.conf”配置文件中将“spark.sql.codegen.wholeStage”配置为“false”。
  • 问题 当执行一个很复杂的SQL语句时,例如有多层语句嵌套,且单层语句中对字段有大量的逻辑处理(如多层嵌套的case when语句),此时执行该语句会报如下所示的错误日志,该错误表明某个方法的代码超出了64KB。 java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificUnsafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
  • 问题 为什么日期类型的字段作为过滤条件时匹配'2016-6-30'时没有查询结果,匹配'2016-06-30'时有查询结果。 如下图所示:“select count(*)from trxfintrx2012 a where trx_dte_par='2016-6-30'”,其中trx_dte_par为日期类型的字段,当过滤条件为“where trx_dte_par='2016-6-30'”时没有查询结果,当过滤条件为“where trx_dte_par='2016-06-30'”时有查询结果。
  • 回答 当前JDBCServer中存在两个线程池HiveServer2-Handler-Pool和HiveServer2-Background-Pool,其中HiveServer2-Handler-Pool用于处理session连接,HiveServer2-Background-Pool用于处理SQL语句的执行。 当前的健康检查机制是通过新建session连接,并在该session所在的线程中执行健康检查命令HEALTHCHECK来判断Spark JDBCServer的健康状况,因此HiveServer2-Handler-Pool必须保留一个线程,用于处理健康检查的session连接和健康检查命令执行,否则将导致无法建立健康检查的session连接或健康检查命令无法执行,从而认为Spark JDBCServer不健康而被Kill。即如果当前HiveServer2-Handler-Pool的线程池数为100,那么最多支持连接99个session。
  • 问题 执行大数据量的Spark任务(如2T的TPCDS测试套),任务运行成功后,在spark-sql退出时概率性出现RejectedExecutionException的异常栈信息,相关日志如下所示: 16/07/16 10:19:56 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from linux-192/10.1.1.5:59250 is closed java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@5fc1ab rejected from java.util.concurrent.ThreadPoolExecutor@52fa7e19[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 3025]
  • 回答 Spark SQL对用户SQL语句的执行逻辑是:首先解析出语句中包含的表,再获取表的元数据信息,然后对权限进行检查。 当表是parquet表时,元数据信息包括文件的Split信息。Split信息需要调用HDFS的接口去读取,当表包含的文件数量很多时,串行读取Split信息变得缓慢,影响性能。故对此做了优化,当表包含的文件大于一定阈值(即spark.sql.sources.parallelSplitDiscovery.threshold参数值)时,会生成一个Job,利用Executor的并行能力去读取,从而提升执行效率。 由于权限检查在获取表元数据之后,因此当读取的parquet表包含的文件数量很多时,会在报“Missing Privileges”之前,运行一个Job来并行读取元数据信息。
  • 问题 使用默认配置时,16T的文本数据转成4T Parquet数据失败,报如下错误信息。 Job aborted due to stage failure: Task 2866 in stage 11.0 failed 4 times, most recent failure: Lost task 2866.6 in stage 11.0 (TID 54863, linux-161, 2): java.io.IOException: Failed to connect to /10.16.1.11:23124 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:92) 使用的默认配置如表1所示。 表1 参数说明 参数 描述 默认值 spark.sql.shuffle.partitions shuffle操作时,shuffle数据的分块数。 200 spark.shuffle.sasl.timeout shuffle操作时SASL认证的超时时间。单位:秒。 120s spark.shuffle.io.connectionTimeout shuffle操作时连接远程节点的超时时间。单位:秒。 120s spark.network.timeout 所有涉及网络连接操作的超时时间。单位:秒。 360s
  • 回答 这是正常现象。 数据分到哪个partition是通过对key的hashcode取模得到的,不同的hashcode取模后的结果有可能是一样的,那样数据就会被分到相同的partition里面,因此出现有些partition没有数据而有些partition里面有多个key对应的数据。 通过调整“spark.sql.shuffle.partitions”参数值可以调整取模时的基数,改善数据分块不均匀的情况,多次验证发现配置为质数或者奇数效果比较好。 在Driver端的“spark-defaults.conf”配置文件中调整如下参数。 表1 参数说明 参数 描述 默认值 spark.sql.shuffle.partitions shuffle操作时,shuffle数据的分块数。 200
  • 回答 Spark SQL可以将表cache到内存中,并且使用压缩存储来尽量减少内存压力。通过将表cache,查询可以直接从内存中读取数据,从而减少读取磁盘带来的内存开销。 但需要注意的是,被cache的表会占用executor的内存。尽管在Spark SQL采用压缩存储的方式来尽量减少内存开销、缓解GC压力,但当缓存的表较大或者缓存表数量较多时,将不可避免地影响executor的稳定性。 此时的最佳实践是,当不需要将表cache来实现查询加速时,应及时将表进行uncache以释放内存。可以执行命令uncache table table_name来uncache表。 被cache的表也可以在Spark Driver UI的Storage标签里查看。
  • 回答 由于Spark存在一个机制,为了提高性能会缓存Parquet的元数据信息。当通过Hive或其他方式更新了Parquet表时,缓存的元数据信息未更新,导致Spark SQL查询不到新插入的数据。 对于存储类型为Parquet的Hive分区表,在执行插入数据操作后,如果分区信息未改变,则缓存的元数据信息未更新,导致Spark SQL查询不到新插入的数据。 解决措施:在使用Spark SQL查询之前,需执行Refresh操作更新元数据信息。 REFRESH TABLE table_name; table_name为刷新的表名,该表必须存在,否则会出错。 执行查询语句时,即可获取到最新插入的数据。
  • 回答 当前可以通过以下3种方式创建UDF: 在Hive端创建UDF。 通过JDBCServer接口创建UDF。用户可以通过Spark Beeline或者JDBC客户端代码来连接JDBCServer,从而执行SQL命令,创建UDF。 通过spark-sql创建UDF。 删除UDF失败,存在以下两种场景: 在Spark Beeline中,对于其他方式创建的UDF,需要重新启动Spark服务端的JDBCServer后,才能将此类UDF删除成功,否则删除失败。在spark-sql中,对于其他方式创建的UDF,需要重新启动spark-sql后,才能将此类UDF删除成功,否则删除失败。 原因:创建UDF后,Spark服务端的JDBCServer未重启或者spark-sql未重新启动的场景,Spark所在线程的FunctionRegistry对象未保存新创建的UDF,那么删除UDF时就会出现错误。 解决方法:重启Spark服务端的JDBCServer和spark-sql,再删除此类UDF。 在Hive端创建UDF时未在创建语句中指定jar包路径,而是通过add jar命令添加UDF的jar包如add jar /opt/test/two_udfs.jar,这种场景下,在其他服务中删除UDF时就会出现ClassNotfound的错误,从而导致删除失败。 原因:在删除UDF时,会先获取该UDF,此时会去加载该UDF对应的类,由于创建UDF时是通过add jar命令指定jar包路径的,其他服务进程的classpath不存在这些jar包,因此会出现ClassNotfound的错误从而导致删除失败。 解决方法:该方式创建的UDF不支持通过其他方式删除,只能通过与创建时一致的方式删除。
  • 回答 Spark的表管理层次如图1所示,最底层是Spark的临时表,存储着使用DataSource方式的临时表,在这一个层面中没有数据库的概念,因此对于这种类型表,表名在各个数据库中都是可见的。 上层为Hive的MetaStore,该层有了各个DB之分。在每个DB中,又有Hive的临时表与Hive的持久化表,因此在Spark中允许三个层次的同名数据表。 查询的时候,Spark SQL优先查看是否有Spark的临时表,再查找当前DB的Hive临时表,最后查找当前DB的Hive持久化表。 图1 Spark表管理层次 当Session退出时,用户操作相关的临时表将自动删除。建议用户不要手动删除临时表。 删除临时表时,其优先级与查询相同,从高到低为Spark临时表、Hive临时表、Hive持久化表。如果想直接删除Hive表,不删除Spark临时表,您可以直接使用drop table DbName.TableName命令。
  • 回答 在进行rollup和cube操作时,用户通常是基于维度进行分析,需要的是度量的结果,因此不会对维度进行聚合操作。 例如当前有表src(d1, d2, m),那么语句1“select d1, sum(m) from src group by d1, d2 with rollup”就是对维度d1和d2进行上卷操作计算度量m的结果,因此有实际业务意义,而其结果也跟预期是一致的。但语句2“select d1, sum(d1) from src group by d1, d2 with rollup”则从业务上无法解释。当前对于语句2所有聚合(sum/avg/max/min)结果均为0。 只有在rollup和cube操作中对出现在group by中的字段进行聚合结果才是0,非rollup和cube操作其结果跟预期一致。
  • 问题 执行超过50T数据的shuffle过程时,出现部分Executor注册shuffle service超时然后丢失从而导致任务失败的问题。错误日志如下所示: 2016-10-19 01:33:34,030 | WARN | ContainersLauncher #14 | Exception from container-launch with container ID: container_e1452_1476801295027_2003_01_004512 and exit code: 1 | LinuxContainerExecutor.java:397 ExitCodeException exitCode=1: at org.apache.hadoop.util.Shell.runCommand(Shell.java:561) at org.apache.hadoop.util.Shell.run(Shell.java:472) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738) at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:381) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:312) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:88) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2016-10-19 01:33:34,031 | INFO | ContainersLauncher #14 | Exception from container-launch. | ContainerExecutor.java:300 2016-10-19 01:33:34,031 | INFO | ContainersLauncher #14 | Container id: container_e1452_1476801295027_2003_01_004512 | ContainerExecutor.java:300 2016-10-19 01:33:34,031 | INFO | ContainersLauncher #14 | Exit code: 1 | ContainerExecutor.java:300 2016-10-19 01:33:34,031 | INFO | ContainersLauncher #14 | Stack trace: ExitCodeException exitCode=1: | ContainerExecutor.java:300
  • 回答 由于当前数据量较大,有50T数据导入,超过了shuffle的规格,shuffle负载过高,shuffle service服务处于过载状态,可能无法及时响应Executor的注册请求,从而出现上面的问题。 Executor注册shuffle service的超时时间是5秒,最多重试3次,该参数目前不可配。 建议适当调大task retry次数和Executor失败次数。 在客户端的“spark-defaults.conf”配置文件中配置如下参数。“spark.yarn.max.executor.failures”如果不存在,则手动添加该参数项。 表1 参数说明 参数 描述 默认值 spark.task.maxFailures task retry次数。 4 spark.yarn.max.executor.failures Executor失败次数。 关闭Executor个数动态分配功能的场景即“spark.dynamicAllocation.enabled”参数设为“false”时。 numExecutors * 2, with minimum of 3 Executor失败次数。 开启Executor个数动态分配功能的场景即“spark.dynamicAllocation.enabled”参数设为“true”时。 3
  • 问题 在执行大数据量的Spark任务(如100T的TPCDS测试套)过程中,有时会出现Executor丢失从而导致Stage重试的现象。查看Executor的日志,出现“Executor 532 is lost rpc with driver,but is still alive, going to kill it”所示信息,表明Executor丢失是由于JVM Crash导致的。 JVM的关键Crash错误日志,如下: # # A fatal error has been detected by the Java Runtime Environment: # # Internal Error (sharedRuntime.cpp:834), pid=241075, tid=140476258551552 # fatal error: exception happened outside interpreter, nmethods and vtable stubs at pc 0x00007fcda9eb8eb1
  • 回答 JDBCServer方式使用了ShuffleService功能,Reduce阶段所有的Executor会从NodeManager中获取数据,当数据量达到一个级别(10T级别),会出现NodeManager单点瓶颈(ShuffleService服务在NodeManager进程中),就会出现某些Task获取数据超时,从而出现该问题。 因此,当数据量达到10T级别以上的Spark任务,建议用户关闭ShuffleService功能,即在“Spark-defaults.conf”配置文件中将配置项“spark.shuffle.service.enabled”配置为“false”。
  • 回答 对于Hash shuffle,在shuffle的过程中写数据时不做排序操作,只是将数据根据Hash的结果,将各个reduce分区的数据写到各自的磁盘文件中。 这样带来的问题是如果reduce分区的数量比较大的话,将会产生大量的磁盘文件(比如:该问题中将产生1000000 * 100000 = 10^11个shuffle文件)。如果磁盘文件数量特别巨大,对文件读写的性能会带来比较大的影响,此外由于同时打开的文件句柄数量多,序列化以及压缩等操作需要占用非常大的临时内存空间,对内存的使用和GC带来很大的压力,从而容易造成Executor无法响应Driver。 因此,建议使用Sort shuffle,而不使用Hash shuffle。
  • 回答 动态分区表插入数据的最后一步是读取shuffle文件的数据,再写入到表对应的分区文件中。 当大面积shuffle文件损坏后,会引起大批量task失败,然后进行job重试。重试前Spark会将写表分区文件的句柄关闭,大批量task关闭句柄时HDFS无法及时处理。在task进行下一次重试时,句柄在NameNode端未被及时释放,即会发生"Failed to CREATE_FILE"异常。 这种现象仅会在大面积shuffle文件损坏时发生,出现异常后task会重试,重试耗时在毫秒级,影响较小,可以忽略不计。
共100000条