华为云用户手册

  • Hive支持ZSTD压缩格式 ZSTD(全称为Zstandard)是一种开源的无损数据压缩算法,其压缩性能和压缩比均优于当前Hadoop支持的其他压缩格式,本特性使得Hive支持ZSTD压缩格式的表。Hive支持基于ZSTD压缩的存储格式有常见的ORC、RCFile、TextFile、JsonFile、Parquet、Squence和 CS V。 可在Hive客户端创建ZSTD压缩格式的表,命令如下: ORC存储格式建表时可指定TBLPROPERTIES("orc.compress"="zstd"): create table tab_1(...) stored as orc TBLPROPERTIES("orc.compress"="zstd"); Parquet存储格式建表可指定TBLPROPERTIES("parquet.compression"="zstd"): create table tab_2(...) stored as parquet TBLPROPERTIES("parquet.compression"="zstd"); 其他格式或通用格式建表可执行设置参数指定compress.codec为“org.apache.hadoop.io.compress.ZStandardCode”: set hive.exec.compress.output=true; set mapreduce.map.output.compress=true; set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.ZStandardCodec; set mapreduce.output.fileoutputformat.compress=true; set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.ZStandardCodec; set hive.exec.compress.intermediate=true; create table tab_3(...) stored as textfile; ZSTD压缩格式的表和其他普通压缩表的SQL操作没有区别,可支持正常的增删查及聚合类SQL操作。 父主题: Hive数据存储及加密配置
  • 查询冻结表的冻结分区 查询冻结分区: show frozen partitions 表名; 默认元数据库冻结分区类型只支持int、string、varchar、date、timestamp类型。 外置元数据库只支持PostgreSQL数据库,且冻结分区类型只支持int、string、varchar、timestamp类型。 对冻结后的表进行Msck元数据修复时,需要先解冻数据。如果对冻结表进行过备份后恢复操作,则可以直接执行Msck元数据修复操作,且解冻只能通过msck repair命令进行操作。 对冻结后的分区进行rename时,需要先解冻数据,否则会提示分区不存在。 删除存在冻结数据的表时,被冻结的数据会同步删除。 删除存在冻结数据的分区时,被冻结的分区信息不会被删除,HDFS业务数据也不会被删除。 select查询数据时,会自动添加排查冷分区数据的过滤条件,查询结果将不包含冷分区的数据。 show partitions table查询表下的分区数据时,查询结果将不包含冷分区,可通过show frozen partitions table进行冷冻分区查询。
  • Hive配置类问题 Hive SQL执行报错:java.lang.OutOfMemoryError: Java heap space. 解决方案: 对于MapReduce任务,增大下列参数: set mapreduce.map.memory.mb=8192; set mapreduce.map.java.opts=-Xmx6554M; set mapreduce.reduce.memory.mb=8192; set mapreduce.reduce.java.opts=-Xmx6554M; 对于Tez任务,增大下列参数: set hive.tez.container.size=8192; Hive SQL对列名as为新列名后,使用原列名编译报错:Invalid table alias or column reference 'xxx'. 解决方案:set hive.cbo.enable=true; Hive SQL子查询编译报错:Unsupported SubQuery Expression 'xxx': Only SubQuery expressions that are top level conjuncts are allowed. 解决方案:set hive.cbo.enable=true; Hive SQL子查询编译报错:CalciteSubquerySemanticException [Error 10249]: Unsupported SubQuery Expression Currently SubQuery expressions are only allowed as Where and Having Clause predicates. 解决方案:set hive.cbo.enable=true; Hive SQL编译报错:Error running query: java.lang.AssertionError: Cannot add expression of different type to set. 解决方案:set hive.cbo.enable=false; Hive SQL执行报错:java.lang.NullPointerException at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFComputeStats$GenericUDAFNumericStatsEvaluator.init. 解决方案:set hive.map.aggr=false; Hive SQL设置hive.auto.convert.join = true(默认开启)和hive.optimize.skewjoin=true执行报错:ClassCastException org.apache.hadoop.hive.ql.plan.ConditionalWork cannot be cast to org.apache.hadoop.hive.ql.plan.MapredWork. 解决方案:set hive.optimize.skewjoin=false; Hive SQL设置hive.auto.convert.join=true(默认开启)、hive.optimize.skewjoin=true和hive.exec.parallel=true执行报错:java.io.FileNotFoundException: File does not exist:xxx/reduce.xml. 解决方案: 方法一:切换执行引擎为Tez,详情请参考切换Hive执行引擎为Tez。 方法二:set hive.exec.parallel=false; 方法三:set hive.auto.convert.join=false; Hive on Tez执行Bucket表Join报错:NullPointerException at org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.mergeJoinComputeKeys 解决方案:set tez.am.container.reuse.enabled=false; 父主题: Hive常见问题
  • 操作场景 HBase可以通过对HFile中的data block编码,减少Key-Value中Key的重复部分,从而减少空间的使用。目前对data block的编码方式有:NONE、PREFIX、DIFF、FAST_DIFF和ROW_INDEX_V1,其中NONE表示不使用编码。另外,HBase还支持使用压缩算法对HFile文件进行压缩,默认支持的压缩算法有:NONE、GZ、SNAPPY和ZSTD,其中NONE表示HFile不压缩。 这两种方式都是作用在HBase的列簇上,可以同时使用,也可以单独使用。
  • Hudi表类型 Copy On Write 写时复制表也简称cow表,使用parquet文件存储数据,内部的更新操作需要通过重写原始parquet文件完成。 优点:读取时,只读取对应分区的一个数据文件即可,较为高效。 缺点:数据写入的时候,需要复制一个先前的副本再在其基础上生成新的数据文件,这个过程比较耗时。且由于耗时,读请求读取到的数据相对就会滞后。 Merge On Read 读时合并表也简称mor表,使用列格式parquet和行格式Avro两种方式混合存储数据。其中parquet格式文件用于存储基础数据,Avro格式文件(也可叫做log文件)用于存储增量数据。 优点:由于写入数据先写delta log,且delta log较小,所以写入成本较低。 缺点:需要定期合并整理compact,否则碎片文件较多。读取性能较差,因为需要将delta log和老数据文件合并。
  • 回答 不可以,会抛HoodieKeyException异常。 Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "name" cannot be null or empty. at org.apache.hudi.keygen.SimpleKeyGenerator.getKey(SimpleKeyGenerator.java:58) at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:104) at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:100)
  • 将Hudi表数据同步到Hive 通过执行run_hive_sync_tool.sh可以将Hudi表数据同步到Hive中。 例如:需要将HDFS上目录为hdfs://hacluster/tmp/huditest/hudimor1_deltastreamer_partition的Hudi表同步为Hive表,表名为table hive_sync_test3,使用unite、country和state为分区键,命令示例如下: run_hive_sync_tool.sh --partitioned-by unite,country,state --base-path hdfs://hacluster/tmp/huditest/hudimor1_deltastreamer_partition --table hive_sync_test3 --partition-value-extractor org.apache.hudi.hive.MultiPartKeysValueExtractor --support-timestamp 表1 参数说明 命令 描述 必填 默认值 --database Hive database名称 N default --table Hive表名 Y - --base-file-format 文件格式 (PARQUET或HFILE) N PARQUET --user Hive用户名 N - --pass Hive密码 N - --jdbc-url Hive jdbc connect url N - --base-path 待同步的Hudi表存储路径 Y - --partitioned-by 分区键- N - --partition-value-extractor 分区类,需实现PartitionValueExtractor ,可以从HDFS路径中提取分区值 N SlashEncodedDayPartitionValueExtractor --assume-date-partitioning 以yyyy/mm/dd进行分区从而支持向后兼容。 N false --use-pre-apache-input-format 使用com.uber.hoodie包下的InputFormat替换org.apache.hudi包下的。除了从com.uber.hoodie迁移项目至org.apache.hudi外请勿使用。 N false --use-jdbc 使用Hive jdbc连接 N true --auto-create-database 自动创建Hive database N true --skip-ro-suffix 注册时跳过读取_ro后缀的读优化视图 N false --use-file-listing-from-metadata 从Hudi的元数据中获取文件列表 N false --verify-metadata-file-listing 根据文件系统验证Hudi元数据中的文件列表 N false --help、-h 查看帮助 N false --support-timestamp 将原始类型中'INT64'的TIMESTAMP_MICROS转换为Hive的timestamp N false --decode-partition 如果分区在写入过程中已编码,则解码分区值 N false --batch-sync-num 指定每批次同步hive的分区数 N 1000 Hive Sync时会判断表不存在时建外表并添加分区,表存在时对比表的schema是否存在差异,存在则替换,对比分区是否有新增,有则添加分区。 因此使用hive sync时有以下约束: 写入数据Schema只允许增加字段,不允许修改、删除字段。 分区目录只能新增,不会删除。 Overwrite覆写Hudi表不支持同步覆盖Hive表。 Hudi同步Hive表时,不支持使用timestamp类型作为分区列。 父主题: Hudi写操作
  • 基础操作 使用root用户登录集群客户端节点,执行如下命令: cd {客户端安装目录} source bigdata_env source Hudi/component_env kinit 创建的用户 执行hudi-cli.sh进入Hudi客户端, cd {客户端安装目录}/Hudi/hudi/bin/ ./hudi-cli.sh 即可执行各种Hudi命令,执行示例(仅部分命令,全部命令请参考Hudi官网:https://hudi.apache.org/docs/quick-start-guide/): 查看帮助: help //查看hudi-cli的所有命令 help 'command' //查看某一个命令的帮助及参数列表。 连接表: connect --path '/tmp/huditest/test_table' 查看表信息: desc 查看compaction计划: compactions show all 查看clean计划: cleans show 执行clean: cleans run 查看commit信息: commits show 查看commit写入的分区: commit showpartitions --commit 20210127153356 20210127153356表示commit的时间戳,下同。 查看指定commit写入的文件: commit showfiles --commit 20210127153356 比较两个表的commit信息差异: commits compare --path /tmp/hudimor/mytest100 rollback指定提交(rollback每次只允许rollback最后一次commit): commit rollback --commit 20210127164905 compaction调度: compaction schedule --hoodieConfigs 'hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy,hoodie.compaction.target.io=1,hoodie.compact.inline.max.delta.commits=1' 执行compaction compaction run --parallelism 100 --sparkMemory 1g --retry 1 --compactionInstant 20210602101315 --hoodieConfigs 'hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy,hoodie.compaction.target.io=1,hoodie.compact.inline.max.delta.commits=1' --propsFilePath hdfs://hacluster/tmp/default/tb_test_mor/.hoodie/hoodie.properties --schemaFilePath /tmp/default/tb_test_mor/.hoodie/compact_tb_base.json 创建savepoint savepoint create --commit 20210318155750 回滚指定的savepoint savepoint rollback --savepoint 20210318155750 如果commit写入导致元数据冲突异常,执行commit rollback、savepoint rollback能回退数据,但不能回退Hive元数据,只能删除Hive表然后手动进行同步刷新。 commit rollback只能回退当前最新的一个commit,savepoint rollback只能回退到最新的一个savepoint。二者均不能随意指定进行回退。
  • 配置描述 在Spark客户端的“core-site.xml”配置文件中修改配置。 表1 参数介绍 参数 描述 默认值 fs.obs.metrics.switch 上报OBS监控指标开关: true表示打开 false表示关闭 true fs.obs.metrics.consumer 指定OBS监控指标的处理方式。 org.apache.hadoop.fs.obs.metrics.OBSAMetricsProvider:表示收集统计OBS监控指标 org.apache.hadoop.fs.obs.DefaultMetricsConsumer:表示不收集OBS监控指标 要使用OBS监控功能,需确保上报OBS监控指标开关打开。 org.apache.hadoop.fs.obs.metrics.OBSAMetricsProvider
  • 回答 运行包含Reduce的Mapreduce任务时,通过-Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.nativetask.NativeMapOutputCollectorDelegator命令开启Native Task特性,任务在部分操作系统运行失败,日志中提示错误“version 'GLIBCXX_3.4.20' not found”。该问题原因是操作系统的GLIBCXX版本较低,导致该特性依赖的libnativetask.so.1.0.0库无法加载,进而导致任务失败。 规避手段: 设置配置项mapreduce.job.map.output.collector.class的值为org.apache.hadoop.mapred.MapTask$MapOutputBuffer。
  • Flink用户权限说明 访问并使用Flink WebUI进行业务操作需为用户赋予FlinkServer相关权限,Manager的admin用户没有FlinkServer的业务操作权限。 FlinkServer中应用(租户)是最大管理范围,包含集群连接管理、数据连接管理、应用管理、流表和作业管理等。 FlinkServer中有如表1所示三种资源权限: 表1 FlinkServer资源权限 权限名称 权限描述 备注 FlinkServer管理员权限 具有所有应用的编辑、查看权限。 是FlinkServer的最高权限。如果已经具有FlinkServer管理员权限,则会自动具备所有应用的权限。 应用编辑权限 具有当前应用编辑权限的用户,可以执行创建、编辑和删除集群连接、数据连接,创建流表、创建作业及运行作业等操作。 同时具有当前应用查看权限。 应用查看权限 具有当前应用查看权限的用户,可以查看应用。 - 父主题: Flink用户权限管理
  • CarbonData表操作并发语法说明 DDL和DML中的操作,执行前,需要获取对应的锁,各操作需要获取锁的情况见表1 操作获取锁一览表,√表示需要获取该锁,一个操作仅在获取到所有需要获取的锁后,才能继续执行。 任意两个操作是否可以并发执行,可以通过如下方法确定:表1两行代表两个操作,这两行没有任意一列都标记√,即不存在某一列两行全为√。 表1 操作获取锁一览表 操作 METADATA_LOCK COMPACTION_LOCK DROP_TABLE_LOCK DELETE_SEGMENT_LOCK CLEAN_FILES_LOCK ALTER_PARTITION_LOCK UPDATE_LOCK STREAMING_LOCK CONCURRENT_LOAD_LOCK SEGMENT_LOCK CREATE TABLE - - - - - - - - - - CREATE TABLE As SELECT - - - - - - - - - - DROP TABLE √ - √ - - - - √ - - ALTER TABLE COMPACTION - √ - - - - √ - - - TABLE RENAME - - - - - - - - - - ADD COLUMNS √ √ - - - - - - - - DROP COLUMNS √ √ - - - - - - - - CHANGE DATA TYPE √ √ - - - - - - - - REFRESH TABLE - - - - - - - - - - REGISTER INDEX TABLE √ - - - - - - - - - REFRESH INDEX - √ - - - - - - - - LOAD DATA/INSERT INTO - - - - - - - - √ √ UPDATE CARBON TABLE √ √ - - - - √ - - - DELETE RECORDS from CARBON TABLE √ √ - - - - √ - - - DELETE SEGMENT by ID - - - √ √ - - - - - DELETE SEGMENT by DATE - - - √ √ - - - - - SHOW SEGMENTS - - - - - - - - - - CREATE SECONDARY INDEX √ √ - √ - - - - - - SHOW SECONDARY INDEXES - - - - - - - - - - DROP SECONDARY INDEX √ - √ - - - - - - - CLEAN FILES - - - - - - - - - - SET/RESET - - - - - - - - - - Add Hive Partition - - - - - - - - - - Drop Hive Partition √ √ √ √ √ √ - - - - Drop Partition √ √ √ √ √ √ - - - - Alter table set √ √ - - - - - - - - 父主题: CarbonData语法参考
  • 查询Hudi的Schema演进表对应的Hive外部表 如果该Hudi表为Schema演进表(表的字段执行过修改),则在Hive客户端查询该表时还需额外设置一个参数: set hive.exec.schema.evolution=true; 例如以cow表实时视图的查询举例,其他各个视图的查询都要额外添加该参数: set hive.exec.schema.evolution=true; select * from hudicow;
  • Hudi表对应的Hive外部表介绍 Hudi源表对应一份HDFS的数据,通过Spark组件、Flink组件或者Hudi客户端,可以将Hudi表的数据映射为Hive外部表,基于该外部表,Hive可以进行实时视图查询、读优化视图查询以及增量视图查询。 根据Hudi源表的类型的不同,提供不同的视图查询: Hudi源表类型为Copy On Write时,可以映射为Hive的一张外部表,该表可以提供实时视图查询以及增量视图查询。 Hudi源表类型为Merge On Read时,可以映射为Hive的两张外部表(ro表和rt表),ro表提供读优化视图查询,rt表提供实时视图查询以及增量视图查询。 不能对Hudi表映射的Hive外部表做增删改操作(即insert、update、delete、load、merge、alter、msck),只支持查询操作(select)。 表授权:不支持修改类权限(update、Alter、write、All)。 备份与恢复:由于ro表和rt表均由同一个Hudi源表映射的,备份其中一张表,另一张也会跟着备份,恢复也是同时恢复的,因此只需备份其中一张表即可。 组件版本: Hive: FusionInsight _HD_xxx,Hive内核版本3.1.0。 Spark2x:FusionInsight_Spark2x_xxx,Hudi内核版本:0.11.0。
  • 推荐资源配置 mor表: 由于其本质上是写增量文件,调优可以直接根据Hudi的数据大小(dataSize)进行调整。 dataSize如果只有几个G,推荐跑单节点运行spark,或者yarn模式但是只分配一个container。 入湖程序的并行度p设置:建议p = (dataSize)/128M,程序分配core的数量保持和p一致即可。内存设置建议内存大小和core的比例大于1.5:1 即一个core配1.5G内存, 堆外内存设置建议内存大小和core的比例大于0.5:1。 cow表: cow表的原理是重写原始数据,因此这种表的调优,要兼顾dataSize和最后重写的文件数量。总体来说core数量越大越好(和最后重写多少个文件数直接相关),并行度p和内存大小和mor设置类似。
  • 操作场景 Hudi提供多种写入方式,具体见hoodie.datasource.write.operation配置项,这里主要介绍UPSERT、INSERT和BULK_INSERT。 INSERT(插入): 该操作流程和UPSERT基本一致,但是不需要通过索引去查询具体更新的文件分区,因此它的速度比UPSERT快。当数据源不包含更新数据时建议使用该操作,如果数据源中存在更新数据,则在 数据湖 中会出现重复数据。 BULK_INSERT(批量插入):用于初始数据集加载, 该操作会对主键进行排序后直接以写普通parquet表的方式插入Hudi表,该操作性能是最高的,但是无法控制小文件,而UPSERT和INSERT操作使用启发式方法可以很好的控制小文件。 UPSERT(插入更新): 默认操作类型。Hudi会根据主键进行判断,如果历史数据存在则update如果不存在则insert。因此在对于CDC之类几乎肯定包括更新的数据源,建议使用该操作。 由于INSERT时不会对主键进行排序,所以初始化数据集不建议使用INSERT。 在确定数据都为新数据时建议使用INSERT,当存在更新数据时建议使用UPSERT,当初始化数据集时建议使用BULK_INSERT。
  • 批量写入Hudi表 引入Hudi包生成测试数据,参考使用Spark Shell创建Hudi表章节的2到4。 写入Hudi表,写入命令中加入参数:option("hoodie.datasource.write.operation", "bulk_insert"),指定写入方式为bulk_insert,指定其他写入方式请参考表1。 df.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.datasource.write.recordkey.field", "uuid"). option("hoodie.datasource.write.partitionpath.field", ""). option("hoodie.datasource.write.operation", "bulk_insert"). option("hoodie.table.name", tableName). option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator"). option("hoodie.datasource.hive_sync.enable", "true"). option("hoodie.datasource.hive_sync.partition_fields", ""). option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor"). option("hoodie.datasource.hive_sync.table", tableName). option("hoodie.datasource.hive_sync.use_jdbc", "false"). option("hoodie.bulkinsert.shuffle.parallelism", 4). mode(Overwrite). save(basePath) 示例中各参数介绍请参考表1。 使用spark datasource接口更新Mor表,Upsert写入小数据量时可能触发更新数据的小文件合并,使在Mor表的读优化视图中能查到部分更新数据。 当update的数据对应的base文件是小文件时,insert中的数据和update中的数据会被合在一起和base文件直接做合并产生新的base文件,而不是写log。
  • 单表并发控制配置 表6 单表并发控制参数配置 参数 描述 默认值 hoodie.write.lock.provider 指定lock provider,不建议使用默认值,使用org.apache.hudi.hive.HiveMetastoreBasedLockProvider。 org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider hoodie.write.lock.hivemetastore.database Hive的database。 无 hoodie.write.lock.hivemetastore.table Hive的table name。 无 hoodie.write.lock.client.num_retries 重试次数。 10 hoodie.write.lock.client.wait_time_ms_between_retry 重试间隔。 10000 hoodie.write.lock.conflict.resolution.strategy lock provider类,必须是ConflictResolutionStrategy的子类。 org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy hoodie.write.lock.zookeeper.base_path 存放ZNodes的路径,同一张表的并发写入需配置一致。 无 hoodie.write.lock.zookeeper.lock_key ZNode的名称,建议与Hudi表名相同。 无 hoodie.write.lock.zookeeper.connection_timeout_ms ZooKeeper连接超时时间。 15000 hoodie.write.lock.zookeeper.port ZooKeeper端口号。 无 hoodie.write.lock.zookeeper.url ZooKeeper的url。 无 hoodie.write.lock.zookeeper.session_timeout_ms ZooKeeper的session过期时间。 60000
  • Clustering配置 本章节内容仅适用于 MRS 3.2.0及之后版本。 Clustering中有两个策略分别是hoodie.clustering.plan.strategy.class和hoodie.clustering.execution.strategy.class。一般情况下指定plan.strategy为SparkRecentDaysClusteringPlanStrategy或者SparkSizeBasedClusteringPlanStrategy时,execution.strategy不需要指定。但当plan.strategy为SparkSingleFileSortPlanStrategy时,需要指定execution.strategy为SparkSingleFileSortExecutionStrategy。 表7 Clustering参数配置 参数 描述 默认值 hoodie.clustering.inline 是否同步执行clustering。 false hoodie.clustering.inline.max.commits 触发clustering的commit数。 4 hoodie.clustering.async.enabled 是否启用异步执行clustering。 说明: 此参数仅适用于MRS 3.3.0-LTS及之后版本。 false hoodie.clustering.async.max.commits 异步执行时触发clustering的commit数。 说明: 此参数仅适用于MRS 3.3.0-LTS及之后版本。 4 hoodie.clustering.plan.strategy.target.file.max.bytes 指定clustering后每个文件大小最大值。 1024 * 1024 * 1024 byte hoodie.clustering.plan.strategy.small.file.limit 小于该大小的文件会被clustering。 300 * 1024 * 1024 byte hoodie.clustering.plan.strategy.sort.columns clustering用以排序的列。 无 hoodie.layout.optimize.strategy Clustering执行策略,可选linear、z-order、hilbert三种排序方式。 linear hoodie.layout.optimize.enable 使用z-order、hilbert时需要开启。 false hoodie.clustering.plan.strategy.class 筛选FileGroup进行clustering的策略类,默认筛选小于hoodie.clustering.plan.strategy.small.file.limit阈值的文件。 org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy hoodie.clustering.execution.strategy.class 执行clustering的策略类(RunClusteringStrategy的子类),用以定义群集计划的执行方式。 默认类们按指定的列对计划中的文件组进行排序,同时满足配置的目标文件大小。 org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy hoodie.clustering.plan.strategy.max.num.groups 设置执行clustering时最多选择多少个FileGroup,该值越大并发度越大。 30 hoodie.clustering.plan.strategy.max.bytes.per.group 设置执行clustering时每个FileGroup最多有多少数据参与clustering。 2 * 1024 * 1024 * 1024 byte
  • compaction&cleaning配置 表5 compaction&cleaning参数配置 参数 描述 默认值 hoodie.clean.automatic 是否执行自动clean。 true hoodie.cleaner.policy 要使用的清理策略。Hudi将删除旧版本的parquet文件以回收空间。 任何引用此版本文件的查询和计算都将失败。建议确保数据保留的时间超过最大查询执行时间。 KEEP_LATEST_COMMITS hoodie.cleaner.commits.retained 保留的提交数。因此,数据将保留为num_of_commits * time_between_commits(计划的),这也直接转化为逐步提取此数据集的数量。 10 hoodie.keep.max.commits 触发归档操作的commit数阈值。 30 hoodie.keep.min.commits 归档操作保留的commit数。 20 hoodie.commits.archival.batch 这控制着批量读取并一起归档的提交即时的数量。 10 hoodie.parquet.small.file.limit 该值应小于maxFileSize,如果将其设置为0,会关闭此功能。由于批处理中分区中插入记录的数量众多,总会出现小文件。Hudi提供了一个选项,可以通过将对该分区中的插入作为对现有小文件的更新来解决小文件的问题。此处的大小是被视为“小文件大小”的最小文件大小。 104857600 byte hoodie.copyonwrite.insert.split.size 插入写入并行度。为单个分区的总共插入次数。写出100MB的文件,至少1KB大小的记录,意味着每个文件有100K记录。默认值是超额配置为500K。 为了改善插入延迟,请对其进行调整以匹配单个文件中的记录数。将此值设置为较小的值将导致文件变小(尤其是当compactionSmallFileSize为0时)。 500000 hoodie.copyonwrite.insert.auto.split Hudi是否应该基于最后24个提交的元数据动态计算insertSplitSize,默认关闭。 true hoodie.copyonwrite.record.size.estimate 平均记录大小。如果指定,Hudi将使用它,并且不会基于最后24个提交的元数据动态地计算。 没有默认值设置。这对于计算插入并行度以及将插入打包到小文件中至关重要。 1024 hoodie.compact.inline 当设置为true时,紧接在插入或插入更新或批量插入的提交或增量提交操作之后由摄取本身触发压缩。 true hoodie.compact.inline.max.delta.commits 触发内联压缩之前要保留的最大增量提交数。 5 hoodie.compaction.lazy.block.read 当CompactedLogScanner合并所有日志文件时,此配置有助于选择是否应延迟读取日志块。选择true以使用I/O密集型延迟块读取(低内存使用),或者为false来使用内存密集型立即块读取(高内存使用)。 true hoodie.compaction.reverse.log.read HoodieLogFormatReader会从pos=0到pos=file_length向前读取日志文件。如果此配置设置为true,则Reader会从pos=file_length到pos=0反向读取日志文件。 false hoodie.cleaner.parallelism 如果清理变慢,请增加此值。 200 hoodie.compaction.strategy 用来决定在每次压缩运行期间选择要压缩的文件组的压缩策略。默认情况下,Hudi选择具有累积最多未合并数据的日志文件。 org.apache.hudi.table.action.compact.strategy. LogFileSizeBasedCompactionStrategy hoodie.compaction.target.io LogFileSizeBasedCompactionStrategy的压缩运行期间要花费的MB量。当压缩以内联模式运行时,此值有助于限制摄取延迟。 500 * 1024 MB hoodie.compaction.daybased.target.partitions 由org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy使用,表示在压缩运行期间要压缩的最新分区数。 10 hoodie.compaction.payload.class 这需要与插入/插入更新过程中使用的类相同。就像写入一样,压缩也使用记录有效负载类将日志中的记录彼此合并,再次与基本文件合并,并生成压缩后要写入的最终记录。 org.apache.hudi.common.model.Defaulthoodierecordpayload hoodie.schedule.compact.only.inline 在写入操作时,是否只生成压缩计划。在hoodie.compact.inline=true时有效。 false hoodie.run.compact.only.inline 通过Sql执行run compaction命令时,是否只执行压缩操作,压缩计划不存在时直接退出。 false
  • index相关配置 表3 index相关参数配置 参数 描述 默认值 hoodie.index.class 用户自定义索引的全路径名,索引类必须为HoodieIndex的子类,当指定该配置时,其会优先于hoodie.index.type配置。 "" hoodie.index.type 使用的索引类型,默认为布隆过滤器。可能的选项是[BLOOM | HBASE | GLOBAL_BLOOM | SIMPLE | GLOBAL_SIMPLE] 。 布隆过滤器消除了对外部系统的依赖,并存储在Parquet数据文件的页脚中。 BLOOM hoodie.index.bloom.num_entries 存储在布隆过滤器中的条目数。 假设maxParquetFileSize为128MB,averageRecordSize为1024B,因此,一个文件中的记录总数约为130K。 默认值(60000)大约是此近似值的一半。 注意: 将此值设置的太低,将产生很多误报,并且索引查找将必须扫描比其所需的更多的文件;如果将其设置的非常高,将线性增加每个数据文件的大小(每50000个条目大约4KB)。 60000 hoodie.index.bloom.fpp 根据条目数允许的错误率。 用于计算应为布隆过滤器分配多少位以及哈希函数的数量。通常将此值设置的很低(默认值0.000000001),在磁盘空间上进行权衡以降低误报率。 0.000000001 hoodie.bloom.index.parallelism 索引查找的并行度,其中涉及Spark Shuffle。 默认情况下,根据输入的工作负载特征自动计算的。 0 hoodie.bloom.index.prune.by.ranges 为true时,从文件框定信息,可以加快索引查找的速度。 如果键具有单调递增的前缀,例如时间戳,则特别有用。 true hoodie.bloom.index.use.caching 为true时,将通过减少用于计算并行度或受影响分区的IO来缓存输入的RDD以加快索引查找。 true hoodie.bloom.index.use.treebased.filter 为true时,启用基于间隔树的文件过滤优化。与暴力模式相比,此模式可根据键范围加快文件过滤速度。 true hoodie.bloom.index.bucketized.checking 为true时,启用了桶式布隆过滤。这减少了在基于排序的布隆索引查找中看到的偏差。 true hoodie.bloom.index.keys.per.bucket 仅在启用bloomIndexBucketizedChecking并且索引类型为bloom的情况下适用。 此配置控制“存储桶”的大小,该大小可跟踪对单个文件进行的记录键检查的次数,并且是分配给执行布隆过滤器查找的每个分区的工作单位。 较高的值将分摊将布隆过滤器读取到内存的固定成本。 10000000 hoodie.bloom.index.update.partition.path 仅在索引类型为GLOBAL_BLOOM时适用。 为true时,当对一个已有记录执行包含分区路径的更新操作时,将会导致把新记录插入到新分区,而把原有记录从旧分区里删除。为false时,只对旧分区的原有记录进行更新。 true hoodie.index.hbase.zkquorum 仅在索引类型为HBase时适用,必填选项。要连接的HBase ZK Quorum URL。 无 hoodie.index.hbase.zkport 仅在索引类型为HBase时适用,必填选项。要连接的HBase ZK Quorum端口。 无 hoodie.index.hbase.zknode.path 仅在索引类型为HBase时适用,必填选项。这是根znode,它将包含HBase创建及使用的所有znode。 无 hoodie.index.hbase.table 仅在索引类型为HBase时适用,必填选项。HBase表名称,用作索引。Hudi将row_key和[partition_path, fileID, commitTime]映射存储在表中。 无
  • 存储配置 表4 存储参数配置 参数 描述 默认值 hoodie.parquet.max.file.size Hudi写阶段生成的parquet文件的目标大小。对于DFS,这需要与基础文件系统块大小保持一致,以实现最佳性能。 120 * 1024 * 1024 byte hoodie.parquet.block.size parquet页面大小,页面是parquet文件中的读取单位,在一个块内,页面被分别压缩。 120 * 1024 * 1024 byte hoodie.parquet.compression.ratio 当Hudi尝试调整新parquet文件的大小时,预期对parquet数据进行压缩的比例。 如果bulk_insert生成的文件小于预期大小,请增加此值。 0.1 hoodie.parquet.compression.codec parquet压缩编解码方式名称,默认值为gzip。可能的选项是[gzip | snappy | uncompressed | lzo] snappy hoodie.logfile.max.size LogFile的最大值。这是在将日志文件移到下一个版本之前允许的最大值。 1GB hoodie.logfile.data.block.max.size LogFile数据块的最大值。这是允许将单个数据块附加到日志文件的最大值。 这有助于确保附加到日志文件的数据被分解为可调整大小的块,以防止发生OOM错误。此大小应大于JVM内存。 256MB hoodie.logfile.to.parquet.compression.ratio 随着记录从日志文件移动到parquet,预期会进行额外压缩的比例。 用于merge_on_read存储,以将插入内容发送到日志文件中并控制压缩parquet文件的大小。 0.35
  • 写入操作配置 表1 写入操作重要配置项 参数 描述 默认值 hoodie.datasource.write.table.name 指定写入的Hudi表名。 无 hoodie.datasource.write.operation 写Hudi表指定的操作类型,当前支持upsert、delete、insert、bulk_insert等方式。 upsert:更新插入混合操作。 delete:删除操作。 insert:插入操作。 bulk_insert: 用于初始建表导入数据, 注意初始建表禁止使用upsert、insert方式。 insert_overwrite:对静态分区执行insert overwrite。 insert_overwrite_table:动态分区执行insert overwrite,该操作并不会立刻删除全表做overwrite,会逻辑上重写hudi表的元数据,无用数据后续由hudi的clean机制清理。效率比bulk_insert加overwrite高。 upsert hoodie.datasource.write.table.type 指定Hudi表类型,一旦这个表类型被指定,后续禁止修改该参数,可选值MERGE_ON_READ。 COPY_ON_WRITE hoodie.datasource.write.precombine.field 该值用于在写之前对具有相同的key的行进行合并去重。 指定为具体的表字段 hoodie.datasource.write.payload.class 在更新过程中,该类用于提供方法将要更新的记录和更新的记录做合并,该实现可插拔,如要实现自己的合并逻辑,可自行编写。 org.apache.hudi.common.model.DefaultHoodieRecordPayload hoodie.datasource.write.recordkey.field 用于指定Hudi的主键,Hudi表要求有唯一主键。 指定为具体的表字段 hoodie.datasource.write.partitionpath.field 用于指定分区键,该值配合hoodie.datasource.write.keygenerator.class使用可以满足不同的分区场景。 无 hoodie.datasource.write.hive_style_partitioning 用于指定分区方式是否和Hive保持一致,建议该值设置为true。 true hoodie.datasource.write.keygenerator.class 配合hoodie.datasource.write.partitionpath.field,hoodie.datasource.write.recordkey.field产生主键和分区方式。 说明: 写入设置KeyGenerator与表保存的参数值不一致时将提示需要保持一致。 org.apache.hudi.keygen.ComplexKeyGenerator
  • 同步Hive表配置 表2 同步Hive表参数配置 参数 描述 默认值 hoodie.datasource.hive_sync.enable 是否同步Hudi表信息到Hive Metastore。 注意: 建议该值设置为true,统一使用Hive管理Hudi表。 false hoodie.datasource.hive_sync.database 要同步给Hive的数据库名。 default hoodie.datasource.hive_sync.table 要同步给Hive的表名,建议这个值和hoodie.datasource.write.table.name保证一致。 unknown hoodie.datasource.hive_sync.username 同步Hive时,指定的用户名。 hive hoodie.datasource.hive_sync.password 同步Hive时,指定的密码。 hive hoodie.datasource.hive_sync.jdbcurl 连接Hive JDBC指定的连接。 "" hoodie.datasource.hive_sync.use_jdbc 是否使用Hive JDBC方式连接Hive同步Hudi表信息。 建议该值设置为false,设置为false后JDBC连接相关配置无效。 true hoodie.datasource.hive_sync.partition_fields 用于决定Hive分区列。 "" hoodie.datasource.hive_sync.partition_extractor_class 用于提取Hudi分区列值,将其转换成Hive分区列。 org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor hoodie.datasource.hive_sync.support_timestamp 当hudi表存在timestamp类型字段时,需指定此参数为true,以实现同步timestamp类型到hive元数据中。 该值默认为false,默认将timestamp类型同步为bigInt,默认情况可能导致使用sql查询包含timestamp类型字段的hudi表出现错误。 true
  • 新建FlinkServer流表步骤 访问Flink WebUI,请参考访问FlinkServer WebUI界面。 单击“流表管理”进入流表管理页面。 单击“新建流表”,在新建流表页面参考表1填写信息,单击“确定”,完成流表创建。创建完成后,可在对应流表的“操作”列对流表进行编辑、删除等操作。 图1 新建流表 表1 新建流表信息 参数名称 参数描述 备注 流/表名称 流/表的名称。 例如:flink_sink 描述 流/表的描述信息。 - 映射表类型 Flink SQL本身不带有数据存储功能,所有涉及表创建的操作,实际上均是对于外部数据表、存储的引用映射。 类型包含Kafka、HDFS。 - 类型 包含数据源表Source,数据结果表Sink。不同映射表类型包含的表如下所示。 Kafka:Source、Sink HDFS:Source、Sink - 数据连接 选择数据连接。 - Topic 读取的Kafka的topic,支持从多个Kakfa topic中读取,topic之间使用英文分隔符进行分隔。 “映射表类型”选择“Kafka”时存在此参数。 - 文件路径 要传输的HDFS目录或单个文件路径。 “映射表类型”选择“HDFS”时存在此参数。 例如: “/user/sqoop/ ”或“/user/sqoop/example.csv” 编码 选择不同“映射表类型”对应的编码如下: Kafka:CSV、JSON HDFS:CSV - 前缀 “映射表类型”选择“Kafka”,且“类型”选择“Source”,“编码”选择“JSON”时含义为:多层嵌套json的层级前缀,使用英文逗号(,)进行分隔。 例如:data,info表示取嵌套json中data,info下的内容,作为json格式数据输入 分隔符 选择不同“映射表类型”对应的含义为:用于指定CSV字段分隔符。当数据“编码”为“CSV”时存在此参数。 例如:“,” 行分隔符 文件中的换行符,包含“\r”、“\n”、“\r\n”。 “映射表类型”选择“HDFS”时存在此参数。 - 列分隔符 文件中的字段分隔符。 “映射表类型”选择“HDFS”时存在此参数。 例如:“,” 流/表结构 填写流/表结构,包含名称,类型。 - Proctime 指系统时间,与数据本身的时间戳无关,即在Flink算子内计算完成的时间。 “类型”选择“Source”时存在此参数。 - Event Time 指事件产生的时间,即数据产生时自带时间戳。 “类型”选择“Source”时存在此参数。 -
  • Flink WebUI特点 Flink WebUI主要有以下特点: 企业级可视化运维:运维管理界面化、作业监控、作业开发Flink SQL标准化等。 图1 可视化运维 快速建立集群连接:通过集群连接功能配置访问一个集群,需要客户端配置、用户认证密钥文件。 快速建立数据连接:通过数据连接功能配置访问一个组件。创建“数据连接类型”为“HDFS”类型时需创建集群连接,其他数据连接类型的“认证类型”为“KERBEROS”需创建集群连接,“认证类型”为“SIMPLE”不需创建集群连接。 “数据连接类型”为“Kafka”时,认证类型不支持“KERBEROS”。 可视化开发平台:支持自定义输入/输出映射表,满足不同输入来源、不同输出目标端的需求。 图形化作业管理:简单易用。 图2 图形化作业管理
  • Flink WebUI应用流程 Flink WebUI应用流程参考如下步骤: 图3 Flink WebUI应用流程 表2 Flink WebUI应用流程说明 阶段 说明 参考章节 创建应用 通过应用来隔离不同的上层业务。 创建FlinkServer应用 创建集群连接 通过集群连接配置访问不同的集群。 创建FlinkServer集群连接 创建数据连接 通过数据连接,访问不同的数据服务,包括HDFS、Kafka等。 创建FlinkServer数据连接 创建流表 通过数据表,定义源表、维表、输出表的基本属性和字段信息。 创建FlinkServer流表源 创建SQL/JAR作业(流式/批作业) 定义Flink作业的API,包括Flink SQL和Flink Jar作业。 如何创建FlinkServer作业 作业管理 管理创建的作业,包括作业启动、开发、停止、删除和编辑等。 如何创建FlinkServer作业
  • Flink WebUI关键能力 FlinkWebUI关键能力如表1: 表1 Flink WebUI关键能力 关键能力分类 描述 批流一体 支持一套FlinkSQL定义批作业和流作业。 Flink SQL内核能力 Flink SQL支持自定义大小窗、24小时以内流计算、超出24小时批处理。 FlinkSQL支持Kafka、HDFS读取;支持写入Kafka和HDFS。 支持同一个作业定义多个FlinkSQL,多个指标合并在一个作业计算。当一个作业是相同主键、相同的输入和输出时,该作业支持多个窗口的计算。 支持AVG、SUM、COUNT、MAX和MIN统计方法。 Flink SQL可视化定义 集群连接管理,配置Kafka、HDFS等服务所属的集群信息。 数据连接管理,配置Kafka、HDFS等服务信息。 数据表管理,定义Sql访问的数据表信息,用于生成DDL语句。 FlinkSQL作业定义,根据用户输入的Sql,校验、解析、优化、转换成Flink作业并提交运行。 Flink作业可视化管理 支持可视化定义流作业和批作业。 支持作业资源、故障恢复策略、Checkpoint策略可视化配置。 流作业和批作业的状态监控。 Flink作业运维能力增强,包括原生监控页面跳转。 性能&可靠性 流处理支持24小时窗口聚合计算,毫秒级性能。 批处理支持90天窗口聚合计算,分钟级计算完成。 支持对流处理和批处理的数据进行过滤配置,过滤无效数据。 读取HDFS数据时,提前根据计算周期过滤。 作业定义平台故障、服务降级,不支持再定义作业,但是不影响已有作业计算。 作业故障有自动重启机制,重启策略可配置。
  • 前提条件 HDFS和Oozie组件安装完成且运行正常,客户端安装成功。 如果当前客户端为旧版本,需要重新下载和安装客户端。 已创建或获取访问Oozie服务的人机用户账号及密码。 该用户需要从属于hadoop、supergroup、hive组,同时添加Oozie的角色操作权限。如果使用Hive多实例,该用户还需要从属于具体的Hive实例组,如hive3。 用户同时还需要至少有manager_viewer权限的角色。
  • 操作场景 用户需要使用图形化界面在集群中创建或查询HBase表时,可以通过Hue完成任务。 如需在Hue WebUI中操作HBase,当前MRS集群中必须部署HBase的Thrift1Server实例。 Thrift1Server实例默认不会安装,用户可在创建自定义类型的MRS集群时,选择HBase组件并通过调整集群自定义拓扑,添加Thrift1Server实例,详情请参考购买自定义拓扑集群。 如果当前集群支持手动添加服务,也可以在首次添加HBase服务时,选择部署Thrift1Server实例,服务添加成功后,需重启Hue服务,详情请参考添加服务。
共100000条