云服务器内容精选

  • 规则 有数据持续写入的表,24小时内至少执行一次compaction。 对于MOR表,不管是流式写入还是批量写入,需要保证每天至少完成1次Compaction操作。如果长时间不做compaction,Hudi表的log将会越来越大,这必将会出现以下问题: Hudi表读取很慢,且需要很大的资源。 这是由于读MOR表涉及到log合并,大log合并需要消耗大量的资源并且速度很慢。 长时间进行一次Compaction需要耗费很多资源才能完成,且容易出现OOM。 阻塞Clean,如果没有Compaction操作来产生新版本的Parquet文件,那旧版本的文件就不能被Clean清理,增加存储压力。 CPU与内存比例为1:4~1:8。 Compaction作业是将存量的parquet文件内的数据与新增的log中的数据进行合并,需要消耗较高的内存资源,按照之前的表设计规范以及实际流量的波动结合考虑,建议Compaction作业CPU与内存的比例按照1:4~1:8配置,保证Compaction作业稳定运行。当Compaction出现OOM问题,可以通过调大内存占比解决。
  • Spark加工Hudi表时其他参数优化 设置spark.sql.enableToString=false,降低Spark解析复杂SQL时候内存使用,提升解析效率。 设置spark.speculation=false,关闭推测执行,开启该参数会带来额外的cpu消耗,同时Hudi不支持启动该参数,启用该参数写Hudi有概率导致文件损坏。 配置项 集群默认值 调整后 --conf spark.sql.enableToString true false --conf spark.speculation false false
  • 优化shuffle并行度,提升Spark加工效率 所谓的shuffle并发度如下图所示: 集群默认是200,作业可以单独设置。如果发现瓶颈stage(执行时间长),且分配给当前作业的核数大于当前的并发数,说明并发度不足。通过以下配置优化。 场景 配置项 集群默认值 调整后 Jar作业 spark.default.parallelism 200 按实际作业可用资源2倍设置 SQL作业 spark.sql.shuffle.partitions 200 按实际作业可用资源2倍设置 hudi入库作业 hoodie.upsert.shuffle.parallelism 200 非bucket表使用,按实际作业可用资源2倍设置 动态资源调度情况下(spark.dynamicAllocation.enabled= true)时,资源按照spark.dynamicAllocation.maxExecutors评估。
  • 初始化Hudi表时,可以使用BulkInsert方式快速写入数据 示例: set hoodie.combine.before.insert=true; // 入库前去重,如果数据没有重复 该参数无需设置 set hoodie.datasource.write.operation = bulk_insert; // 指定写入方式为bulk insert方式。 set hoodie.bulkinsert.shuffle.parallelism = 4; // 指定bulk_insert写入时的并行度,等于写入完成后保存的分区parquet文件数 insert into dsrTable select * from srcTabble
  • 优化Spark Shuffle参数提升Hudi写入效率 开启spark.shuffle.readHostLocalDisk=true,本地磁盘读取shuffle数据,减少网络传输的开销。 开启spark.io.encryption.enabled=false,关闭shuffle过程写加密磁盘,提升shuffle效率。 开启spark.shuffle.service.enabled=true,启动shuffle服务,提升任务shuffle的稳定性。 配置项 集群默认值 调整后 --conf spark.shuffle.readHostLocalDisk false true --conf spark.io.encryption.enabled true false --conf spark.shuffle.service.enabled false true
  • 调整Spark调度参数优化OBS场景下Spark调度时延 开启对于OBS存储,可以关闭Spark的本地性进行优化,尽可能提升Spark调度效率 配置项 集群默认值 调整后 --conf spark.locality.wait 3s 0s --conf spark.locality.wait.process 3s 0s --conf spark.locality.wait.node 3s 0s --conf spark.locality.wait.rack 3s 0s
  • 操作场景 Hudi提供多种写入方式,具体见hoodie.datasource.write.operation配置项,这里主要介绍UPSERT、INSERT和BULK_INSERT。 INSERT(插入): 该操作流程和UPSERT基本一致,但是不需要通过索引去查询具体更新的文件分区,因此它的速度比UPSERT快。当数据源不包含更新数据时建议使用该操作,如果数据源中存在更新数据,则在 数据湖 中会出现重复数据。 BULK_INSERT(批量插入):用于初始数据集加载, 该操作会对主键进行排序后直接以写普通parquet表的方式插入Hudi表,该操作性能是最高的,但是无法控制小文件,而UPSERT和INSERT操作使用启发式方法可以很好的控制小文件。 UPSERT(插入更新): 默认操作类型。Hudi会根据主键进行判断,如果历史数据存在则update如果不存在则insert。因此在对于CDC之类几乎肯定包括更新的数据源,建议使用该操作。 由于INSERT时不会对主键进行排序,所以初始化数据集不建议使用INSERT。 在确定数据都为新数据时建议使用INSERT,当存在更新数据时建议使用UPSERT,当初始化数据集时建议使用BULK_INSERT。
  • 批量写入Hudi表 引入Hudi包生成测试数据,参考使用Spark Shell创建Hudi表章节的2到4。 写入Hudi表,写入命令中加入参数:option("hoodie.datasource.write.operation", "bulk_insert"),指定写入方式为bulk_insert,指定其他写入方式请参考表1。 df.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.datasource.write.recordkey.field", "uuid"). option("hoodie.datasource.write.partitionpath.field", ""). option("hoodie.datasource.write.operation", "bulk_insert"). option("hoodie.table.name", tableName). option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator"). option("hoodie.datasource.hive_sync.enable", "true"). option("hoodie.datasource.hive_sync.partition_fields", ""). option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor"). option("hoodie.datasource.hive_sync.table", tableName). option("hoodie.datasource.hive_sync.use_jdbc", "false"). option("hoodie.bulkinsert.shuffle.parallelism", 4). mode(Overwrite). save(basePath) 示例中各参数介绍请参考表1。 使用spark datasource接口更新Mor表,Upsert写入小数据量时可能触发更新数据的小文件合并,使在Mor表的读优化视图中能查到部分更新数据。 当update的数据对应的base文件是小文件时,insert中的数据和update中的数据会被合在一起和base文件直接做合并产生新的base文件,而不是写log。
  • HoodieDeltaStreamer流式写入 Hudi自带HoodieDeltaStreamer工具支持流式写入,也可以使用SparkStreaming以微批的方式写入。HoodieDeltaStreamer提供以下功能: 支持Kafka,DFS多种数据源接入 。 支持管理检查点、回滚和恢复,保证exactly once语义。 支持自定义转换操作。 示例: 准备配置文件kafka-source.properties #hudi配置 hoodie.datasource.write.recordkey.field=id hoodie.datasource.write.partitionpath.field=age hoodie.upsert.shuffle.parallelism=100 #hive config hoodie.datasource.hive_sync.table=hudimor_deltastreamer_partition hoodie.datasource.hive_sync.partition_fields=age hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor hoodie.datasource.hive_sync.use_jdbc=false hoodie.datasource.hive_sync.support_timestamp=true # Kafka Source topic hoodie.deltastreamer.source.kafka.topic=hudimor_deltastreamer_partition #checkpoint hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/huditest/hudimor_deltastreamer_partition # Kafka props # The kafka cluster we want to ingest from bootstrap.servers= xx.xx.xx.xx:xx auto.offset.reset=earliest #auto.offset.reset=latest group.id=hoodie-delta-streamer offset.rang.limit=10000 指定HoodieDeltaStreamer执行参数(具体参数配置,请查看官网https://hudi.apache.org/ )执行如下命令: spark-submit --master yarn --jars /opt/hudi-java-examples-1.0.jar // 指定spark运行时需要的hudi jars路径 --driver-memory 1g --executor-memory 1g --executor-cores 1 --num-executors 2 --conf spark.kryoserializer.buffer.max=128m --driver-class-path /opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/*:/opt/hudi-examples-0.6.1-SNAPSHOT.jar:/opt/hudi-examples-0.6.1-SNAPSHOT-tests.jar // 指定spark driver需要的hudi jars路径 --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer spark-internal --props file:///opt/kafka-source.properties // 指定配置文件,注意:使用yarn-cluster模式提交任务时,请指定配置文件路径为HDFS路径。 --target-base-path /tmp/huditest/hudimor1_deltastreamer_partition // 指定hudi表路径 --table-type MERGE_ON_READ // 指定要写入的hudi表类型 --target-table hudimor_deltastreamer_partition // 指定hudi表名 --source-ordering-field name // 指定hudi表预合并列 --source-class org.apache.hudi.utilities.sources.JsonKafkaSource // 指定消费的数据源为JsonKafkaSource, 该参数根据不同数据源指定不同的source类 --schemaprovider-class com.huaweixxx.bigdata.hudi.examples.DataSchemaProviderExample // 指定hudi表所需要的schema --transformer-class com.huaweixxx.bigdata.hudi.examples.TransformerExample // 指定如何处理数据源拉取来的数据,可根据自身业务需求做定制 --enable-hive-sync // 开启hive同步,同步hudi表到hive --continuous // 指定流处理模式为连续模式
  • 注意事项 写入模式:Hudi对于设置了主键的表支持三种写入模式,用户可以设置参数hoodie.sql.insert.mode来指定Insert模式,默认为upsert。 strict模式,Insert语句将保留COW表的主键唯一性约束,不允许重复记录。如果在插入过程中已经存在记录,则会为COW表执行HoodieDuplicateKeyException;对于MOR表,该模式与upsert模式行为一致。 non-strict模式,对主键表采用insert处理。 upsert模式,对于主键表的重复值进行更新操作。 在执行spark-sql时,用户可以设置“hoodie.sql.bulk.insert.enable = true”和“hoodie.sql.insert.mode = non-strict”来开启bulk insert作为Insert语句的写入方式。 也可以通过直接设置hoodie.datasource.write.operation的方式控制insert语句的写入方式,包括bulk_insert、insert、upsert。使用这种方式控制hoodie写入,需要注意执行完SQL后,必须执行reset hoodie.datasource.write.operation;重置Hudi的写入方式,否则该参数会影响其他SQL的执行。
  • 示例 insert into h0 select 1, 'a1', 20; -- insert static partition insert into h_p0 partition(dt = '2021-01-02') select 1, 'a1'; -- insert dynamic partition insert into h_p0 select 1, 'a1', dt; -- insert dynamic partition insert into h_p1 select 1 as id, 'a1', '2021-01-03' as dt, '19' as hh; -- insert overwrite table insert overwrite table h0 select 1, 'a1', 20; -- insert overwrite table with static partition insert overwrite h_p0 partition(dt = '2021-01-02') select 1, 'a1'; -- insert overwrite table with dynamic partition insert overwrite table h_p1 select 2 as id, 'a2', '2021-01-03' as dt, '19' as hh;
  • 推荐资源配置 mor表: 由于其本质上是写增量文件,调优可以直接根据Hudi的数据大小(dataSize)进行调整。 dataSize如果只有几个G,推荐跑单节点运行spark,或者yarn模式但是只分配一个container。 入湖程序的并行度p设置:建议p = (dataSize)/128M,程序分配core的数量保持和p一致即可。内存设置建议内存大小和core的比例大于1.5:1 即一个core配1.5G内存, 堆外内存设置建议内存大小和core的比例大于0.5:1。 cow表: cow表的原理是重写原始数据,因此这种表的调优,要兼顾dataSize和最后重写的文件数量。总体来说core数量越大越好(和最后重写多少个文件数直接相关),并行度p和内存大小和mor设置类似。
  • 读取Hudi数据概述 Hudi的读操作,作用于Hudi的三种视图之上,可以根据需求差异选择合适的视图进行查询。 Hudi支持多种查询引擎Spark、Hive、HetuEngine,具体支持矩阵见表1和表2。 表1 cow表 查询引擎 实时视图/读优化视图 增量视图 Hive Y Y Spark(SparkSQL) Y Y Spark(SparkDataSource API) Y Y HetuEngine Y N 表2 mor表 查询引擎 实时视图 增量视图 读优化视图 Hive Y Y Y Spark(SparkSQL) Y Y Y Spark(SparkDataSource API) Y Y Y HetuEngine Y N Y 当前Hudi使用Spark datasource接口读取时,不支持分区推断能力。比如bootstrap表使用datasource接口查询时,可能出现分区字段不显示,或者显示为null的情况。 增量视图,需设置set hoodie.hudicow.consume.mode = INCREMENTAL;,但该参数仅限于增量视图查询,不能用于Hudi表的其他类型查询,和其他表的查询。 恢复配置可设置set hoodie.hudicow.consume.mode = SNAPSHOT;或任意值。 父主题: Hudi读操作
  • 单表并发控制配置 表6 单表并发控制参数配置 参数 描述 默认值 hoodie.write.lock.provider 指定lock provider,不建议使用默认值,使用org.apache.hudi.hive.HiveMetastoreBasedLockProvider。 org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider hoodie.write.lock.hivemetastore.database Hive的database。 无 hoodie.write.lock.hivemetastore.table Hive的table name。 无 hoodie.write.lock.client.num_retries 重试次数。 10 hoodie.write.lock.client.wait_time_ms_between_retry 重试间隔。 10000 hoodie.write.lock.conflict.resolution.strategy lock provider类,必须是ConflictResolutionStrategy的子类。 org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy hoodie.write.lock.zookeeper.base_path 存放ZNodes的路径,同一张表的并发写入需配置一致。 无 hoodie.write.lock.zookeeper.lock_key ZNode的名称,建议与Hudi表名相同。 无 hoodie.write.lock.zookeeper.connection_timeout_ms ZooKeeper连接超时时间。 15000 hoodie.write.lock.zookeeper.port ZooKeeper端口号。 无 hoodie.write.lock.zookeeper.url ZooKeeper的url。 无 hoodie.write.lock.zookeeper.session_timeout_ms ZooKeeper的session过期时间。 60000
  • Clustering配置 本章节内容仅使用于 MRS 3.2.0及之后版本。 Clustering中有两个策略分别是hoodie.clustering.plan.strategy.class和hoodie.clustering.execution.strategy.class。一般情况下指定plan.strategy为SparkRecentDaysClusteringPlanStrategy或者SparkSizeBasedClusteringPlanStrategy时,execution.strategy不需要指定。但当plan.strategy为SparkSingleFileSortPlanStrategy时,需要指定execution.strategy为SparkSingleFileSortExecutionStrategy。 表7 Clustering参数配置 参数 描述 默认值 hoodie.clustering.inline 是否同步执行clustering。 false hoodie.clustering.inline.max.commits 触发clustering的commit数。 4 hoodie.clustering.async.enabled 是否启用异步执行clustering。 说明: 此参数仅适用于MRS 3.3.0-LTS及之后版本。 false hoodie.clustering.async.max.commits 异步执行时触发clustering的commit数。 说明: 此参数仅适用于MRS 3.3.0-LTS及之后版本。 4 hoodie.clustering.plan.strategy.target.file.max.bytes 指定clustering后每个文件大小最大值。 1024 * 1024 * 1024 byte hoodie.clustering.plan.strategy.small.file.limit 小于该大小的文件会被clustering。 300 * 1024 * 1024 byte hoodie.clustering.plan.strategy.sort.columns clustering用以排序的列。 无 hoodie.layout.optimize.strategy Clustering执行策略,可选linear、z-order、hilbert三种排序方式。 linear hoodie.layout.optimize.enable 使用z-order、hilbert时需要开启。 false hoodie.clustering.plan.strategy.class 筛选FileGroup进行clustering的策略类,默认筛选小于hoodie.clustering.plan.strategy.small.file.limit阈值的文件。 org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy hoodie.clustering.execution.strategy.class 执行clustering的策略类(RunClusteringStrategy的子类),用以定义群集计划的执行方式。 默认类们按指定的列对计划中的文件组进行排序,同时满足配置的目标文件大小。 org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy hoodie.clustering.plan.strategy.max.num.groups 设置执行clustering时最多选择多少个FileGroup,该值越大并发度越大。 30 hoodie.clustering.plan.strategy.max.bytes.per.group 设置执行clustering时每个FileGroup最多有多少数据参与clustering。 2 * 1024 * 1024 * 1024 byte