华为云用户手册

  • Channel Selector Channel Selector可以允许一个Source对接多个Channel,通过选择不同的Selector类型来将Source的数据进行分流或者复制,目前Flume提供的Channel Selector有两种:Replicating和Multiplexing。 Replicating:表示Source的数据同步发送给所有Channel。 Multiplexing:表示根据Event中的Header的指定字段的值来进行判断,从而选择相应的Channel进行发送,从而起到根据业务类型进行分流的目的。 Replicating配置样例: client.sources = kafkasourceclient.channels = channel1 channel2client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSourceclient.sources.kafkasource.kafka.topics = topic1,topic2client.sources.kafkasource.kafka.consumer.group.id = flumeclient.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXTclient.sources.kafkasource.batchDurationMillis = 1000client.sources.kafkasource.batchSize = 800client.sources.kafkasource.channels = channel1 channel2client.sources.kafkasource.selector.type = replicatingclient.sources.kafkasource.selector.optional = channel2 表1 Replicating配置样例参数说明 选项名称 默认值 描述 Selector.type replicating Selector类型,应配置为replicating Selector.optional - 可选Channel,可以配置为列表 Multiplexing配置样例: client.sources = kafkasourceclient.channels = channel1 channel2client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSourceclient.sources.kafkasource.kafka.topics = topic1,topic2client.sources.kafkasource.kafka.consumer.group.id = flumeclient.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXTclient.sources.kafkasource.batchDurationMillis = 1000client.sources.kafkasource.batchSize = 800client.sources.kafkasource.channels = channel1 channel2client.sources.kafkasource.selector.type = multiplexingclient.sources.kafkasource.selector.header = myheaderclient.sources.kafkasource.selector.mapping.topic1 = channel1client.sources.kafkasource.selector.mapping.topic2 = channel2client.sources.kafkasource.selector.default = channel1 表2 Multiplexing配置样例参数说明 选项名称 默认值 描述 Selector.type replicating Selector类型,应配置为multiplexing Selector.header Flume.selector.header - Selector.default - - Selector.mapping.* - - Multiplexing类型的Selector的样例中,选择Event中Header名称为topic的字段来进行判断,当Header中topic字段的值为topic1时,向channel1发送该Event,当Header中topic字段的值为topic2时,向channel2发送该Event。 这种Selector需要借助Source中Event的特定Header来进行Channel的选择,需要根据业务场景选择合理的Header来进行数据分流。
  • 参数调优 修改服务配置参数,请参考修改集群服务配置参数。调优参数请参考表1。 表1 调优参数 配置参数 缺省值 调优场景 num.recovery.threads.per.data.dir 10 在Kafka启动过程中,数据量较大情况下,可调大此参数,可以提升启动速度。 background.threads 10 Broker后台任务处理的线程数目。数据量较大的情况下,可适当调大此参数,以提升Broker处理能力。 num.replica.fetchers 1 副本向Leader请求同步数据的线程数,增大这个数值会增加副本的I/O并发度。 num.io.threads 8 Broker用来处理磁盘I/O的线程数目,这个线程数目建议至少等于硬盘的个数。 KAFKA_HEAP_OPTS -Xmx6G -Xms6G Kafka JVM堆内存设置。当Broker上数据量较大时,应适当调整堆内存大小。
  • 注意事项 Join数据倾斜问题 执行任务的时候,任务进度长时间维持在99%,这种现象叫数据倾斜。 数据倾斜是经常存在的,因为有少量的Reduce任务分配到的数据量和其他Reduce差异过大,导致大部分Reduce都已完成任务,但少量Reduce任务还没完成的情况。 解决数据倾斜的问题,可通过设置“set hive.optimize.skewjoin=true”并调整hive.skewjoin.key的大小。hive.skewjoin.key是指Reduce端接收到多少个key即认为数据是倾斜的,并自动分发到多个Reduce。
  • Map Join Hive的Map Join适用于能够在内存中存放下的小表(指表大小小于25MB),通过“hive.mapjoin.smalltable.filesize”定义小表的大小,默认为25MB。 Map Join的方法有两种: 使用/*+ MAPJOIN(join_table) */。 执行语句前设置如下参数,当前版本中该值默认为true。 set hive.auto.convert.join=true; 使用Map Join时没有Reduce任务,而是在Map任务前起了一个MapReduce Local Task,这个Task通过TableScan读取小表内容到本机,在本机以HashTable的形式保存并写入硬盘上传到DFS,并在distributed cache中保存,在Map Task中从本地磁盘或者distributed cache中读取小表内容直接与大表join得到结果并输出。 使用Map Join时需要注意小表不能过大,如果小表将内存基本用尽,会使整个系统性能下降甚至出现内存溢出的异常。
  • Sort Merge Bucket Map Join 使用Sort Merge Bucket Map Join必须满足以下2个条件: join的两张表都很大,内存中无法存放。 两张表都按照join key进行分桶(clustered by (column))和排序(sorted by(column)),且两张表的分桶数正好是倍数关系。 通过如下设置,启用Sort Merge Bucket Map Join: set hive.optimize.bucketmapjoin=true; set hive.optimize.bucketmapjoin.sortedmerge=true; 这种Map Join也没有Reduce任务,是在Map任务前启动MapReduce Local Task,将小表内容按桶读取到本地,在本机保存多个桶的HashTable备份并写入HDFS,并保存在Distributed Cache中,在Map Task中从本地磁盘或者Distributed Cache中按桶一个一个读取小表内容,然后与大表做匹配直接得到结果并输出。
  • 注意事项 Group By数据倾斜 Group By也同样存在数据倾斜的问题,设置hive.groupby.skewindata为true,生成的查询计划会有两个MapReduce Job,第一个Job的Map输出结果会随机的分布到Reduce中,每个Reduce做聚合操作,并输出结果,这样的处理会使相同的Group By Key可能被分发到不同的Reduce中,从而达到负载均衡,第二个Job再根据预处理的结果按照Group By Key分发到Reduce中完成最终的聚合操作。 Count Distinct聚合问题 当使用聚合函数count distinct完成去重计数时,处理值为空的情况会使Reduce产生很严重的数据倾斜,可以将空值单独处理,如果是计算count distinct,可以通过where子句将该值排除掉,并在最后的count distinct结果中加1。如果还有其他计算,可以先将值为空的记录单独处理,再和其他计算结果合并。
  • 操作场景 此功能适用于Hive组件。 开启此功能后,在执行写目录:insert overwrite directory “/path1/path2/path3” ...时,其中“/path1/path2”目录权限为700且属主为当前用户,“path3”目录不存在,会自动创建“path3”目录,并写数据成功。 上述功能,在Hive参数“hive.server2.enable.doAs”为“true”时已经支持,本次增加当“hive.server2.enable.doAs”为“false”时的功能支持。 本功能参数调整与配置Hive目录旧数据自动移除至回收站添加的自定义参数相同。
  • Pipeline 适用于 MRS 3.x及之后版本。 表13 Pipeline参数说明 参数 描述 默认值 是否必选 nettyconnector.registerserver.topic.storage 设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径。建议用户使用ZooKeeper进行存储。 /flink/nettyconnector 否,当使用pipeline特性为必选 nettyconnector.sinkserver.port.range 设置NettySink的端口范围。 28444-28843 否,当使用pipeline特性为必选 nettyconnector.ssl.enabled 设置NettySink与NettySource之间通信是否配置SSL加密。其中加密密钥以及加密协议等请参见SSL。 false 否,当使用pipeline特性为必选 nettyconnector.message.delimiter 用来配置nettysink发送给nettysource消息的分隔符,长度为2-4个字节,不可包含“\n”, “ ”, “#” 。 默认使用“$_” 否,当使用pipeline特性为必选
  • Kerberos-based Security 表9 Kerberos-based Security参数说明 参数 描述 默认值 是否必选 security.kerberos.login.keytab 该参数为客户端参数,keytab路径。 根据实际业务配置 是 security.kerberos.login.principal 该参数为客户端参数,如果keytab和principal都设置,默认会使用keytab认证。 根据实际业务配置 否 security.kerberos.login.contexts 该参数为服务器端参数,flink生成jass文件的contexts。 Client、KafkaClient 是
  • HA 表10 HA参数说明 参数 描述 默认值 是否必选 high-availability HA模式,是启用HA还是非HA模式。当前支持两种模式: none,只运行单个jobManager,jobManager的状态不进行Checkpoint。 ZooKeeper。 非YARN模式下,支持多个jobManager,通过选举产生leader。 YARN模式下只存在一个jobManager。 zookeeper 否 high-availability.zookeeper.quorum ZooKeeper quorum地址。 自动配置 否 high-availability.zookeeper.path.root Flink在ZooKeeper上创建的根目录,存放HA模式必须的元数据。 /flink 否 high-availability.storageDir 存放state backend中JobManager元数据,ZooKeeper只保存实际数据的指针。 hdfs:///flink/recovery 否 high-availability.zookeeper.client.session-timeout ZooKeeper客户端会话超时时间。单位:ms。 60000 否 high-availability.zookeeper.client.connection-timeout ZooKeeper客户端连接超时时间。单位:ms。 15000 否 high-availability.zookeeper.client.retry-wait ZooKeeper客户端重试等待时间。单位:ms。 5000 否 high-availability.zookeeper.client.max-retry-attempts ZooKeeper客户端最大重试次数。 3 否 high-availability.job.delay 当jobManager恢复后重启job的延迟时间。 仅适用于MRS 3.x及之后版本。 默认值和akka.ask.timeout配置值保持一致 否 high-availability.zookeeper.client.acl 设置ZooKeeper节点的ACL (open creator),按照集群的安全模式自动配置。设置ACL选项请参考:https://zookeeper.apache.org/doc/r3.5.1-alpha/zookeeperProgrammers.html#sc_BuiltinACLSchemes。 安全模式:creator 非安全模式:open 是 zookeeper.sasl.disable 基于SASL认证的使能开关,按照集群的安全模式自动配置:。 安全模式:false 非安全模式:true 是 zookeeper.sasl.service-name 如果ZooKeeper服务端配置了不同于“ZooKeeper”的服务名,可以设置此配置项。 如果客户端和服务端的服务名不一致,认证会失败。 zookeeper 是
  • File Systems 表7 File Systems参数说明 参数 描述 默认值 是否必选 fs.overwrite-files 文件输出写操作是否默认覆盖已有文件。 false 否 fs.output.always-create-directory 当文件写入程序的并行度大于1时,输出文件的路径下会创建一个目录,并将不同的结果文件(每个并行写程序任务)放入该目录。 设置为true,那么并行度为1的写入程序也将创建一个目录并将一个结果文件放入其中。 设置为false,则并行度为1的写入程序将直接在输出路径中创建文件,而不再创建目录。 false 否
  • State Backend 表8 State Backend参数说明 参数 描述 默认值 是否必选 state.backend.fs.checkpointdir 当backend为filesystem时的路径,路径必须能够被JobManager访问到,本地路径只支持local模式,集群模式下请使用HDFS路径。 hdfs:///flink/checkpoints 否 state.savepoints.dir Flink用于恢复和更新作业的保存点存储目录。当触发保存点的时候,保存点元数据信息将会保存到该目录中。 hdfs:///flink/savepoint 安全模式下必配 restart-strategy 默认重启策略,用于未指定重启策略的作业: fixed-delay failure-rate none none 否 restart-strategy.fixed-delay.attempts fixed-delay策略重试次数。 作业中开启了checkpoint,默认值为Integer.MAX_VALUE。 作业中未开启checkpoint,默认值为3。 否 restart-strategy.fixed-delay.delay fixed-delay策略重试间隔时间。单位:ms/s/m/h/d。 作业中开启了checkpoint,默认值是10 s。 作业中未开启checkpoint,默认值和配置项akka.ask.timeout的值一致。 否 restart-strategy.failure-rate.max-failures-per-interval 故障率策略下作业失败前给定时间段内的最大重启次数。 1 否 restart-strategy.failure-rate.failure-rate-interval failure-rate策略重试时间。单位:ms/s/m/h/d。 60 s 否 restart-strategy.failure-rate.delay failure-rate策略重试间隔时间。单位:ms/s/m/h/d。 默认值和akka.ask.timeout配置值一样。可参考Distributed Coordination (via Akka)。 否
  • Yarn 表12 Yarn参数说明 参数 描述 默认值 是否必选 yarn.maximum-failed-containers 当TaskManager所属容器出错后,重新申请container次数。默认值为Flink集群启动时TaskManager的数量。 5 否 yarn.application-attempts Application master重启次数,次数是算在一个validity interval的最大次数,validity interval在flink中设置为akka的timeout。重启后AM的地址和端口会变化,client需要手动连接。 2 否 yarn.heartbeat-delay Application Master和YARN Resource Manager心跳的时间间隔。单位:seconds 5 否 yarn.containers.vcores 每个Yarn容器的虚拟核数。 TaskManager的slot数 否 yarn.application-master.port Application Master端口号设置,支持端口范围。 32586-32650 否
  • Network communication (via Netty) 表5 Network communication参数说明 参数 描述 默认值 是否必选 taskmanager.network.netty.num-arenas Netty内存块数。 1 否 taskmanager.network.netty.server.numThreads Netty服务器线程的数量。 1 否 taskmanager.network.netty.client.numThreads Netty客户端线程数。 1 否 taskmanager.network.netty.client.connectTimeoutSec Netty客户端连接超时。单位:s。 120 否 taskmanager.network.netty.sendReceiveBufferSize Netty发送和接收缓冲区大小。 默认为系统缓冲区大小(cat / proc / sys / net / ipv4 / tcp_ [rw] mem),在现代Linux中为4MB。单位:bytes。 4096 否 taskmanager.network.netty.transport Netty传输类型,“nio”或“epoll”。 nio 否
  • SSL 表4 SSL参数说明 参数 描述 默认值 是否必选 备注 security.ssl.internal.enabled 内部通信SSL总开关,按照集群的安全模式自动配置。 安全模式:true 普通模式:false 是 仅MRS 3.x之前版本 security.ssl.internal.keystore Java keystore文件。 - 是 security.ssl.internal.keystore-password keystore文件解密密码。 - 是 security.ssl.internal.key-password keystore文件中服务端key的解密密码。 - 是 security.ssl.internal.truststore truststore文件包含公共CA证书。 - 是 security.ssl.internal.truststore-password truststore文件解密密码。 - 是 security.ssl.rest.enabled 外部通信SSL总开关,按照集群的安全模式自动配置。 安全模式:true 普通模式:false 是 security.ssl.rest.keystore Java keystore文件。 - 是 security.ssl.rest.keystore-password keystore文件解密密码。 - 是 security.ssl.rest.key-password keystore文件中服务端key的解密密码。 - 是 security.ssl.rest.truststore truststore文件包含公共CA证书。 - 是 security.ssl.rest.truststore-password truststore文件解密密码。 - 是 security.ssl.protocol SSL传输的协议版本。 TLSv1.2 是 适用于所有版本 security.ssl.algorithms 支持的SSL标准算法,具体可参考java官网:http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites。 TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 是 security.ssl.enabled 内部通信SSL总开关,按照集群的安装模式自动配置。 安全模式:true 普通模式:false 是 仅MRS 3.x及之后版本 security.ssl.keystore Java keystore文件。 - 是 security.ssl.keystore-password keystore文件解密密码。 - 是 security.ssl.key-password keystore文件中服务端key的解密密码。 - 是 security.ssl.truststore truststore文件包含公共CA证书。 - 是 security.ssl.truststore-password truststore文件解密密码。 - 是
  • JobManager Web Frontend 表6 JobManager Web Frontend参数说明 参数 描述 默认值 是否必选 备注 jobmanager.web.port web端口,支持范围:32261-32325。 32261-32325 否 仅MRS 3.x之前版本 jobmanager.web.allow-access-address web访问白名单,ip以逗号隔开。只有在白名单中的ip才能访问web。 * 是 适用于所有版本 flink.security.enable 用户安装Flink集群时,需要选择“安全模式”或“普通模式”。 当选择“安全模式”,自动配置为“true”。 当选择“普通模式”,自动配置为“false”。 对于已经安装好的Flink集群,用户可以通过查看配置的值来区分当前安装的是安全模式还是普通模式。 自动配置 否 仅MRS 3.x及之后版本 rest.bind-port web端口,支持范围:32261-32325。 32261-32325 否 jobmanager.web.history 显示“flink.security.enable”最近的job数目。 5 否 jobmanager.web.checkpoints.disable 禁用checkpoint统计。 false 否 jobmanager.web.checkpoints.history Checkpoint统计记录数。 10 否 jobmanager.web.backpressure.cleanup-interval 未访问反压记录清理周期。单位:ms。 600000 否 jobmanager.web.backpressure.refresh-interval 反压记录刷新周期。单位:ms。 60000 否 jobmanager.web.backpressure.num-samples 计算反压使用的堆栈跟踪记录数。 100 否 jobmanager.web.backpressure.delay-between-samples 计算反压的采样间隔。单位:ms 50 否 jobmanager.web.ssl.enabled web是否使用SSL加密传输,仅在全局开关security.ssl开启时有。 false 是 jobmanager.web.accesslog.enable web操作日志使能开关,日志会存放在webaccess.log中。 true 是 jobmanager.web.x-frame-options http安全头X-Frame-Options的值,可选范围为:SAMEORIGIN、DENY、ALLOW-FROM uri。 DENY 是 jobmanager.web.cache-directive web页面是否支持缓存。 no-store:所有内容都不会被保存到缓存 是 jobmanager.web.expires-time web页面缓存过期时长。单位:ms。 0 是 jobmanager.web.access-control-allow-origin 网页同源策略,防止跨域攻击。*表示允许任意网站跨域访问该服务端口,可配置为指定网址。 *(非安全集群) 是 jobmanager.web.refresh-interval web网页刷新时间。单位:ms。 3000 是 jobmanager.web.logout-timer 配置无操作情况下自动登出时间间隔。单位:ms。 600000 是 jobmanager.web.403-redirect-url web403页面,访问若遇到403错误,则会重定向到配置的页面。 自动配置 是 jobmanager.web.404-redirect-url web404页面,访问若遇到404错误,则会重定向到配置的页面。 自动配置 是 jobmanager.web.415-redirect-url web415页面,访问若遇到415错误,则会重定向到配置的页面。 自动配置 是 jobmanager.web.500-redirect-url web500页面,访问若遇到500错误,则会重定向到配置的页面。 自动配置 是 rest.await-leader-timeout 客户端等待Leader地址的时间(以ms为单位)。 30000 否 rest.client.max-content-length 客户端处理的最大内容长度(以字节为单位)。 104857600 否 rest.connection-timeout 客户端建立TCP连接的最长时间(以ms为单位)。 15000 否 rest.idleness-timeout 连接保持空闲状态的最长时间(以ms为单位)。 300000 否 rest.retry.delay 客户端在连续重试之间等待的时间(以ms为单位)。 3000 否 rest.retry.max-attempts 如果可重试算子操作失败,客户端将尝试重试的次数。 20 否 rest.server.max-content-length 服务端处理的最大内容长度(以字节为单位)。 104857600 否 rest.server.numThreads 异步处理请求的最大线程数。 4 否 web.timeout web监控超时时间(以ms为单位)。 10000 否
  • 配置详情 本章节介绍如下参数配置: JobManager & TaskManager: JobManager和TaskManager是Flink的主要组件,针对各种安全场景和性能场景,配置项包括通信端口,内存管理,连接重试等。 Blob服务端: JobManager节点上的Blob服务端是用于接收用户在客户端上传的Jar包,或将Jar包发送给TaskManager,传输log文件等,配置项包括端口,SSL,重试次数,并发等。 Distributed Coordination (via Akka): Flink客户端与JobManager的通信,JobManager与TaskManager的通信和TaskManager与TaskManager的通信都基于Akka actor模型。相关参数可以根据网络环境或调优策略进行配置,配置项包括消息发送和等待的超时设置,以及Akka DeathWatch检测机制参数等。 SSL: 当需要配置安全Flink集群时,需要配置SSL相关配置项,配置项包括SSL开关,证书,密码,加密算法等。 Network communication (via Netty): Flink运行Job时,Task之间的数据传输和反压检测都依赖Netty,某些环境下可能需要对Netty参数进行配置。对于高级调优,可调整部分Netty配置项,默认配置已可满足大规模集群并发高吞吐量的任务。 JobManager Web Frontend: JobManager启动时,会在同一进程内启动Web服务器,访问Web服务器可以获取当前Flink集群的信息,包括JobManager,TaskManager及集群内运行的Job。Web服务器参数的配置项包括端口,临时目录,显示项目,错误重定向,安全相关等。 File Systems: Task运行中会创建结果文件,支持对文件创建行为进行配置,配置项包括文件覆盖策略,目录创建等。 State Backend: Flink提供了HA和作业的异常恢复,并且提供版本升级时作业的暂停恢复。对于作业状态的存储,Flink依赖于state backend,作业的重启依赖于重启策略,用户可以对这两部分进行配置。配置项包括state backend类型,存储路径,重启策略等。 Kerberos-based Security: Flink安全模式下必须配置Kerberos相关配置项,配置项包括kerberos的keytab、principal等。 HA: Flink的HA模式依赖于ZooKeeper,所以必须配置ZooKeeper相关配置,配置项包括ZooKeeper地址,路径,安全认证等。 Environment: 对于JVM配置有特定要求的场景,可以通过配置项传递JVM参数到客户端,JobMananger,TaskManager等。 Yarn: Flink运行在Yarn集群上时,JobManager运行在Application Master上。JobManager的一些配置参数依赖于Yarn,通过配置YARN相关的配置,使Flink更好的运行在Yarn上,配置项包括yarn container的内存,虚拟内核,端口等。 Pipeline: 为适应某些场景对降低时延的需求,设计多个Job间采用Netty直接相连的方式传递数据,即分别使用NettySink用于Server端、NettySource用于Client端进行数据传输。配置项包括NettySink的信息存放路径、NettySink的端口监测范围、连接是否通过SSL加密以及NettySink监测所使用的网络所在域等。
  • JobManager & TaskManager 表1 JobManager & TaskManager参数说明 参数 描述 默认值 是否必选 备注 taskmanager.memory.size TaskManager在JVM堆内存中保留空间的大小,此内存用于排序,哈希表和中间状态的缓存。如果未指定,则会使用JVM堆内存乘以比例taskmanager.memory.fraction。单位:MB。 0 否 仅MRS 3.x之前版本 taskmanager.registration.initial-backoff 两次连续注册的初始间隔时间。单位:ms/s/m/h/d。 时间数值和单位之间有半角字符空格。ms/s/m/h/d表示毫秒、秒、分钟、小时、天。 500 ms 否 taskmanager.registration.refused-backoff JobManager拒绝注册后到允许再次注册的间隔时间。 5 min 否 taskmanager.rpc.port TaskManager的IPC端口范围。 32326-32390 否 适用于所有版本 taskmanager.memory.segment-size 内存管理器和网络堆栈使用的内存缓冲区大小。单位:bytes。 32768 否 taskmanager.data.port TaskManager数据交换端口范围。 32391-32455 否 taskmanager.data.ssl.enabled TaskManager之间数据传输是否使用SSL加密,仅在全局开关security.ssl开启时有效。 false 否 taskmanager.numberOfTaskSlots TaskManager占用的slot数,一般配置成物理机的核数,yarn-session模式下只能使用-s参数传递,yarn-cluster模式下只能使用-ys参数传递。 1 否 parallelism.default 默认并行度,用于未指定并行度的作业。 1 否 taskmanager.memory.fraction TaskManager在JVM堆内存中保留空间的比例,此内存用于排序,哈希表和中间状态的缓存。 0.7 否 taskmanager.memory.off-heap TaskManager是否使用堆外内存,此内存用于排序,哈希表和中间状态的缓存。建议对于大内存,开启此配置提高内存操作的效率。 false 是 taskmanager.memory.preallocate TaskManager是否在启动时分配保留内存空间。当开启堆外内存时,建议开启此配置项。 false 否 task.cancellation.interval 两次连续任务取消操作的间隔时间。单位:ms。 30000 否 client.rpc.port Flink client端Akka system监测端口。 32651-32720 否 仅MRS 3.x及之后版本 jobmanager.heap.size JobManager堆内存大小,yarn-session模式下只能使用-jm参数传递,yarn-cluster模式下只能使用-yjm参数传递,如果小于YARN配置文件中yarn.scheduler.minimum-allocation-mb大小,则使用YARN配置中的值。单位:B/KB/MB/GB/TB。 1024mb 否 taskmanager.heap.size TaskManager堆内存大小,yarn-session模式下只能使用-tm参数传递,yarn-cluster模式下只能使用-ytm参数传递,如果小于YARN配置文件中yarn.scheduler.minimum-allocation-mb大小,则使用YARN配置中的值。单位:B/KB/MB/GB/TB。 1024mb 否 taskmanager.network.numberOfBuffers TaskManager网络传输缓冲栈数量,如果作业运行中出错提示系统中可用缓冲不足,可以增加这个配置项的值。 2048 否 taskmanager.debug.memory.startLogThread 调试Flink内存和GC相关问题时可开启,TaskManager会定时采集内存和GC的统计信息,包括当前堆内,堆外,内存池的使用率和GC时间。 false 否 taskmanager.debug.memory.logIntervalMs TaskManager定时采集内存和GC的统计信息的采集间隔。 0 否 taskmanager.maxRegistrationDuration TaskManager向JobManager注册自己的最长时间,如果超过时间,TaskManager会关闭。 5 min 否 taskmanager.initial-registration-pause 两次连续注册的初始间隔时间。该值需带一个时间单位(ms/s/min/h/d)(比如5秒)。 时间数值和单位之间有半角字符空格。ms/s/m/h/d表示毫秒、秒、分钟、小时、天。 500 ms 否 taskmanager.max-registration-pause TaskManager注册失败最大重试间隔。单位:ms/s/m/h/d。 30 s 否 taskmanager.refused-registration-pause TaskManager注册连接被JobManager拒绝后的重试间隔。单位:ms/s/m/h/d。 10 s 否 classloader.resolve-order 从用户代码加载类时定义类解析策略,这意味着是首先检查用户代码jar(“child-first”)还是应用程序类路径(“parent-first”)。默认设置指示首先从用户代码jar加载类,这意味着用户代码jar可以包含和加载不同于Flink使用的(依赖)依赖项。 child-first 否 slot.idle.timeout Slot Pool中空闲Slot的超时时间(以ms为单位)。 50000 否 slot.request.timeout 从Slot Pool请求Slot的超时(以ms为单位)。 300000 否 task.cancellation.timeout 取消任务超时时间(以ms为单位),超时后会触发TaskManager致命错误。设置为0,取消任务卡住则不会报错。 180000 否 taskmanager.network.detailed-metrics 启用网络队列长度的详细指标监控。 false 否 taskmanager.network.memory.buffers-per-channel 每个传出/传入通道(子分区/输入通道)使用的最大网络缓冲区数.在基于信用的流量控制模式下,这表示每个输入通道中有多少信用。它应配置至少2以获得良好的性能。1个缓冲区用于接收子分区中的飞行中数据,1个缓冲区用于并行序列化。 2 否 taskmanager.network.memory.floating-buffers-per-gate 每个输出/输入门(结果分区/输入门)使用的额外网络缓冲区数。在基于信用的流量控制模式中,这表示在所有输入通道之间共享多少浮动信用。浮动缓冲区基于积压(子分区中的实时输出缓冲区)反馈来分布,并且可以帮助减轻由子分区之间的不平衡数据分布引起的背压。如果节点之间的往返时间较长和/或群集中的机器数量较多,则应增加此值。 8 否 taskmanager.network.memory.fraction 用于网络缓冲区的JVM内存的占比。这决定了TaskManager可以同时拥有多少流数据交换通道以及通道缓冲的程度。如果作业被拒绝或者收到系统没有足够缓冲区的警告,请增加此值或“taskmanager.network.memory.min”和“taskmanager.network.memory.max”。另请注意,“taskmanager.network.memory.min”和“taskmanager.network.memory.max”可能会覆盖此占比。 0.1 否 taskmanager.network.memory.max 网络缓冲区的最大内存大小。该值需带一个大小单位(B/KB/MB/GB/TB)。 1 GB 否 taskmanager.network.memory.min 网络缓冲区的最小内存大小。该值需带一个大小单位(B/KB/MB/GB/TB)。 64 MB 否 taskmanager.network.request-backoff.initial 输入通道的分区请求的最小退避(以ms为单位)。 100 否 taskmanager.network.request-backoff.max 输入通道的分区请求的最大退避(以ms为单位)。 10000 否 taskmanager.registration.timeout TaskManager注册的超时时间,在该时间内未成功注册,TaskManager将终止。该值需带一个时间单位(ms/s/min/h/d)。 5 min 否 resourcemanager.taskmanager-timeout 释放空闲TaskManager的超时(以ms为单位)。 30000 否
  • Blob服务端 表2 Blob服务端参数说明 参数 描述 默认值 是否必选 blob.server.port blob服务器端口。 32456-32520 否 blob.service.ssl.enabled blob传输通道是否加密传输,仅在全局开关security.ssl开启时有。 true 是 blob.fetch.retries TaskManager从JobManager下载blob文件的重试次数。 50 否 blob.fetch.num-concurrent JobManager支持的下载blob的并发数。 50 否 blob.fetch.backlog JobManager支持的blob下载队列大小,比如下载Jar包等。单位:个。 1000 否 library-cache-manager.cleanup.interval 当用户取消flink job后,jobmanager删除HDFS上存放用户jar包的时间,单位为s。 仅适用于MRS 3.x及之后版本。 3600 否
  • Distributed Coordination (via Akka) 表3 Distributed Coordination参数说明 参数 描述 默认值 是否必选 备注 akka.ask.timeout akka所有异步请求和阻塞请求的超时时间。如果Flink发生超时失败,可以增大这个值。当机器处理速度慢或者网络阻塞时会发生超时。单位:ms/s/m/h/d。 10s 否 适用于所有版本 akka.lookup.timeout 查找JobManager actor对象的超时时间。单位:ms/s/m/h/d。 10s 否 akka.framesize JobManager和TaskManager间最大消息传输大小。当Flink出现消息大小超过限制的错误时,可以增大这个值。单位:b/B/KB/MB。 10485760b 否 akka.watch.heartbeat.interval Akka DeathWatch机制检测失联TaskManager的心跳间隔。如果TaskManager经常发生由于心跳消息丢失或延误而被错误标记为失联的情况,可以增大这个值。单位:ms/s/m/h/d。 10s 否 akka.watch.heartbeat.pause Akka DeathWatch可接受的心跳暂停时间,较小的数值表示不允许不规律的心跳。单位:ms/s/m/h/d。 60s 否 akka.watch.threshold DeathWath失败检测阈值,较小的数值容易把正常TaskManager标记为失败,较大的值增加了失败检测的时间。 12 否 akka.tcp.timeout 发送连接TCP超时时间,如果经常发生满网络环境下连接TaskManager超时,可以增大这个值。单位:ms/s/m/h/d。 20s 否 akka.throughput Akka批量处理消息的数量,一次操作完后把处理线程归还线程池。较小的数值代表actor消息处理的公平调度,较大的值以牺牲调度公平的代价提高整体性能。 15 否 akka.log.lifecycle.events Akka远程时间日志开关,当需要调试时可打开此开关。 false 否 akka.startup-timeout 远程组件启动失败前的超时时间。该值需带一个时间单位(ms/s/min/h/d) 与akka.ask.timeout的值一致 否 akka.ssl.enabled Akka通信SSL开关,仅在全局开关security.ssl开启时有。 true 是 akka.client-socket-worker-pool.pool-size-factor 计算线程池大小的因子,计算公式:ceil(可用处理器*因子),计算结果限制在pool-size-min和pool-size-max之间。 1.0 否 仅适用于MRS 3.x及之后版本 akka.client-socket-worker-pool.pool-size-max 基于因子计算的线程数上限。 2 否 akka.client-socket-worker-pool.pool-size-min 基于因子计算的线程数下限。 1 否 akka.client.timeout 【说明】客户端超时时间。该值需带一个时间单位(ms/s/min/h/d)。 60s 否 akka.server-socket-worker-pool.pool-size-factor 【说明】计算线程池大小的因子,计算公式:ceil(可用处理器*因子),计算结果限制在pool-size-min和pool-size-max之间。 1.0 否 akka.server-socket-worker-pool.pool-size-max 基于因子计算的线程数上限。 2 否 akka.server-socket-worker-pool.pool-size-min 基于因子计算的线程数下限。 1 否
  • 配置说明 Flink所有的配置参数都可以在客户端侧进行配置,建议用户直接修改客户端的“flink-conf.yaml”配置文件进行配置,如果通过Manager界面修改Flink服务参数,配置完成之后需要重新下载安装客户端: 配置文件路径:客户端安装路径/Flink/flink/conf/flink-conf.yaml。 文件的配置格式为key: value。 例:taskmanager.heap.size: 1024mb 注意配置项key:与value之间需有空格分隔。
  • 配置描述 当需要MapReduce shuffle服务绑定特定IP时,需要在NodeManager实例所在节点的配置文件“mapred-site.xml”中(例如路径为:${BIGDATA_HOME}/ FusionInsight _HD_xxx/x_xx_NodeManager/etc/mapred-site.xml)设置如下参数。 表1 参数描述 参数 描述 默认值 mapreduce.shuffle.address 指定地址来运行shuffle服务,格式是IP:PORT,参数的默认值为空。当参数值为空时,将绑定localhost,默认端口为13562。 说明: 如果涉及到的PORT值和配置的mapreduce.shuffle.port值不一样时,mapreduce.shuffle.port将不会生效。 -
  • 配置描述 查看Yarn服务配置参数 参考修改集群服务配置参数进入Yarn服务参数“全部配置”界面,在搜索框中输入表1中参数名称。 表1 参数描述 参数 描述 默认值 yarn.acl.enable Yarn权限控制启用开关。 true yarn.webapp.filter-entity-list-by-user 严格视图启用开关,开启后,登录用户只能查看该用户有权限查看的内容。当要开启该功能时,同时需要设置参数“yarn.acl.enable”为true。 说明: 此参数适用于MRS 3.x及后续版本集群。 true 查看 MapReduce服务 配置参数 参考修改集群服务配置参数进入MapReduce服务参数“全部配置”界面,在搜索框中输入表2中参数名称。 表2 参数描述 参数 描述 默认值 mapreduce.cluster.acls.enabled MR JobHistoryServer权限控制启用开关。该参数为客户端参数,当JobHistoryServer服务端开启权限控制之后该参数生效。 true yarn.webapp.filter-entity-list-by-user MR JobHistoryServer严格视图启用开关,开启后,登录用户只能查看该用户有权限查看的内容。该参数为JobHistoryServer的服务端参数,表示JHS开启了权限控制,但是否要对某一个特定的Application进行控制,是由客户端参数:“mapreduce.cluster.acls.enabled”决定。 说明: 此参数适用于MRS 3.x及后续版本集群。 true 以上配置会影响restful API和shell命令结果,即以上配置开启后,restful API调用和shell命令运行所返回的内容只包含调用用户有权查看的信息。 当“yarn.acl.enable”或“mapreduce.cluster.acls.enabled”设置为“false”时,即关闭Yarn或MapReduce的权限校验功能。此时任何用户都可以在Yarn或MapReduce上提交任务和查看任务信息,存在安全风险,请谨慎使用。
  • 创建Ranger集群 参考购买自定义集群创建集群,组件选择时勾选Ranger组件。 目前MRS 1.9.2集群仅普通模式集群支持Ranger组件,开启Kerberos认证的安全集群不支持Ranger组件。 图1 选择Ranger组件 选择是否开启“使用外部数据源存储元数据”功能。 开启:使用外置的MySQL数据库存储Ranger组件的User/Group/Policy等数据。 关闭:Ranger组件的User/Group/Policy等数据默认存放在当前集群本地数据库中。 当“使用外部数据源存储元数据”开启时,选择数据连接类型为“RDS服务MySQL数据库”,数据连接实例选择已创建的数据连接实例,或单击“创建数据连接”新创建一个数据连接。 图2 使用RDS服务MySQL数据库 当用户选择的数据连接为“RDS服务MySQL数据库”时,请确保使用的数据库用户为root用户。如果为非root用户,需要先以root用户登录到数据库执行如下SQL命令为该数据库用户进行赋权,其中${db_name}与${db_user}为用户新建数据连接时输入的数据库名与用户名。 grant select on mysql.user to ${db_user};grant all privileges on ${db_name}.* to '${db_user}'@'%' with grant option;grant reload on *.* to '${db_user}'@'%' with grant option;flush privileges; 继续参考购买自定义集群配置其他参数并创建集群。 在集群创建完成后,此时Ranger不会对用户访问Hive和HBase组件的权限进行控制。 使用Ranger管理各组件权限时,如管理hive表权限,在管理控制台或者客户端提交hive作业(操作hive数据表),可能会提示当前用户没有权限,需要在Ranger中给提交作业的用户配置具体数据库或者表权限,以免影响用户使用提交作业功能,具体请参考在Ranger中配置Hive/Impala的访问权限或在Ranger中配置HBase的访问权限页面的添加策略步骤。 父主题: 使用Ranger(MRS 1.9.2)
  • 配置描述 参考修改集群服务配置参数进入Yarn服务参数“全部配置”界面,在搜索框中输入参数名称。 根据表1,对如下参数进行设置。 表1 AM作业保留相关参数 参数 说明 默认值 yarn.app.mapreduce.am.work-preserve 是否开启AM作业保留特性。 false yarn.app.mapreduce.am.umbilical.max.retries AM作业保留特性中,运行的容器尝试恢复的最大次数。 5 yarn.app.mapreduce.am.umbilical.retry.interval AM作业保留特性中,运行的容器尝试恢复的时间间隔。单位:毫秒。 10000 yarn.resourcemanager.am.max-attempts ApplicationMaster的重试次数。增加重试次数可以避免当资源不足时造成AM启动失败。 适用于所有ApplicationMaster的全局设置。每个ApplicationMaster都可以使用API设置一个单独的最大尝试次数,但这个次数不能大于全局的最大次数。如果大于了,那ResourceManager将会覆写这个单独的最大尝试次数。取值范围大于等于1。 2
  • 配置场景 在YARN中,ApplicationMaster(AM)与Container类似,都运行在NodeManager(NM)上(本文中忽略未管理的AM)。AM可能由于多种原因崩溃、退出或关闭。如果AM停止运行,ResourceManager(RM)会关闭ApplicationAttempt中管理的所有Container,其中包括当前在NM上运行的所有Container。RM会在另一计算节点上启动新的ApplicationAttempt。 对于不同类型的应用,希望以不同方式处理AM重启的事件。MapReduce类应用的目标是不丢失任务,但允许丢失当前运行的Container。但是对于长周期的YARN服务而言,用户可能并不希望由于AM的故障而导致整个服务停止运行。 YARN支持在新的ApplicationAttempt启动时,保留之前Container的状态,因此运行中的作业可以继续无故障的运行。 图1 AM作业保留
  • 配置场景 本章节操作适用于MRS 3.x及之后版本。 分布式缓存在两种情况下非常有用。 滚动升级 在升级过程中,应用程序必须保持文字内容(jar文件或配置文件)不变。而这些内容并非基于当前版本的Yarn,而是要基于其提交时的版本。一般情况下,应用程序(例如MapReduce、Hive、Tez等)需要进行完整的本地安装,将库安装至所有的集群机器(客户端及服务器端机器)中。当集群内开始进行滚动升级或降级时,本地安装的库的版本必然会在应用运行过程时发生改变。在滚动升级过程中,首先只会对少数NodeManager进行升级,这些NodeManager会获得新版本的软件。这导致了行为的不一致,并可能发生运行时错误。 同时存在多个Yarn版本 集群管理员可能会在一个集群内运行使用多个版本Yarn及Hadoop jars的任务。这在当前很难实现,因为jars已被本地化且只有一个版本。 MapReduce应用框架可以通过分布式缓存进行部署,且无需依赖安装中复制的静态版本。因此,可以在HDFS中存放多版本的Hadoop,并通过配置“mapred-site.xml”文件指定任务默认使用的版本。只需设置适当的配置属性,用户就可以运行不同版本的MapReduce,而无需使用部署在集群中的版本。 图1 具有多个版本NodeManagers及Applications的集群 在图1中:可以看出,应用程序可以使用HDFS中的Hadoop jars,而无需使用本地版本。因此在滚动升级中,即使NodeManager已经升级,应用程序仍然可以运行旧版本的Hadoop。
  • 操作步骤 以下参数有如下两个配置入口: 服务器端配置 进入Yarn服务参数“全部配置”界面,在搜索框中输入参数名称。具体操作请参考修改集群服务配置参数章节。 客户端配置 直接在客户端中修改相应的配置文件。 HDFS客户端配置文件路径:客户端安装目录/HDFS/hadoop/etc/hadoop/hdfs-site.xml。 Yarn客户端配置文件路径:客户端安装目录/HDFS/hadoop/etc/hadoop/yarn-site.xml。 MapReduce客户端配置文件路径:客户端安装目录/HDFS/hadoop/etc/hadoop/mapred-site.xml。 表1 多CPU内核设置 配置 参数 配置描述 节点容器槽位数 yarn.nodemanager.resource.memory-mb 参数解释:节点上YARN可使用的物理内存总量。单位:M。 默认值: MRS 3.x之前版本: 8192 MRS 3.x及之后版本: 16384 参数入口: MRS 3.x之前版本:需要在MRS控制台上进行配置。 MRS 3.x及之后版本:需要在FusionInsight Manager系统进行配置。 参数配置组合决定了每节点任务(map、reduce)的并发数。 如果所有的任务(map/reduce)需要读写数据至磁盘,多个进程将会同时访问一个磁盘。这将会导致磁盘的IO性能非常低下。为了改善磁盘的性能,请确保客户端并发访问磁盘的数不大于3。 最大并发的container数量应该为[2.5 * Hadoop中磁盘配置数 ]。 mapreduce.map.memory.mb 参数解释:map任务的内存限制。单位:MB。 默认值:4096 参数入口:需要在客户端进行配置,配置文件路径:客户端安装目录/HDFS/hadoop/etc/hadoop/mapred-site.xml。 mapreduce.reduce.memory.mb 参数解释:Reduce任务的内存限制。单位:MB。 默认值:4096 参数入口:需要在客户端进行配置,配置文件路径:客户端安装目录/HDFS/hadoop/etc/hadoop/mapred-site.xml。 Map输出与压缩 mapreduce.map.output.compress 参数解释:指定了Map任务输出结果可以在网络传输前被压缩。这是一个per-job的配置。 默认值:true 参数入口:需要在客户端进行配置,配置文件路径:客户端安装目录/HDFS/hadoop/etc/hadoop/mapred-site.xml。 Map任务所产生的输出可以在写入磁盘之前被压缩,这样可以节约磁盘空间并得到更快的写盘速度,同时可以减少至Reducer的数据传输量。需要在客户端进行配置。 在这种情况下,磁盘的IO是主要瓶颈。所以可以选择一种压缩率非常高的压缩算法。 编解码器可配置为Snappy,Benchmark测试结果显示Snappy是非常平衡以及高效的编码器。 mapreduce.map.output.compress.codec 参数解释:指定用于压缩的编解码器。 默认值:org.apache.hadoop.io.compress.Lz4Codec 参数入口:需要在客户端进行配置,配置文件路径:客户端安装目录/HDFS/hadoop/etc/hadoop/mapred-site.xml。 Spills mapreduce.map.sort.spill.percent 参数解释:序列化缓冲区中的软限制。一旦达到该限制,线程将在后台开始将内容溢出到磁盘。 默认值:0.8 参数入口:需要在客户端进行配置,配置文件路径:客户端安装目录/HDFS/hadoop/etc/hadoop/mapred-site.xml。 磁盘IO是主要瓶颈,合理配置“mapreduce.task.io.sort.mb”可以使溢出至磁盘的内容最小化。 数据包大小 dfs.client-write-packet-size 参数解释:配置项可以指定该数据包的大小。可以通过每个job进行指定。 默认值:262144 参数入口:需要在客户端进行配置,配置文件路径:客户端安装目录/HDFS/hadoop/etc/hadoop/hdfs-site.xml。 当HDFS客户端写数据至数据节点时,数据会被累积,直到形成一个包。这个数据包会通过网络传输。 数据节点从HDFS客户端接收数据包,然后将数据包里的数据单线程写入磁盘。当磁盘处于并发写入状态时,增加数据包的大小可以减少磁盘寻道时间,从而提升IO性能。 dfs.client-write-packet-size = 262144
  • 使用客户端 如果当前集群已启用Kerberos认证,登录MRS Manager页面,创建属于“opentsdb,hbase,opentsdbgroup和supergroup”组且拥有HBase权限的用户,例如创建用户为opentsdbuser,具体请参考准备开发用户。如果当前集群未启用Kerberos认证,则无需执行此步骤。 根据业务情况,准备好客户端,并登录安装客户端的节点。 例如在Master2节点更新客户端,则登录该节点使用客户端,具体参见更新客户端(3.x之前版本)。 执行以下命令切换用户。 sudo su - omm 执行以下命令,切换到客户端目录,例如“/opt/client”。 cd /opt/client 执行以下命令,配置环境变量。 source bigdata_env 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。 当用户为“人机”用户时:执行kinit opentsdbuser认证用户 当用户为“机机”用户时:下载用户认证凭据文件,保存并解压获取用户的user.keytab文件与krb5.conf文件,进入解压后的user.keytab目录下,执行kinit -kt user.keytab opentsdbuser认证用户 操作Opentsdb数据,具体请参见操作数据。
  • Kafka UI Kafka UI提供Kafka Web服务,通过界面展示Kafka集群中Broker、Topic、Partition、Consumer等功能模块的基本信息,同时提供Kafka服务常用命令的界面操作入口。该功能作为Kafka Manager替代,提供符合安全规范的Kafka Web服务。 通过Kafka UI可以进行以下操作: 支持界面检查集群状态(主题,消费者,偏移量,分区,副本,节点) 支持界面执行集群内分区重新分配 支持界面选择配置创建主题 支持界面删除主题(Kafka服务设置了参数“delete.topic.enable = true”) 支持为已有主题增加分区 支持更新现有主题的配置 可以为分区级别和主题级别度量标准启用JMX查询
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全