华为云用户手册

  • compaction&cleaning配置 参数 描述 默认值 hoodie.clean.automatic 是否执行自动clean。 true hoodie.cleaner.policy 要使用的清理策略。Hudi将删除旧版本的parquet文件以回收空间。 任何引用此版本文件的查询和计算都将失败。需要确保数据保留的时间超过最大查询执行时间。 KEEP_LATEST_COMMITS hoodie.cleaner.commits.retained 保留的提交数。因此,数据将保留为num_of_commits * time_between_commits(计划的),这也直接转化为逐步提取此数据集的数量。 10 hoodie.keep.max.commits 触发归档操作的commit数阈值 30 hoodie.keep.min.commits 归档操作保留的commit数。 20 hoodie.commits.archival.batch 这控制着批量读取并一起归档的提交即时的数量。 10 hoodie.parquet.small.file.limit 该值应小于maxFileSize,如果将其设置为0,会关闭此功能。由于批处理中分区中插入记录的数量众多,总会出现小文件。Hudi提供了一个选项,可以通过将对该分区中的插入作为对现有小文件的更新来解决小文件的问题。此处的大小是被视为“小文件大小”的最小文件大小。 104857600 byte hoodie.copyonwrite.insert.split.size 插入写入并行度。为单个分区的总共插入次数。写出100MB的文件,至少1KB大小的记录,意味着每个文件有100K记录。默认值是超额配置为500K。 为了改善插入延迟,请对其进行调整以匹配单个文件中的记录数。将此值设置为较小的值将导致文件变小(尤其是当compactionSmallFileSize为0时)。 500000 hoodie.copyonwrite.insert.auto.split Hudi是否应该基于最后24个提交的元数据动态计算insertSplitSize,默认关闭。 true hoodie.copyonwrite.record.size.estimate 平均记录大小。如果指定,Hudi将使用它,并且不会基于最后24个提交的元数据动态地计算。 没有默认值设置。这对于计算插入并行度以及将插入打包到小文件中至关重要。 1024 hoodie.compact.inline 当设置为true时,紧接在插入或插入更新或批量插入的提交或增量提交操作之后由摄取本身触发压缩。 true hoodie.compact.inline.max.delta.commits 触发内联压缩之前要保留的最大增量提交数。 5 hoodie.compaction.lazy.block.read 当CompactedLogScanner合并所有日志文件时,此配置有助于选择是否应延迟读取日志块。选择true以使用I/O密集型延迟块读取(低内存使用),或者为false来使用内存密集型立即块读取(高内存使用)。 true hoodie.compaction.reverse.log.read HoodieLogFormatReader会从pos=0到pos=file_length向前读取日志文件。如果此配置设置为true,则Reader会从pos=file_length到pos=0反向读取日志文件。 false hoodie.cleaner.parallelism 如果清理变慢,请增加此值。 200 hoodie.compaction.strategy 用来决定在每次压缩运行期间选择要压缩的文件组的压缩策略。默认情况下,Hudi选择具有累积最多未合并数据的日志文件。 org.apache.hudi.table.action.compact.strategy. LogFileSizeBasedCompactionStrategy hoodie.compaction.target.io LogFileSizeBasedCompactionStrategy的压缩运行期间要花费的MB量。当压缩以内联模式运行时,此值有助于限制摄取延迟。 500 * 1024 MB hoodie.compaction.daybased.target.partitions 由org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy使用,表示在压缩运行期间要压缩的最新分区数。 10 hoodie.compaction.payload.class 这需要与插入/插入更新过程中使用的类相同。就像写入一样,压缩也使用记录有效负载类将日志中的记录彼此合并,再次与基本文件合并,并生成压缩后要写入的最终记录。 org.apache.hudi.common.model.Defaulthoodierecordpayload hoodie.schedule.compact.only.inline 在写入操作时,是否只生成压缩计划。在hoodie.compact.inline=true时有效。 false hoodie.run.compact.only.inline 通过Sql执行run compaction命令时,是否只执行压缩操作,压缩计划不存在时直接退出。 false 父主题: Hudi常见配置参数
  • 操作场景 Hive支持对表的某一列或者多列进行加密;在创建Hive表时,可以指定要加密的列和加密算法。当使用insert语句向表中插入数据时,即可实现将对应列加密。列加密只支持存储在HDFS上的TextFile和SequenceFile文件格式的表。Hive列加密不支持视图以及Hive over HBase场景。 Hive列加密机制目前支持的加密算法有两种,在建表时指定: AES(对应加密类名称为:org.apache.hadoop.hive.serde2.AESRewriter) SMS 4(对应加密类名称为:org.apache.hadoop.hive.serde2.SMS4Rewriter) 将原始数据从普通Hive表导入到Hive列加密表后,在不影响其他业务情况下,建议删除普通Hive表上原始数据,因为保留一张未加密的表存在安全风险。
  • 日志级别 Flume提供了如表2所示的日志级别。 运行日志的级别优先级从高到低分别是FATAL、ERROR、WARN、INFO、DEBUG,程序会打印高于或等于所设置级别的日志,设置的日志等级越高,打印出来的日志就越少。 表2 日志级别 日志类型 级别 描述 运行日志 FATAL FATAL表示系统运行的致命错误信息。 ERROR ERROR表示系统运行的错误信息。 WARN WARN表示当前事件处理存在异常信息。 INFO INFO表示记录系统及各事件正常运行状态信息。 DEBUG DEBUG表示记录系统及系统的调试信息。 如果您需要修改日志级别,请执行如下操作: 请参考修改集群服务配置参数,进入Flume的“全部配置”页面。 左边菜单栏中选择所需修改的角色所对应的日志菜单。 选择所需修改的日志级别。 保存配置,在弹出窗口中单击“确定”使配置生效。 配置完成后即生效,不需要重启服务。
  • 操作步骤 读数据服务端调优 参数入口: 进入HBase服务参数“全部配置”界面,具体操作请参考修改集群服务配置参数章节。 表1 影响实时读数据配置项 配置参数 描述 默认值 GC_OPTS HBase利用内存完成读写操作。提高HBase内存可以有效提高HBase性能。 GC_OPTS主要需要调整HeapSize的大小和NewSize的大小。调整HeapSize大小的时候,建议将Xms和Xmx设置成相同的值,这样可以避免JVM动态调整HeapSize大小的时候影响性能。调整NewSize大小的时候,建议把其设置为HeapSize大小的1/8。 HMaster:当HBase集群规模越大、Region数量越多时,可以适当调大HMaster的GC_OPTS参数。 RegionServer:RegionServer需要的内存一般比HMaster要大。在内存充足的情况下,HeapSize可以相对设置大一些。 说明: 主HMaster的HeapSize为4G的时候,HBase集群可以支持100000 region数的规模。根据经验值,集群每增加35000个region,HeapSize增加2G,主HMaster的HeapSize不建议超过32GB。 MRS 3.x之前版本: HMaster: -server -Xms2G -Xmx2G -XX:NewSize=256M -XX:MaxNewSize=256M -XX:MetaspaceSize=128M -XX:MaxMetaspaceSize=512M -XX:MaxDirectMemorySize=512M -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=65 -XX:+PrintGCDetails -Dsun.rmi.dgc.client.gcInterval=0x7FFFFFFFFFFFFFE -Dsun.rmi.dgc.server.gcInterval=0x7FFFFFFFFFFFFFE -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M RegionServer: -server -Xms4G -Xmx4G -XX:NewSize=512M -XX:MaxNewSize=512M -XX:MetaspaceSize=128M -XX:MaxMetaspaceSize=512M -XX:MaxDirectMemorySize=512M -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=65 -XX:+PrintGCDetails -Dsun.rmi.dgc.client.gcInterval=0x7FFFFFFFFFFFFFE -Dsun.rmi.dgc.server.gcInterval=0x7FFFFFFFFFFFFFE -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M MRS 3.x及之后版本: HMaster -server -Xms4G -Xmx4G -XX:NewSize=512M -XX:MaxNewSize=512M -XX:MetaspaceSize=128M -XX:MaxMetaspaceSize=512M -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=65 -XX:+PrintGCDetails -Dsun.rmi.dgc.client.gcInterval=0x7FFFFFFFFFFFFFE -Dsun.rmi.dgc.server.gcInterval=0x7FFFFFFFFFFFFFE -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M Region Server -server -Xms6G -Xmx6G -XX:NewSize=1024M -XX:MaxNewSize=1024M -XX:MetaspaceSize=128M -XX:MaxMetaspaceSize=512M -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=65 -XX:+PrintGCDetails -Dsun.rmi.dgc.client.gcInterval=0x7FFFFFFFFFFFFFE -Dsun.rmi.dgc.server.gcInterval=0x7FFFFFFFFFFFFFE -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M hbase.regionserver.handler.count 表示RegionServer在同一时刻能够并发处理多少请求。如果设置过高会导致激烈线程竞争,如果设置过小,请求将会在RegionServer长时间等待,降低处理能力。根据资源情况,适当增加处理线程数。 建议根据CPU的使用情况,可以选择设置为100至300之间的值。 200 hfile.block.cache.size HBase缓存区大小,主要影响查询性能。根据查询模式以及查询记录分布情况来决定缓存区的大小。如果采用随机查询使得缓存区的命中率较低,可以适当降低缓存区大小。 当offheap关闭时,默认值为0.25。当offheap开启时,默认值是0.1。 如果同时存在读和写的操作,这两种操作的性能会互相影响。如果写入导致的flush和Compaction操作频繁发生,会占用大量的磁盘IO操作,从而影响读取的性能。如果写入导致阻塞较多的Compaction操作,就会出现Region中存在多个HFile的情况,从而影响读取的性能。所以如果读取的性能不理想的时候,也要考虑写入的配置是否合理。 读数据客户端调优 Scan数据时需要设置caching(一次从服务端读取的记录条数,默认是1),若使用默认值读性能会降到极低。 当不需要读一条数据所有的列时,需要指定读取的列,以减少网络IO。 只读取RowKey时,可以为Scan添加一个只读取RowKey的filter(FirstKeyOnlyFilter或KeyOnlyFilter)。 读数据表设计调优 表2 影响实时读数据相关参数 配置参数 描述 默认值 COMPRESSION 配置数据的压缩算法,这里的压缩是HFile中block级别的压缩。对于可以压缩的数据,配置压缩算法可以有效减少磁盘的IO,从而达到提高性能的目的。 说明: 并非所有数据都可以进行有效压缩。例如一张图片的数据,因为图片一般已经是压缩后的数据,所以压缩效果有限。常用的压缩算法是SNAPPY,因为它有较好的Encoding/Decoding速度和可以接受的压缩率。 NONE BLOCKSIZE 配置HFile中block块的大小,不同的block块大小,可以影响HBase读写数据的效率。越大的block块,配合压缩算法,压缩的效率就越好;但是由于HBase的读取数据是以block块为单位的,所以越大的block块,对于随机读的情况,性能可能会比较差。 如果要提升写入的性能,一般扩大到128KB或者256KB,可以提升写数据的效率,也不会影响太大的随机读性能。单位:字节。 65536 DATA_BLOCK_ENCODING 配置HFile中block块的编码方法。当一行数据中存在多列时,一般可以配置为“FAST_DIFF”,可以有效的节省数据存储的空间,从而提供性能。 NONE
  • 工具使用 下载安装客户端,例如安装目录为“/opt/client”。进入 目录“/opt/client/Spark2x/spark/bin”, 执行start-prequery.sh。 参考表1,配置prequeryParams.properties。 表1 参数列表 参数 说明 示例 spark.prequery.period.max.minute 预热的最大时长,单位分钟 60 spark.prequery.tables 表名配置database.table:int,表名支持通配符*,int代表预热多长时间内有更新的表,单位为天。 default.test*:10 spark.prequery.maxThreads 预热时并发的最大线程数 50 spark.prequery.sslEnable 集群安全模式为true,非安全模式为false true spark.prequery.driver JD BCS erver的地址ip:port,如需要预热多个Server则需填写多个Server的IP,多个IP:port用逗号隔开。 192.168.0.2:22550 spark.prequery.sql 预热的sql语句,不同语句冒号隔开 SELECT COUNT(*) FROM %s;SELECT * FROM %s LIMIT 1 spark.security.url 安全模式下jdbc所需url ;saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.hadoop.com@HADOOP.COM; spark.prequery.sql 配置的语句在每个所预热的表中都会执行,表名用%s代替。 脚本使用 命令形式:sh start-prequery.sh 执行此条命令需要:将user.keytab或jaas.conf(二选一),krb5.conf(必须)放入conf目录中。 此工具暂时只支持Carbon表。 此工具会初始化Carbon环境和预读取表的元数据到JDB CS erver,所以更适合在多主实例、静态分配模式下使用。
  • 配置描述 在客户端的“mapred-site.xml”配置文件中调整如下参数。 “mapred-site.xml”配置文件在客户端安装路径的conf目录下,例如“/opt/client/Yarn/config”。 表1 参数说明 参数 描述 默认值 mapreduce.reduce.shuffle.max-host-failures MR任务在reduce过程中读取远端shuffle数据允许失败的次数。当设置次数大于5时,可以降低客户端应用的失败率。该参数适用于MRS 3.x版本。 5 mapreduce.client.submit.file.replication MR任务在运行时依赖的相关job文件在HDFS上的备份。当备份数大于10时,可以降低客户端应用的失败率。 10
  • 回答 在这种场景下,CarbonData会给每个节点分配一个INSERT INTO或LOAD DATA任务。如果Executor不是不同的节点分配的,CarbonData将会启动较少的task。 解决措施: 您可以适当增大Executor内存和Executor核数,以便YARN可以在每个节点上启动一个Executor。具体的配置方法如下: 配置Executor核数。 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。 配置Executor内存。 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。
  • 前提条件 HDFS和Oozie组件安装完成且运行正常,客户端安装成功。 如果当前客户端为旧版本,需要重新下载和安装客户端。 已创建或获取访问Oozie服务的人机用户账号及密码。 该用户需要从属于hadoop、supergroup、hive组,同时添加Oozie的角色操作权限。若使用Hive多实例,该用户还需要从属于具体的Hive实例组,如hive3。 用户同时还需要至少有manager_viewer权限的角色。
  • 回答 正常情况下,相同rowkey值的数据加载到HBase是有先后顺序的,HBase以最近的时间戳的数据为最新数据,一般的默认查询中,没有指定时间戳的,就会对相同rowkey值的数据仅返回最新数据。 使用bulkload加载数据,由于数据在内存中处理生成HFile,速度是很快的,很可能出现相同rowkey值的数据具有相同时间戳,从而造成查询结果混乱的情况。 建议在建表和数据加载时,设计好rowkey值,尽量避免在同一个数据文件中存在相同rowkey值的情况。
  • 操作步骤 停止Flume角色的客户端。 假设Flume客户端安装路径为“/opt/FlumeClient”,执行以下命令,停止Flume客户端: cd /opt/FlumeClient/fusioninsight-flume-Flume组件版本号/bin ./flume-manage.sh stop 执行脚本后,显示如下信息,说明成功的停止了Flume客户端: Stop Flume PID=120689 successful.. Flume客户端停止后会自动重启,如果不需自动重启,请执行以下命令: ./flume-manage.sh stop force 需要启动时,可执行以下命令: ./flume-manage.sh start force 卸载Flume角色的客户端。 假设Flume客户端安装路径为“/opt/FlumeClient”,执行以下命令,卸载Flume客户端: cd /opt/FlumeClient/fusioninsight-flume-Flume组件版本号/inst ./uninstall.sh
  • 启动Maxwell 登录Maxwell所在的服务器。 执行如下命令进入Maxwell安装目录。 cd /opt/maxwell-1.21.0/ 如果是初次使用Maxwell,建议将conf/config.properties中的log_level改为debug(调试级别),以便观察启动之后是否能正常从MySQL获取数据并发送到kafka,当整个流程调试通过之后,再把log_level修改为info,然后先停止再启动Maxwell生效。 # log level [debug | info | warn | error] log_level=debug 执行如下命令启动Maxwell。 source /opt/client/bigdata_env bin/Maxwell bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \ --producer=kafka --kafka.bootstrap.servers=kafkahost:9092 --kafka_topic=Maxwell 其中,user,password和host分别表示MySQL的用户名,密码和IP地址,这三个参数可以通过修改配置项配置也可以通过上述命令配置,kafkahost为流式集群的Core节点的IP地址。 命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。 显示类似如下信息,表示Maxwell启动成功。 Success to start Maxwell [78092].
  • Maxwell生成的数据格式及常见字段含义 Maxwell生成的数据格式为JSON,常见字段含义如下: type:操作类型,包含database-create,database-drop,table-create,table-drop,table-alter,insert,update,delete database:操作的数据库名称 ts:操作时间,13位时间戳 table:操作的表名 data:数据增加/删除/修改之后的内容 old:数据修改前的内容或者表修改前的结构定义 sql:DDL操作的SQL语句 def:表创建与表修改的结构定义 xid:事务唯一ID commit:数据增加/删除/修改操作是否已提交
  • 验证Maxwell 登录Maxwell所在的服务器。 查看日志。如果日志里面没有ERROR日志,且有打印如下日志,表示与MySQL连接正常。 BinlogConnectorLifecycleListener - Binlog connected. 登录MySQL数据库,对测试数据进行更新/创建/删除等操作。操作语句可以参考如下示例。 -- 创建库 create database test; -- 创建表 create table test.e ( id int(10) not null primary key auto_increment, m double, c timestamp(6), comment varchar(255) charset 'latin1' ); -- 增加记录 insert into test.e set m = 4.2341, c = now(3), comment = 'I am a creature of light.'; -- 更新记录 update test.e set m = 5.444, c = now(3) where id = 1; -- 删除记录 delete from test.e where id = 1; -- 修改表 alter table test.e add column torvalds bigint unsigned after m; -- 删除表 drop table test.e; -- 删除库 drop database test; 观察Maxwell的日志输出,如果没有WARN/ERROR打印,则表示Maxwell安装配置正常。 若要确定数据是否成功上传,可设置config.properties中的log_level为debug,则数据上传成功时会立刻打印如下JSON格式数据,具体字段含义请参考Maxwell生成的数据格式及常见字段含义。 {"database":"test","table":"e","type":"insert","ts":1541150929,"xid":60556,"commit":true,"data":{"id":1,"m":4.2341,"c":"2018-11-02 09:28:49.297000","comment":"I am a creature of light."}} …… 当整个流程调试通过之后,可以把config.properties文件中的配置项log_level修改为info,减少日志打印量,并重启Maxwell。 # log level [debug | info | warn | error] log_level=info
  • 配置Maxwell 在maxwell-XXX文件夹下若有conf目录则配置config.properties文件,配置项说明请参见表1。若没有conf目录,则是在maxwell-XXX文件夹下将config.properties.example修改成config.properties。 表1 Maxwell配置项说明 配置项 是否必填 说明 默认值 user 是 连接MySQL的用户名,即2中新创建的用户 - password 是 连接MySQL的密码,配置文件中包含认证密码信息可能存在安全风险,建议当前场景执行完毕后删除相关配置文件或加强安全管理。 - host 否 MySQL地址 localhost port 否 MySQL端口 3306 log_level 否 日志打印级别,可选值为 debug info warn error info output_ddl 否 是否发送DDL(数据库与数据表的定义修改)事件 true:发送DDL事件 false:不发送DDL事件 false producer 是 生产者类型,配置为kafka stdout:将生成的事件打印在日志中 kafka:将生成的事件发送到kafka stdout producer_partition_by 否 分区策略,用来确保相同一类的数据写入到kafka同一分区 database:使用数据库名称做分区,保证同一个数据库的事件写入到kafka同一个分区中 table:使用表名称做分区,保证同一个表的事件写入到kafka同一个分区中 database ignore_producer_error 否 是否忽略生产者发送数据失败的错误 true:在日志中打印错误信息并跳过错误的数据,程序继续运行 false:在日志中打印错误信息并终止程序 true metrics_slf4j_interval 否 在日志中输出上传kafka成功与失败数据的数量统计的时间间隔,单位为秒 60 kafka.bootstrap.servers 是 kafka代理节点地址,配置形式为HOST:PORT[,HOST:PORT] - kafka_topic 否 写入kafka的topic名称 maxwell dead_letter_topic 否 当发送某条记录出错时,记录该条出错记录主键的kafka topic - kafka_version 否 Maxwell使用的kafka producer版本号,不能在config.properties中配置,需要在启动命令时用-- kafka_version xxx参数传入 - kafka_partition_hash 否 划分kafka topic partition的算法,支持default或murmur3 default kafka_key_format 否 Kafka record的key生成方式,支持array或Hash Hash ddl_kafka_topic 否 当output_ddl配置为true时,DDL操作写入的topic {kafka_topic} filter 否 过滤数据库或表。 若只想采集mydatabase的库,可以配置为 exclude: *.*,include: mydatabase.* 若只想采集mydatabase.mytable的表,可以配置为 exclude: *.*,include: mydatabase.mytable 若只想采集mydatabase库下的mytable,mydate_123, mydate_456表,可以配置为 exclude: *.*,include: mydatabase.mytable, include: mydatabase./mydate_\\d*/ -
  • 安装Maxwell 下载安装包,下载路径为https://github.com/zendesk/maxwell/releases,选择名为maxwell-XXX.tar.gz的二进制文件下载,其中XXX为版本号。 将tar.gz包上传到任意目录下(本示例路径为Master节点的/opt)。 登录部署Maxwell的服务器,并执行如下命令进入tar.gz包所在目录。 cd /opt 执行如下命令解压“maxwell-XXX.tar.gz”压缩包,并进入“maxwell-XXX”文件夹。 tar -zxvf maxwell-XXX.tar.gz cd maxwell-XXX
  • 注意事项 Group By数据倾斜 Group By也同样存在数据倾斜的问题,设置hive.groupby.skewindata为true,生成的查询计划会有两个MapReduce Job,第一个Job的Map输出结果会随机的分布到Reduce中,每个Reduce做聚合操作,并输出结果,这样的处理会使相同的Group By Key可能被分发到不同的Reduce中,从而达到负载均衡,第二个Job再根据预处理的结果按照Group By Key分发到Reduce中完成最终的聚合操作。 Count Distinct聚合问题 当使用聚合函数count distinct完成去重计数时,处理值为空的情况会使Reduce产生很严重的数据倾斜,可以将空值单独处理,如果是计算count distinct,可以通过where子句将该值排除掉,并在最后的count distinct结果中加1。如果还有其他计算,可以先将值为空的记录单独处理,再和其他计算结果合并。
  • 问题 为什么在使用OfflineMetaRepair工具重新构建元数据后,HMaster启动的时候会等待namespace表分配超时,最后启动失败? 且HMaster将输出下列FATAL消息表示中止: 2017-06-15 15:11:07,582 FATAL [Hostname:16000.activeMasterManager] master.HMaster: Unhandled exception. Starting shutdown. java.io.IOException: Timedout 120000ms waiting for namespace table to be assigned at org.apache.hadoop.hbase.master.TableNamespaceManager.start(TableNamespaceManager.java:98) at org.apache.hadoop.hbase.master.HMaster.initNamespace(HMaster.java:1054) at org.apache.hadoop.hbase.master.HMaster.finishActiveMasterInitialization(HMaster.java:848) at org.apache.hadoop.hbase.master.HMaster.access$600(HMaster.java:199) at org.apache.hadoop.hbase.master.HMaster$2.run(HMaster.java:1871) at java.lang.Thread.run(Thread.java:745)
  • 回答 当通过OfflineMetaRepair工具重建元数据时,HMaster在启动期间等待所有region server的WAL分割,以避免数据不一致问题。一旦WAL分割完成,HMaster将进行用户region的分配。所以当在集群异常的场景下,WAL分割可能需要很长时间,这取决于多个因素,例如太多的WALs,较慢的I/O,region servers不稳定等。 为确保HMaster能够成功完成所有region server WAL分割,请执行以下步骤: 确保集群稳定,不存在其他问题。如有任何问题,请先修复。 为“hbase.master.initializationmonitor.timeout”参数配置一个较大的值,默认值为“3600000”毫秒。 重启HBase服务。
  • 操作场景 Hive业务还可能需要关联使用其他组件,例如HQL语句触发MapReduce任务需要设置Yarn权限,或者Hive over HBase的场景需要HBase权限。以下介绍Hive关联Yarn和Hive over HBase两个场景下的操作。 安全模式下Yarn和HBase的权限管理默认是开启的,因此在安全模式下默认需要配置Yarn和HBase权限。 在普通模式下,Yarn和HBase的权限管理默认是关闭的,即任何用户都有权限,因此普通模式下默认不需要配置Yarn和HBase权限。如果用户修改了YARN或者HBase的配置来开启权限管理,则修改后也需要配置Yarn和HBase权限。 MRS 3.x及后续版本支持Ranger,如果当前组件使用了Ranger进行权限控制,须基于Ranger配置相关策略进行权限管理,具体操作可参考添加Hive的Ranger访问权限策略。
  • 操作步骤 配置Driver内存。 Driver负责任务的调度,和Executor、AM之间的消息通信。当任务数变多,任务平行度增大时,Driver内存都需要相应增大。 您可以根据实际任务数量的多少,为Driver设置一个合适的内存。 将“spark-defaults.conf”中的“spark.driver.memory”配置项设置为合适大小。 在使用spark-submit命令时,添加“--driver-memory MEM”参数设置内存。 配置Executor个数。 每个Executor每个核同时能跑一个task,所以增加了Executor的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加Executor的个数,以提高运行效率。 将“spark-defaults.conf”中的“spark.executor.instance”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_INSTAN CES ”配置项设置为合适大小。 在使用spark-submit命令时,添加“--num-executors NUM”参数设置Executor个数。 配置Executor核数。 每个Executor多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用Executor的内存,所以要在内存和核数之间做好平衡。 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。 配置Executor内存。 Executor的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加;当一个任务较小运行较快时,就可以增大并发度减少内存。 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。
  • 操作场景 Spark on Yarn模式下,有Driver、ApplicationMaster、Executor三种进程。在任务调度和运行的过程中,Driver和Executor承担了很大的责任,而ApplicationMaster主要负责container的启停。 因而Driver和Executor的参数配置对Spark应用的执行有着很大的影响意义。用户可通过如下操作对Spark集群性能做优化。
  • 示例 在执行spark wordcount计算中。1.6T数据,250个executor。 在默认参数下执行失败,出现Futures timed out和OOM错误。 因为数据量大,task数多,而wordcount每个task都比较小,完成速度快。当task数多时driver端相应的一些对象就变大了,而且每个task完成时executor和driver都要通信,这就会导致由于内存不足,进程之间通信断连等问题。 当把Driver的内存设置到4g时,应用成功跑完。 使用JDBCServer执行TPC-DS测试套,默认参数配置下也报了很多错误:Executor Lost等。而当配置Driver内存为30g,executor核数为2,executor个数为125,executor内存为6g时,所有任务才执行成功。
  • 前提条件 Loader和Oozie组件及客户端已经安装,并且正常运行。 已创建或获取访问Oozie服务的人机用户账号及密码。 该用户需要从属于hadoop、supergroup、hive组,同时添加Oozie的角色操作权限。若使用Hive多实例,该用户还需要从属于具体的Hive实例组,如hive3。 用户同时还需要至少有manager_viewer权限的角色。 获取运行状态的Oozie服务器(任意实例)URL,如“https://10.1.130.10:21003/oozie”。 获取运行状态的Oozie服务器主机名,如“10-1-130-10”。 获取Yarn ResourceManager主节点IP,如10.1.130.11。 创建需要调度的Loader作业,并获取该作业ID。
  • 操作步骤 以客户端安装用户,登录安装Oozie客户端的节点。 执行以下命令,获取安装环境信息。其中“/opt/client”为客户端安装路径,该操作的客户端目录只是举例,请根据实际安装目录修改。 source /opt/client/bigdata_env 判断集群认证模式。 安全模式,执行kinit命令进行用户认证。 例如,使用oozieuser用户进行认证。 kinit oozieuser 普通模式,执行4。 执行以下命令,进入样例目录。 cd /opt/client/Oozie/oozie-client-*/examples/apps/sqoop/ 该目录下需关注文件如表1所示。 表1 文件说明 文件名称 描述 job.properties 工作流的参数变量定义文件。 workflow.xml 工作流的规则定制文件。 执行以下命令,编辑“job.properties”文件。 vi job.properties 修改如下内容: 更改“userName”的参数值为提交任务的人机用户名,例如“userName=oozieuser”。 执行以下命令,编辑“workflow.xml”文件。 vi workflow.xml 修改如下内容: “command”的值修改为需要调度的已有Loader作业ID,例如1。 将“workflow.xml”文件上传至 "job.properties" 文件中的HDFS路径。 hdfs dfs -put -f workflow.xml /user/userName/examples/apps/sqoop 执行oozie job命令,运行工作流文件。 oozie job -oozie https://oozie角色的主机名:21003/oozie/ -config job.properties -run 命令参数解释如下: -oozie:实际执行任务的Oozie服务器URL。 -config:工作流属性文件。 -run:运行工作流。 执行完工作流文件,显示job id表示提交成功,例如:job: 0000021-140222101051722-oozie-omm-W。登录Oozie管理页面,查看运行情况。 使用oozieuser用户,登录Oozie WebUI页面:https://oozie角色的ip地址:21003/oozie 。 Oozie的WebUI界面中,可在页面表格根据jobid查看已提交的工作流信息。
  • 日志描述 日志存储路径: Flink作业运行日志:“${BIGDATA_DATA_HOME}/hadoop/data${i}/nm/containerlogs/application_${appid}/container_{$contid}”。 运行中的任务日志存储在以上路径中,运行结束后会基于Yarn的配置确定是否汇聚到HDFS目录中。 FlinkResource运行日志:“/var/log/Bigdata/flink/flinkResource”。 日志归档规则: FlinkResource运行日志: 服务日志默认20MB滚动存储一次,最多保留20个文件,不压缩。 针对MRS 3.x之前版本,Executor日志默认30MB滚动存储一次,最多保留20个文件,不压缩。 日志大小和压缩文件保留个数可以在Manager界面中配置或者修改客户端“客户端安装目录/Flink/flink/conf/”中的log4j-cli.properties、log4j.properties、log4j-session.properties中对应的配置项。 表1 FlinkResource日志列表 日志类型 日志文件名 描述 FlinkResource运行日志 checkService.log 健康检查日志。 kinit.log 初始化日志。 postinstall.log 服务安装日志。 prestart.log prestart脚本日志。 start.log 启动日志。 FlinkServer服务日志、审计日志。 FlinkServer服务日志、审计日志默认100MB滚动存储一次,服务日志最多保留30天,审计日志最多保留90天。 日志大小和压缩文件保留个数可以在Manager界面中配置或者修改客户端“客户端安装目录/Flink/flink/conf/”中的log4j-cli.properties、log4j.properties、log4j-session.properties中对应的配置项。 表2 FlinkServer日志列表 日志类型 日志文件名 描述 FlinkServer运行日志 checkService.log 健康检查日志。 checkFlinkServer.log FlinkServer健康检查日志。 localhost_access_log..yyyy-mm-dd.txt FlinkServer访问URL日志。 start_thrift_server.out thrift server启动日志。 thrift_server_thriftServer_xxx.log.last cleanup.log 安装卸载实例时的清理日志。 flink-omm-client-IP.log 作业启动日志。 flinkserver_yyyymmdd-x.log.gz 业务归档日志。 flinkserver.log 业务日志。 flinkserver---pidxxxx-gc.log.x.current GC日志。 kinit.log 初始化日志。 postinstall.log 服务安装日志。 prestart.log prestart脚本日志。 start.log 启动日志。 stop.log 停止日志。 catalina.yyyy-mm-dd.log tomcat运行日志.。 catalina.out host-manager.yyyy-mm-dd.log localhost.yyyy-mm-dd.log manager.yyyy-mm-dd.log FlinkServer审计日志 flinkserver_audit_yyyymmdd-x.log.gz 审计归档日志。 flinkserver_audit.log 审计日志。
  • 日志级别 Flink中提供了如表3所示的日志级别。日志级别优先级从高到低分别是ERROR、WARN、INFO、DEBUG。程序会打印高于或等于所设置级别的日志,设置的日志等级越高,打印出来的日志就越少。 表3 日志级别 级别 描述 ERROR ERROR表示当前时间处理存在错误信息。 WARN WARN表示当前事件处理存在异常信息。 INFO INFO表示记录系统及各事件正常运行状态信息。 DEBUG DEBUG表示记录系统及系统的调试信息。 如果您需要修改日志级别,请执行如下操作: 请参考修改集群服务配置参数,进入Flink的“全部配置”页面。 左边菜单栏中选择所需修改的角色所对应的日志菜单。 选择所需修改的日志级别。 保存配置,在弹出窗口中单击“确定”使配置生效。 配置完成后不需要重启服务,重新下载客户端使配置生效。 也可以直接修改客户端“客户端安装目录/Flink/flink/conf/”中log4j-cli.properties、log4j.properties、log4j-session.properties文件中对应的日志级别配置项。 通过客户端提交作业时会在客户端log文件夹中生成相应日志文件,由于系统默认umask值是0022,所以日志默认权限为644;如果需要修改文件权限,需要修改umask值;例如修改omm用户umask值: 在“/home/omm/.baskrc”文件末尾添加“umask 0026”; 执行命令source /home/omm/.baskrc使文件权限生效。
  • 配置场景 Spark SQL Adaptive Execution特性用于使Spark SQL在运行过程中,根据中间结果优化后续执行流程,提高整体执行效率。当前已实现的特性如下: 自动设置shuffle partition数 在启用Adaptive Execution特性前,Spark SQL根据spark.sql.shuffle.partitions配置指定shuffle时的partition个数。此种方法在一个应用中执行多种SQL查询时缺乏灵活性,无法保证所有场景下的性能合适。开启Adaptive Execution后,Spark SQL将自动为每个shuffle过程动态设置partition个数,而不是使用通用配置,使每次shuffle过程自动使用最合理的partition数。 动态调整执行计划 在启用Adaptive Execution特性前,Spark SQL根据RBO和CBO的优化结果创建执行计划,此种方法忽略了数据在运行过程中的结果集变化。比如基于某个大表创建的视图,与其他大表join时,即便视图的结果集很小,也无法将执行计划调整为BroadcastJoin。启用Adaptive Execution特性后,Spark SQL能够在运行过程中根据前面stage的运行结果动态调整后续的执行计划,从而获得更好的执行性能。 自动处理数据倾斜 在执行SQL语句时,若存在数据倾斜,可能导致单个executor内存溢出、任务执行缓慢等问题。启动Adaptive Execution特性后,Spark SQL能自动处理数据倾斜场景,对倾斜的分区,启动多个task进行处理,每个task读取若干个shuffle输出文件,再对这部分任务的Join结果进行Union操作,以达到消除数据倾斜的效果
  • 操作步骤 要使用CBO优化,可以按照以下步骤进行优化。 需要先执行特定的SQL语句来收集所需的表和列的统计信息。 SQL命令如下(根据具体情况选择需要执行的SQL命令): 生成表级别统计信息(扫表): ANALYZE TABLE src COMPUTE STATIS TICS 生成sizeInBytes和rowCount。 使用ANALYZE语句收集统计信息时,无法计算非HDFS数据源的表的文件大小。 生成表级别统计信息(不扫表): ANALYZE TABLE src COMPUTE STATISTICS NOSCAN 只生成sizeInBytes,如果原来已经生成过sizeInBytes和rowCount,而本次生成的sizeInBytes和原来的大小一样,则保留rowCount(若存在),否则清除rowCount。 生成列级别统计信息 ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS a, b, c 生成列统计信息,为保证一致性,会同步更新表统计信息。目前不支持复杂数据类型(如Seq, Map等)和HiveStringType的统计信息生成。 显示统计信息 DESC FORMATTED src 在Statistics中会显示“xxx bytes, xxx rows”分别表示表级别的统计信息。也可以通过如下命令显示列统计信息: DESC FORMATTED src a 使用限制:当前统计信息收集不支持针对分区表的分区级别的统计信息。 在Spark客户端的“spark-defaults.conf”配置文件中进行表1设置。 表1 参数介绍 参数 描述 默认值 spark.sql.cbo.enabled CBO总开关。 true表示打开, false表示关闭。 要使用该功能,需确保相关表和列的统计信息已经生成。 false spark.sql.cbo.joinReorder.enabled 使用CBO来自动调整连续的inner join的顺序。 true:表示打开 false:表示关闭 要使用该功能,需确保相关表和列的统计信息已经生成,且CBO总开关打开。 false spark.sql.cbo.joinReorder.dp.threshold 使用CBO来自动调整连续inner join的表的个数阈值。 如果超出该阈值,则不会调整join顺序。 12
  • Map Join Hive的Map Join适用于能够在内存中存放下的小表(指表大小小于25MB),通过“hive.mapjoin.smalltable.filesize”定义小表的大小,默认为25MB。 Map Join的方法有两种: 使用/*+ MAPJOIN(join_table) */。 执行语句前设置如下参数,当前版本中该值默认为true。 set hive.auto.convert.join=true; 使用Map Join时没有Reduce任务,而是在Map任务前起了一个MapReduce Local Task,这个Task通过TableScan读取小表内容到本机,在本机以HashTable的形式保存并写入硬盘上传到DFS,并在distributed cache中保存,在Map Task中从本地磁盘或者distributed cache中读取小表内容直接与大表join得到结果并输出。 使用Map Join时需要注意小表不能过大,如果小表将内存基本用尽,会使整个系统性能下降甚至出现内存溢出的异常。
  • Sort Merge Bucket Map Join 使用Sort Merge Bucket Map Join必须满足以下2个条件: join的两张表都很大,内存中无法存放。 两张表都按照join key进行分桶(clustered by (column))和排序(sorted by(column)),且两张表的分桶数正好是倍数关系。 通过如下设置,启用Sort Merge Bucket Map Join: set hive.optimize.bucketmapjoin=true; set hive.optimize.bucketmapjoin.sortedmerge=true; 这种Map Join也没有Reduce任务,是在Map任务前启动MapReduce Local Task,将小表内容按桶读取到本地,在本机保存多个桶的HashTable备份并写入HDFS,并保存在Distributed Cache中,在Map Task中从本地磁盘或者Distributed Cache中按桶一个一个读取小表内容,然后与大表做匹配直接得到结果并输出。
共100000条