华为云用户手册

  • ExecutorLaucher配置 ExecutorLauncher只有在Yarn-Client模式下才会存在的角色,Yarn-Client模式下,ExecutorLauncher和Driver不在同一个进程中,需要对ExecutorLauncher的参数进行特殊的配置。 表11 参数说明 参数 描述 默认值 spark.yarn.am.extraJavaOptions 在Client模式下传递至YARN Application Master的一系列额外JVM选项。在Cluster模式下使用spark.driver.extraJavaOptions。 参考快速配置Spark参数 spark.yarn.am.memory 针对Client模式下YARN Application Master使用的内存数量,与JVM内存设置字符串格式一致(例如:512m,2g)。在集群模式下,使用spark.driver.memory。 1G spark.yarn.am.memoryOverhead 和“spark.yarn.driver.memoryOverhead”一样,但只针对Client模式下的Application Master。 - spark.yarn.am.cores 针对Client模式下YARN Application Master使用的核数。在Cluster模式下,使用spark.driver.cores。 1
  • WebUI WebUI展示了Spark应用运行的过程和状态。 表13 参数说明 参数 描述 默认值 spark.ui.killEnabled 允许停止Web UI中的stage和相应的job。 说明: 出于安全考虑,将此配置项的默认值设置成false,以避免用户发生误操作。如果需要开启此功能,则可以在spark-defaults.conf配置文件中将此配置项的值设为true。请谨慎操作。 true spark.ui.port 应用程序dashboard的端口,显示内存和工作量数据。 JD BCS erver2x:4040 SparkResource2x:0 IndexServer2x:22901 spark.ui.retainedJobs 在垃圾回收之前Spark UI和状态API记住的job数。 1000 spark.ui.retainedStages 在垃圾回收之前Spark UI和状态API记住的stage数。 1000
  • Executor配置 Executor也是单独一个Java进程,但不像Driver和AM只有一个,Executor可以有多个进程,而目前Spark只支持相同的配置,即所有Executor的进程参数都必然是一样的。 表12 参数说明 参数 描述 默认值 spark.executor.extraJavaOptions 传递至Executor的额外JVM选项。例如,GC设置或其他日志记录。请注意不能通过此选项设置Spark属性或heap大小。Spark属性应该使用SparkConf对象或调用spark-submit脚本时指定的spark-defaults.conf文件来设置。Heap大小可以通过spark.executor.memory来设置。 参考快速配置Spark参数 spark.executor.extraClassPath 附加至Executor classpath的额外的classpath。这主要是为了向后兼容Spark的历史版本。用户一般不用设置此选项。 - spark.executor.extraLibraryPath 设置启动executor JVM时所使用的特殊的library path。 参考快速配置Spark参数 spark.executor.userClassPathFirst (试验性)与spark.driver.userClassPathFirst相同的功能,但应用于Executor实例。 false spark.executor.memory 每个Executor进程使用的内存数量,与JVM内存设置字符串的格式相同(例如:512M,2G)。 4G spark.executorEnv.[EnvironmentVariableName] 添加由EnvironmentVariableName指定的环境变量至executor进程。用户可以指定多个来设置多个环境变量。 - spark.executor.logs.rolling.maxRetainedFiles 设置系统即将保留的最新滚动日志文件的数量。旧的日志文件将被删除。默认关闭。 - spark.executor.logs.rolling.size.maxBytes 设置滚动Executor日志的文件的最大值。默认关闭。数值以字节为单位设置。如果要自动清除旧日志,请查看spark.executor.logs.rolling.maxRetainedFiles。 - spark.executor.logs.rolling.strategy 设置executor日志的滚动策略。默认滚动关闭。可以设置为“time”(基于时间的滚动)或“size”(基于大小的滚动)。当设置为“time”,使用spark.executor.logs.rolling.time.interval属性的值作为日志滚动的间隔。当设置为“size”,使用spark.executor.logs.rolling.size.maxBytes设置滚动的最大文件大小滚动。 - spark.executor.logs.rolling.time.interval 设置executor日志滚动的时间间隔。默认关闭。合法值为“daily”、“hourly”、“minutely”或任意秒。如果要自动清除旧日志,请查看spark.executor.logs.rolling.maxRetainedFiles。 daily
  • Dynamic Allocation 动态资源调度是On Yarn模式特有的特性,并且必须开启Yarn External Shuffle才能使用这个功能。在使用Spark作为一个常驻的服务时候,动态资源调度将大大的提高资源的利用率。例如JDB CS erver服务,大多数时间该进程并不接受JDBC请求,因此将这段空闲时间的资源释放出来,将极大的节约集群的资源。 表5 参数说明 参数 描述 默认值 spark.dynamicAllocation.enabled 是否使用动态资源调度,用于根据规模调整注册于该应用的executor的数量。注意目前仅在YARN模式下有效。 启用动态资源调度必须将spark.shuffle.service.enabled设置为true。以下配置也与此相关:spark.dynamicAllocation.minExecutors、spark.dynamicAllocation.maxExecutors和spark.dynamicAllocation.initialExecutors。 JDBCServer2x: true SparkResource2x: false spark.dynamicAllocation.minExecutors 最小Executor个数。 0 spark.dynamicAllocation.initialExecutors 初始Executor个数。 spark.dynamicAllocation.minExecutors spark.dynamicAllocation.maxExecutors 最大executor个数。 2048 spark.dynamicAllocation.schedulerBacklogTimeout 调度第一次超时时间。单位为秒。 1s spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 调度第二次及之后超时时间。 1s spark.dynamicAllocation.executorIdleTimeout 普通Executor空闲超时时间。单位为秒。 60 spark.dynamicAllocation.cachedExecutorIdleTimeout 含有cached blocks的Executor空闲超时时间。 JDBCServer2x:2147483647s IndexServer2x:2147483647s SparkResource2x:120
  • 普通Shuffle配置 表9 参数说明 参数 描述 默认值 spark.shuffle.spill 如果设为“true”,通过将数据溢出至磁盘来限制reduce任务期间内存的使用量。 true spark.shuffle.spill.compress 是否压缩shuffle期间溢出的数据。使用spark.io.compression.codec指定的算法进行数据压缩。 true spark.shuffle.file.buffer 每个shuffle文件输出流的内存缓冲区大小(单位:KB)。这些缓冲区可以减少创建中间shuffle文件流过程中产生的磁盘寻道和系统调用次数。也可以通过配置项spark.shuffle.file.buffer.kb设置。 32KB spark.shuffle.compress 是否压缩map任务输出文件。建议压缩。使用spark.io.compression.codec进行压缩。 true spark.reducer.maxSizeInFlight 从每个reduce任务同时fetch的map任务输出最大值(单位:MB)。由于每个输出要求创建一个缓冲区进行接收,这代表了每个reduce任务固定的内存开销,所以除非拥有大量内存,否则保持低值。也可以通过配置项spark.reducer.maxMbInFlight设置。 48MB
  • Driver配置 Spark Driver可以理解为Spark提交应用的客户端,所有的代码解析工作都在这个进程中完成,因此该进程的参数尤其重要。下面将以如下顺序介绍Spark中进程的参数设置: JavaOptions:Java命令中“-D”后面的参数,可以由System.getProperty获取。 ClassPath:包括Java类和Native的Lib加载路径。 Java Memory and Cores:Java进程的内存和CPU使用量。 Spark Configuration:Spark内部参数,与Java进程无关。 表10 参数说明 参数 描述 默认值 spark.driver.extraJavaOptions 传递至driver(驱动程序)的一系列额外JVM选项。例如,GC设置或其他日志记录。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 参考快速配置Spark参数 spark.driver.extraClassPath 附加至driver的classpath的额外classpath条目。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 参考快速配置Spark参数 spark.driver.userClassPathFirst (试验性)当在驱动程序中加载类时,是否授权用户添加的jar优先于Spark自身的jar。这种特性可用于减缓Spark依赖和用户依赖之间的冲突。目前该特性仍处于试验阶段,仅用于Cluster模式中。 false spark.driver.extraLibraryPath 设置一个特殊的library path在启动驱动程序JVM时使用。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 JDBCServer2x: ${SPARK_INSTALL_HOME}/spark/native SparkResource2x: ${DATA_NODE_INSTALL_HOME}/hadoop/lib/native spark.driver.cores 驱动程序进程使用的核数。仅适用于Cluster模式。 1 spark.driver.memory 驱动程序进程使用的内存数量,即SparkContext初始化的进程(例如:512M, 2G)。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 4G spark.driver.maxResultSize 对每个Spark action操作(例如“collect”)的所有分区序列化结果的总量限制,至少1M,设置成0表示不限制。如果总量超过该限制,工作任务会中止。限制值设置过高可能会引起驱动程序的内存不足错误(取决于spark.driver.memory和JVM的对象内存开销)。设置合理的限制可以避免驱动程序出现内存不足的错误。 1G spark.driver.host Driver监测的主机名或IP地址,用于Driver与Executor进行通信。 (local hostname) spark.driver.port Driver监测的端口,用于Driver与Executor进行通信。 (random)
  • Spark Streaming Spark Streaming是在Spark批处理平台提供的流式数据的处理能力,以“mini-batch”的方式处理从外部输入的数据。 在Spark客户端的“spark-defaults.conf”文件中配置如下参数。 表6 参数说明 参数 描述 默认值 spark.streaming.receiver.writeAheadLog.enable 启用预写日志(WAL)功能。所有通过Receiver接收的输入数据将被保存至预写日志,预写日志可以保证Driver程序出错后数据可以恢复。 false spark.streaming.unpersist 由Spark Streaming产生和保存的RDDs自动从Spark的内存中强制移除。Spark Streaming接收的原始输入数据也将自动清除。设置为false时原始输入数据和存留的RDDs不会自动清除,因此在streaming应用外部依然可以访问,但是这会占用更多的Spark内存。 true
  • Python Spark Python Spark是Spark除了Scala、Java两种API之外的第三种编程语言。不同于Java和Scala都是在JVM平台上运行,Python Spark不仅会有JVM进程,还会有自身的Python进程。以下配置项只适用于Python Spark场景,而其他配置项也同样可以在Python Spark中生效。 表4 参数说明 参数 描述 默认值 spark.python.profile 在Python worker中开启profiling。通过sc.show_profiles()展示分析结果。或者在driver退出前展示分析结果。可以通过sc.dump_profiles(path) 将结果转储到磁盘中。如果一些分析结果已经手动展示,那么在Driver退出前,它们将不会再自动展示。 默认使用pyspark.profiler.BasicProfiler,可以在初始化SparkContext时传入指定的profiler来覆盖默认的profiler。 false spark.python.worker.memory 聚合过程中每个python worker进程所能使用的内存大小,其值格式同指定JVM内存一致,如512m,2g。如果进程在聚集期间所用的内存超过了该值,数据将会被写入磁盘。 512m spark.python.worker.reuse 是否重用python worker。如是,它将使用固定数量的Python workers,那么下一批提交的task将重用这些Python workers,而不是为每个task重新fork一个Python进程。 该功能在大型广播下非常有用,因为此时对下一批提交的task不需要将数据从JVM再一次传输至Python worker。 true
  • Spark Streaming Kafka Receiver是Spark Streaming一个重要的组成部分,它负责接收外部数据,并将数据封装为Block,提供给Streaming消费。最常见的数据源是Kafka,Spark Streaming对Kafka的集成也是最完善的,不仅有可靠性的保障,而且也支持从Kafka直接作为RDD输入。 表7 参数说明 参数 描述 默认值 spark.streaming.kafka.maxRatePerPartition 使用Kafka direct stream API时,从每个Kafka分区读取数据的最大速率(每秒记录数量)。 - spark.streaming.blockInterval 在被存入Spark之前Spark Streaming Receiver接收数据累积成数据块的间隔(毫秒)。推荐最小值为50毫秒。 200ms spark.streaming.receiver.maxRate 每个Receiver接收数据的最大速率(每秒记录数量)。配置设置为0或者负值将不会对速率设限。 - spark.streaming.receiver.writeAheadLog.enable 是否使用ReliableKafkaReceiver。该Receiver支持流式数据不丢失。 false
  • Netty/NIO及Hash/Sort配置 Shuffle是大数据处理中最重要的一个性能点,网络是整个Shuffle过程的性能点。目前Spark支持两种Shuffle方式,一种是Hash,另外一种是Sort。网络也有两种方式,Netty和NIO。 表8 参数说明 参数 描述 默认值 spark.shuffle.manager 处理数据的方式。有两种实现方式可用:sort和hash。sort shuffle对内存的使用率更高,是Spark 1.2及后续版本的默认选项。Spark2.x及后续版本不支持hash。 SORT spark.shuffle.consolidateFiles (仅hash方式)如果要合并在shuffle过程中创建的中间文件,需要将该值设置为“true”。文件创建的少可以提高文件系统处理性能,降低风险。使用ext4或者xfs文件系统时,建议设置为“true”。由于文件系统限制,在ext3上该设置可能会降低8核以上机器的处理性能。 false spark.shuffle.sort.bypassMergeThreshold 该参数只适用于spark.shuffle.manager设置为sort时。在不做map端聚合并且reduce任务的partition数小于或等于该值时,避免对数据进行归并排序,防止系统处理不必要的排序引起性能下降。 200 spark.shuffle.io.maxRetries (仅Netty方式)如果设为非零值,由于IO相关的异常导致的fetch失败会自动重试。该重试逻辑有助于大型shuffle在发生长GC暂停或者网络闪断时保持稳定。 12 spark.shuffle.io.numConnectionsPerPeer (仅Netty方式)为了减少大型集群的连接创建,主机间的连接会被重新使用。对于拥有较多硬盘和少数主机的集群,此操作可能会导致并发性不足以占用所有磁盘,所以用户可以考虑增加此值。 1 spark.shuffle.io.preferDirectBufs (仅Netty方式)使用off-heap缓冲区减少shuffle和高速缓存块转移期间的垃圾回收。对于off-heap内存被严格限制的环境,用户可以将其关闭以强制所有来自Netty的申请使用堆内存。 true spark.shuffle.io.retryWait (仅Netty方式)等待fetch重试期间的时间(秒)。重试引起的最大延迟为maxRetries * retryWait,默认是15秒。 5
  • Spark长时间任务安全认证配置 安全模式下,使用Spark CLI(如spark shell、spark sql、spark submit)时,如果使用kinit命令进行安全认证,当执行长时间运行任务时,会因为认证过期导致任务失败。 在客户端的“spark-defaults.conf”配置文件中设置如下参数,配置完成后,重新执行Spark CLI即可。 当参数值为“true”时,需要保证“spark-defaults.conf”和“hive-site.xml”中的Keytab和principal的值相同。 表3 参数说明 参数名称 含义 默认值 spark.kerberos.principal 具有Spark操作权限的principal。请联系 MRS 集群管理员获取对应principal。 - spark.kerberos.keytab 具有Spark操作权限的Keytab文件名称和文件路径。请联系MRS集群管理员获取对应Keytab文件。 - spark.security.bigdata.loginOnce Principal用户是否只登录一次。true为单次登录;false为多次登录。 单次登录与多次登录的区别在于:Spark社区使用多次Kerberos用户登录多次的方案,但容易出现TGT过期或者Token过期异常导致应用无法长时间运行。MRS修改了Kerberos登录方式,只允许用户登录一次,可以有效的解决过期问题。限制在于,Hive相关的principal与keytab的配置项必须与Spark配置相同。 说明: 当参数值为true时,需要保证“spark-defaults.conf”和“hive-site.xml”中的Keytab和principal的值相同。 true
  • 配置Stage失败重试次数 Spark任务在遇到FetchFailedException时会触发Stage重试。为了防止Stage无限重试,对Stage重试次数进行限制。重试次数可以根据实际需要进行调整。 在Spark客户端的“spark-defaults.conf”文件中配置如下参数。 表1 参数说明 参数 参数说明 默认值 spark.stage.maxConsecutiveAttempts Stage失败重试最大次数。 4
  • 配置是否使用笛卡尔积功能 要启动使用笛卡尔积功能,需要在Spark的“spark-defaults.conf”配置文件中进行如下设置。 表2 笛卡尔积参数说明 参数 说明 默认值 spark.sql.crossJoin.enabled 是否允许隐性执行笛卡尔积。 “true”表示允许 “false”表示不允许,此时只允许query中显式包含CROSS JOIN语法。 true JDBC应用在服务端的“spark-defaults.conf”配置文件中设置该参数。 Spark客户端提交的任务在客户端配的“spark-defaults.conf”配置文件中设置该参数。
  • 前提条件 集群中已安装Doris、HDFS、Yarn、Flink和Kafka等服务。 待连接Doris数据库的节点与MRS集群网络互通。 创建具有Doris管理权限的用户。 集群已启用Kerberos认证(安全模式) 在 FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。 使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。 集群未启用Kerberos认证(普通模式) 使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris。 已安装Flink客户端。
  • 相关表参数配置说明 表1 创建Doris Sink表时可选配置参数 参数名称 参数默认值 是否必须配置 参数描述 doris.request.retries 3 否 向Doris发送请求的重试次数。 doris.request.connect.timeout.ms 30000 否 向Doris发送请求的连接超时时间。 doris.request.read.timeout.ms 30000 否 向Doris发送请求的读取超时时间。 sink.max-retries 3 否 Commit失败后的最大重试次数,默认为3次。 sink.enable.batch-mode false 否 是否使用攒批模式写入Doris,开启后写入时机不依赖CheckPoint,通过“sink.buffer-flush.max-rows”、“sink.buffer-flush.max-bytes”或“sink.buffer-flush.interval”参数来控制写入时机。 sink.buffer-flush.max-rows 50000 否 攒批模式下,单个批次最多写入的数据行数。 sink.buffer-flush.max-bytes 10MB 否 攒批模式下,单个批次最多写入的字节数。 sink.buffer-flush.interval 10s 否 攒批模式下,异步刷新缓存的间隔。 表2 Lookup Join场景下创建Flink表时的可选配置参数 参数名称 参数默认值 是否必须配置 参数描述 lookup.cache.max-rows -1 否 Lookup缓存的最大行数,默认值为:-1,表示不开启缓存。 lookup.cache.ttl 10s 否 Lookup缓存的最大时间,默认值为10s。 lookup.max-retries 1 否 Lookup查询失败后的重试次数。 lookup.jdbc.async false 否 是否开启异步Lookup,默认值为:false。 lookup.jdbc.read.batch.size 128 否 启用异步Lookup时,每次查询的最大批次大小。 lookup.jdbc.read.batch.queue-size 256 否 启用异步Lookup时,中间缓冲队列的大小。 lookup.jdbc.read.thread-size 3 否 每个Task中Lookup的JDBC线程数。
  • ClickHouse数据写入HDFS流程 将ClickHouse数据写入HDFS,参考以下流程。例如写入HDFS的/tmp目录下的secure_ck.txt数据文件: 创建HDFS引擎表: CREATE TABLE hdfs_engine_table (name String, value UInt32) ENGINE=HDFS('hdfs://{namenode_ip}:{dfs.namenode.rpc.port}/tmp/secure_ck.txt', 'TSV') 写入HDFS数据文件: INSERT INTO hdfs_engine_table VALUES ('one', 1), ('two', 2), ('three', 3) 查询HDFS数据文件: SELECT * FROM hdfs_engine_table LIMIT 2 结果如下: ┌─name─┬─value─┐ │ one │ 1 │ │ two │ 2 │ └────┴─── ─┘ ClickHouse通过HDFS引擎表写入数据到HDFS时,如果HDFS上数据文件不存在,会生成对应的数据文件。 ClickHouse不支持删除修改和追加写HDFS引擎表数据,只能一次性写入数据。 ClickHouse删除HDFS引擎表以后对HDFS上的数据文件没有影响。
  • 操作步骤 要使用CBO优化,可以按照以下步骤进行优化。 需要先执行特定的SQL语句来收集所需的表和列的统计信息。 SQL命令如下(根据具体情况选择需要执行的SQL命令): 生成表级别统计信息(扫表): ANALYZE TABLE src COMPUTE STATIS TICS 生成sizeInBytes和rowCount。 使用ANALYZE语句收集统计信息时,无法计算非HDFS数据源的表的文件大小。 生成表级别统计信息(不扫表): ANALYZE TABLE src COMPUTE STATISTICS NOSCAN 只生成sizeInBytes,如果原来已经生成过sizeInBytes和rowCount,而本次生成的sizeInBytes和原来的大小一样,则保留rowCount(如果存在),否则清除rowCount。 生成列级别统计信息: ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS a, b, c 生成列统计信息,为保证一致性,会同步更新表统计信息。目前不支持复杂数据类型(如Seq, Map等)和HiveStringType的统计信息生成。 显示统计信息: DESC FORMATTED src 在Statistics中会显示“xxx bytes, xxx rows”分别表示表级别的统计信息。也可以通过如下命令显示列统计信息: DESC FORMATTED src a 使用限制:当前统计信息收集不支持针对分区表的分区级别的统计信息。 在Spark客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中进行表1设置。 表1 参数介绍 参数 描述 取值示例 spark.sql.cbo.enabled CBO总开关,默认值为false。 true表示打开, false表示关闭。 要使用该功能,需确保相关表和列的统计信息已经生成。 false spark.sql.cbo.joinReorder.enabled 使用CBO来自动调整连续的inner join的顺序。 true:表示打开 false:表示关闭 要使用该功能,需确保相关表和列的统计信息已经生成,且CBO总开关打开。 false spark.sql.cbo.joinReorder.dp.threshold 使用CBO来自动调整连续inner join的表的个数阈值。 如果超出该阈值,则不会调整join顺序。 12
  • 问题 在History Server页面中访问某个Spark应用的页面时,发现访问时出错。 查看相应的HistoryServer日志后,发现有“FileNotFound”异常,相关日志如下所示: 2016-11-22 23:58:03,694 | WARN | [qtp55429210-232] | /history/application_1479662594976_0001/stages/stage/ | org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:628) java.io.FileNotFoundException: ${BIGDATA_HOME}/tmp/spark/jobHistoryTemp/blockmgr-5f1f6aca-2303-4290-9845-88fa94d78480/09/temp_shuffle_11f82aaf-e226-46dc-b1f0-002751557694 (No such file or directory)
  • ClickHouse用户权限说明 ClickHouse用户权限管理实现了对集群中各个ClickHouse实例上用户、角色、权限的统一管理。通过Manager UI的权限管理模块进行创建用户、创建角色、绑定ClickHouse访问权限配置等操作,通过用户绑定角色的方式,实现用户权限控制。 给角色授予某张表的权限之后,删除表时不会清除已经授予角色的该表的权限,重新创建同名表后角色会继承原有该表的权限。如果需要,可以手动清除已经授予角色的表的权限。 例如:创建表table_test,给角色ck_role授予表table_test的读写权限,当删除表table_test后,重新创建同名表table_test,那么ck_role仍然具有表table_test的读写权限。 管理资源:Clickhouse权限管理支持的资源如表1所示。 资源权限:ClickHouse支持的资源权限如表2所示。 表1 ClickHouse支持的权限管理对象 资源列表 是否集成 备注 数据库 是(一级) - 表 是(二级) - 视图 是(二级) 与表一致 表2 资源权限列表 资源对象 可选权限 备注 数据库(DATABASE) CREATE CREATE DATABASE/TABLE/VIEW/DICTIONARY权限 表/视图(TABLE/VIEW) SELECT/INSERT - 父主题: ClickHouse用户权限管理
  • 配置场景 当Spark Streaming应用与Kafka对接,Spark Streaming应用异常终止并从checkpoint恢复重启后,对于进入Kafka数据的任务,系统默认优先处理应用终止前(A段时间)未完成的任务和应用终止到重启完成这段时间内(B段时间)进入Kafka数据生成的任务,最后再处理应用重启完成后(C段时间)进入Kafka数据生成的任务。并且对于B段时间进入Kafka的数据,Spark将按照终止时间(batch时间)生成相应个数的任务,其中第一个任务读取全部数据,其余任务可能不读取数据,造成任务处理压力不均匀。 如果A段时间的任务和B段时间任务处理得较慢,则会影响C段时间任务的处理。针对上述场景,Spark提供Kafka后进先出功能。 图1 Spark Streaming应用重启时间轴 开启此功能后,Spark将优先调度C段时间内的任务,如果存在多个C段任务,则按照任务产生的先后顺序调度执行,再执行A段时间和B段时间的任务。另外,对于B段时间进入Kafka的数据,Spark除了按照终止时间生成相应任务,还将这个期间进入Kafka的所有数据均匀分配到各个任务,避免任务处理压力不均匀。 约束条件: 目前该功能只适用于Spark Streaming中的Direct方式,且执行结果与上一个batch时间处理结果没有依赖关系(即无state操作,如updatestatebykey)。对多条数据输入流,需要相对独立无依赖的状态,否则可能导致数据切分后结果发生变化。 Kafka后进先出功能的开启要求应用只能对接Kafka输入源。 如果提交应用的同时开启Kafka后进先出和流控功能,对于B段时间进入Kafka的数据,将不启动流控功能,以确保读取这些数据的任务调度优先级最低。应用重新启动后C段时间的任务启用流控功能。
  • 配置描述 在Spark Driver端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中进行设置。 表1 参数说明 参数 参数说明 取值示例 spark.streaming.kafka.direct.lifo 配置是否开启Kafka后进先出功能。 false spark.streaming.kafka010.inputstream.class 获取解耦在FusionInsight侧的类。 org.apache.spark.streaming.kafka010.xxDirectKafkaInputDStream
  • 配置场景 Spark优化SQL的执行,一般的优化规则都是启发式的优化规则,启发式的优化规则,仅仅根据逻辑计划本身的特点给出优化,没有考虑数据本身的特点,也就是未考虑算子本身的执行代价。Spark在2.2中引入了基于代价的优化规则(CBO)。CBO会收集表和列的统计信息,结合算子的输入数据集来估计每个算子的输出条数以及字节大小,这些就是执行一个算子的代价。 CBO会调整执行计划,来最小化端到端的查询时间,中心思路2点: 尽早过滤不相关的数据。 最小化每个算子的代价。 CBO优化过程分为2步: 收集统计信息。 根据输入的数据集估算特定算子的输出数据集。 表级别统计信息包括:记录条数;表数据文件的总大小。 列级别统计信息包括:唯一值个数;最大值;最小值;空值个数;平均长度;最大长度;直方图。 有了统计信息后,就可以估计算子的执行代价了。常见的算子包括过滤条件Filter算子和Join算子。 直方图为列统计值的一种,可以直观的描述列数据的分布情况,将列的数据从最小值到最大值划分为事先指定数量的槽位(bin),计算各个槽位的上下界的值,使得全部数据都确定槽位后,所有槽位中的数据数量相同(等高直方图)。有了数据的详细分布后,各个算子的代价估计能更加准确,优化效果更好。 该特性可以通过下面的配置项开启: spark.sql.statistics.histogram.enabled:指定是否开启直方图功能,默认为false。
  • 快速配置常用参数 其他参数在安装集群时已进行了适配,以下参数需要根据使用场景进行调整。以下参数除特别指出外,一般在Spark2x客户端的“spark-defaults.conf”文件中配置。 表1 快速配置常用参数 配置项 说明 取值示例 spark.sql.parquet.compression.codec 对于非分区parquet表,设置其存储文件的压缩格式。 在JDBCServer服务端的“spark-defaults.conf”配置文件中进行设置。 snappy spark.dynamicAllocation.enabled 是否使用动态资源调度,用于根据规模调整注册于该应用的executor的数量。目前仅在YARN模式下有效。 JDBCServer默认值为true,client默认值为false。 false spark.executor.memory 每个Executor进程使用的内存数量,与JVM内存设置字符串的格式相同(例如:512m,2g)。 4G spark.sql.autoBroadcastJoinThreshold 当进行join操作时,配置广播的最大值。 当SQL语句中涉及的表中相应字段的大小小于该值时,进行广播。 配置为-1时,将不进行广播。 10485760 spark.yarn.queue JDBCServer服务所在的Yarn队列。 在JDBCServer服务端的“spark-defaults.conf”配置文件中进行设置。 default spark.driver.memory 大集群下推荐配置32~64g驱动程序进程使用的内存数量,即SparkContext初始化的进程(例如:512m, 2g)。 4G spark.yarn.security.credentials.hbase.enabled 是否打开获取HBase token的功能。如果需要Spark-on-HBase功能,并且配置了安全集群,参数值设置为“true”。否则设置为“false”。 false spark.serializer 用于串行化将通过网络发送或需要缓存的对象的类以序列化形式展现。 Java序列化的默认值适用于任何Serializable Java对象,但运行速度相当慢,所以建议使用org.apache.spark.serializer.KryoSerializer并配置Kryo序列化。可以是org.apache.spark.serializer.Serializer的任何子类。 org.apache.spark.serializer.JavaSerializer spark.executor.cores 每个执行者使用的内核个数。 在独立模式和Mesos粗粒度模式下设置此参数。当有足够多的内核时,允许应用程序在同样的worker上执行多个执行程序;否则,在每个worker上,每个应用程序只能运行一个执行程序。 1 spark.shuffle.service.enabled NodeManager中一个长期运行的辅助服务,用于提升Shuffle计算性能。 fasle spark.sql.adaptive.enabled 是否开启自适应执行框架。 false spark.executor.memoryOverhead 每个执行器要分配的堆内存量(单位为兆字节)。 这是占用虚拟机开销的内存,类似于内部字符串,其他内置开销等等。会随着执行器大小(通常为6-10%)而增长。 1GB spark.streaming.kafka.direct.lifo 配置是否开启Kafka后进先出功能。 false
  • 回答 在Spark中有个定期线程,通过连接RM监测AM的状态。由于连接RM超时,就会报上面的错误,且一直重试。RM中对重试次数有限制,默认是30次,每次间隔默认为30秒左右,每次重试时都会报上面的错误。超过次数后,driver才会退出。 RM中关于重试相关的配置项如表1所示。 表1 参数说明 参数 描述 取值示例 yarn.resourcemanager.connect.max-wait.ms 连接RM的等待时间最大值。 900000 yarn.resourcemanager.connect.retry-interval.ms 重试连接RM的时间频率。 30000 重试次数=yarn.resourcemanager.connect.max-wait.ms/yarn.resourcemanager.connect.retry-interval.ms,即重试次数=连接RM的等待时间最大值/重试连接RM的时间频率。 在Spark客户端节点中,通过修改“conf/yarn-site.xml”文件,添加并配置“yarn.resourcemanager.connect.max-wait.ms”和“yarn.resourcemanager.connect.retry-interval.ms”,这样可以更改重试次数,Spark应用可以提早退出。
  • 问题 Spark应用执行过程中,当driver连接RM失败时,会报下面的错误,且较长时间不退出。 16/04/23 15:31:44 INFO RetryInvocationHandler: Exception while invoking getApplicationReport of class ApplicationClientProtocolPBClientImpl over 37 after 1 fail over attempts. Trying to fail over after sleeping for 44160ms. java.net.ConnectException: Call From vm1/192.168.39.30 to vm1:8032 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
  • SparkSQL权限 类似于Hive,SparkSQL也是建立在Hadoop上的 数据仓库 框架,提供类似SQL的结构化数据。 MRS提供用户、用户组和角色,集群中的各类权限需要先授予角色,然后将用户或者用户组与角色绑定。用户只有绑定角色或者加入绑定角色的用户组,才能获得权限。 如果当前组件使用了Ranger进行权限控制,须基于Ranger配置相关策略进行权限管理,具体操作可参考添加Spark2x的Ranger访问权限策略。 Spark2x开启或关闭Ranger鉴权后,需要重启Spark2x服务,并重新下载客户端,或刷新客户端配置文件spark/conf/spark-defaults.conf: 开启Ranger鉴权:spark.ranger.plugin.authorization.enable=true 关闭Ranger鉴权:spark.ranger.plugin.authorization.enable=false
  • SparkSQL使用场景及对应权限 用户通过SparkSQL服务创建数据库需要加入Hive组,不需要角色授权。用户在Hive和HDFS中对自己创建的数据库或表拥有完整权限,可直接创建表、查询数据、删除数据、插入数据、更新数据以及授权他人访问表与对应HDFS目录与文件。 如果用户访问别人创建的表或数据库,需要授予权限。所以根据SparkSQL使用场景的不同,用户需要的权限可能也不相同。 表1 SparkSQL使用场景 主要场景 用户需要的权限 使用SparkSQL表、列或数据库 使用其他用户创建的表、列或数据库,不同的场景需要不同的权限,例如: 创建表,需要“创建”。 查询数据,需要“查询”。 插入数据,需要“插入”。 关联使用其他组件 部分场景除了SparkSQL权限,还可能需要组件的权限,例如: 使用Spark on HBase,在SparkSQL中查询HBase表数据,需要设置HBase权限。 在一些特殊SparkSQL使用场景下,需要单独设置其他权限。 表2 SparkSQL授权注意事项 场景 用户需要的权限 创建SparkSQL数据库、表、外表,或者为已经创建的表或外表添加分区,且Hive用户指定数据文件保存在“/user/hive/warehouse”以外的HDFS目录。 需要此目录已经存在,客户端用户是目录的属主,且用户对目录拥有“读”、“写”和“执行”权限。同时用户对此目录上层的每一级目录都拥有“读”和“执行”权限。 在Spark2x中,在创建HBase的外表时,需要拥有Hive端database的“创建”权限。而在Spark 1.5中,在创建HBase的外表时,需要拥有Hive端database的“创建”权限,也需要拥有HBase端Namespace的“创建”权限。 用户使用load将指定目录下所有文件或者指定文件,导入数据到表中。 数据源为Linux本地磁盘,指定目录时需要此目录已经存在,系统用户“omm”对此目录以及此目录上层的每一级目录拥有“r”和“x”的权限。指定文件时需要此文件已经存在,“omm”对此文件拥有“r”的权限,同时对此文件上层的每一级目录拥有“r”和“x”的权限。 数据源为HDFS,指定目录时需要此目录已经存在,SparkSQL用户是目录属主,且用户对此目录及其子目录拥有“读”、“写”和“执行”权限,并且其上层的每一级目录拥有“读”和“执行”权限。指定文件时需要此文件已经存在,SparkSQL用户是文件属主,且用户对文件拥有“读”、“写”和“执行”权限,同时对此文件上层的每一级目录拥有“读”和“执行”权限。 创建函数、删除函数或者修改任意数据库。 需要授予“管理”权限。 操作Hive中所有的数据库和表。 需加入到supergroup用户组,并且授予“管理”权限。 对部分datasource表赋予insert权限后,执行insert|analyze操作前需要单独对hdfs上的表目录赋予写权限。 当前对spark datasource表赋予Insert权限时,如果表格式为:text|csv|json|parquet|orc,则不会修改表目录的权限。因此,对以上几种类型的datasource表赋予Insert权限后,还需要单独对hdfs上的表目录赋予写权限,用户才能成功对表执行insert|analyze操作。
  • SparkSQL权限模型 用户使用SparkSQL服务进行SQL操作,必须对SparkSQL数据库和表(含外表和视图)拥有相应的权限。完整的SparkSQL权限模型由元数据权限与HDFS文件权限组成。使用数据库或表时所需要的各种权限都是SparkSQL权限模型中的一种。 元数据权限 元数据权限即在元数据层上进行权限控制,与传统关系型数据库类似,SparkSQL数据库包含“创建”和“查询”权限,表和列包含“查询”、“插入”、“UPDATE”和“删除”权限。SparkSQL中还包含拥有者权限“OWNERSHIP”和Spark管理员权限“管理”。 数据文件权限,即HDFS文件权限 SparkSQL的数据库、表对应的文件保存在HDFS中。默认创建的数据库或表保存在HDFS目录“/user/hive/warehouse”。系统自动以数据库名称和数据库中表的名称创建子目录。访问数据库或者表,需要在HDFS中拥有对应文件的权限,包含“读”、“写”和“执行”权限。 用户对SparkSQL数据库或表执行不同操作时,需要关联不同的元数据权限与HDFS文件权限。例如,对SparkSQL数据表执行查询操作,需要关联元数据权限“查询”,以及HDFS文件权限“读”和“执行”。 使用Manager界面图形化的角色管理功能来管理SparkSQL数据库和表的权限,只需要设置元数据权限,系统会自动关联HDFS文件权限,减少界面操作,提高效率。
  • 问题 Spark执行应用时上报如下类似错误并导致应用结束。 2016-04-20 10:42:00,557 | ERROR | [shuffle-server-2] | Connection to 10-91-8-208/10.18.0.115:57959 has been quiet for 180000 ms while there are outstanding requests. Assuming connection is dead; please adju st spark.network.timeout if this is wrong. | org.apache.spark.network.server.TransportChannelHandler.userEventTriggered(TransportChannelHandler.java:128) 2016-04-20 10:42:00,558 | ERROR | [shuffle-server-2] | Still have 1 requests outstanding when connection from 10-91-8-208/10.18.0.115:57959 is closed | org.apache.spark.network.client.TransportResponseHandl er.channelUnregistered(TransportResponseHandler.java:102) 2016-04-20 10:42:00,562 | WARN | [yarn-scheduler-ask-am-thread-pool-160] | Error sending message [message = DoShuffleClean(application_1459995017785_0108,319)] in 1 attempts | org.apache.spark.Logging$clas s.logWarning(Logging.scala:92) java.io.IOException: Connection from 10-91-8-208/10.18.0.115:57959 closed at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104) at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at java.lang.Thread.run(Thread.java:745) 2016-04-20 10:42:00,573 | INFO | [dispatcher-event-loop-14] | Starting task 177.0 in stage 1492.0 (TID 1996351, linux-254, PRO CES S_LOCAL, 2106 bytes) | org.apache.spark.Logging$class.logInfo(Logging.scala: 59) 2016-04-20 10:42:00,574 | INFO | [task-result-getter-0] | Finished task 85.0 in stage 1492.0 (TID 1996259) in 191336 ms on linux-254 (106/3000) | org.apache.spark.Logging$class.logInfo(Logging.scala:59) 2016-04-20 10:42:00,811 | ERROR | [Yarn application state monitor] | Yarn application has already exited with state FINISHED! | org.apache.spark.Logging$class.logError(Logging.scala:75)
  • 操作场景 将datasource表的分区消息存储到Metastore中,并在Metastore中对分区消息进行处理。 优化datasource表,支持对表中分区执行增加、删除和修改等语法,从而增加与Hive的兼容性。 支持在查询语句中,把分区裁剪并下压到Metastore上,从而过滤掉不匹配的分区。 示例如下: select count(*) from table where partCol=1; //partCol列为分区列 此时,在物理计划中执行TableScan操作时,只处理分区(partCol=1)对应的数据。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全