云服务器内容精选

  • 确认表内桶数 Hudi表的桶数设置,关系到表的性能,需要格外引起注意。 以下几点,是设置桶数的关键信息,需要建表前确认。 非分区表 单表数据总条数 = select count(1) from tablename(入湖时需提供); 单条数据大小 = 平均 1KB(华为建议通过select * from tablename limit 100将查询结果粘贴在notepad++中得出100条数据的大小再除以100得到单条平均大小) 单表数据量大小(G) = 单表数据总条数*单表数据大小/1024/1024 非分区表桶数 = MAX(单表数据量大小(G)/2G*2,再向上取整,4) 分区表 最近一个月最大数据量分区数据总条数 = 入湖前咨询产品线 单条数据大小 = 平均 1KB(华为建议通过select * from tablename limit 100将查询结果粘贴在notepad++中得出100条数据的大小再除以100得到单条平均大小) 单分区数据量大小(G) = 最近一个月最大数据量分区数据总条数*单表数据大小/1024/1024 分区表桶数 = MAX(单分区数据量大小(G)/2G,再后向上取整,1) 需要使用的是表的总数据大小,而不是压缩以后的文件大小 桶的设置以偶数最佳,非分区表最小桶数请设置4个,分区表最小桶数请设置1个。
  • 确认建表SQL DataArts支持通过Spark JDBC方式和Spark API方式操作Hudi表: Spark JDBC方式使用公用资源,不用单独起Spark作业,但是不能指定执行SQL所需要的资源以及配置参数,因此建议用来做建表操作或小数据量的查询操作。 Spark API方式执行的SQL独立起Spark作业,有一定的耗时,但是可以通过配置运行程序参数来指定作业所需要的资源等参数,建议批量导入等 作业使用API方式来指定资源运行,防止占用jdbc资源长时间阻塞其他任务。 DataArts使用Spark API方式操作Hudi表,必须要添加参数--conf spark.support.hudi=true,并且通过执行调度来运行作业。
  • 判断使用分区表还是非分区表 根据表的使用场景一般将表分为事实表和维度表: 事实表通常整表数据规模较大,以新增数据为主,更新数据占比小,且更新数据大多落在近一段时间范围内(年或月或天),下游读取该表进行ETL计算时通常会使用时间范围进行裁剪(例如最近一天、一月、一年),这种表通常可以通过数据的创建时间来做分区已保证最佳读写性能。 维度表数据量一般整表数据规模较小,以更新数据为主,新增较少,表数据量比较稳定,且读取时通常需要全量读取做join之类的ETL计算,因此通常使用非分区表性能更好。 分区表的分区键不允许更新,否则会产生重复数据。 例外场景:超大维度表和超小事实表 特殊情况如存在持续大量新增数据的维度表(表数据量在200G以上或日增长量超过60M)或数据量非常小的事实表(表数据量小于10G且未来三至五年增长后也不会超过10G)需要针对具体场景来进行例外处理: 持续大量新增数据的维度表 方法一:预留桶数,如使用非分区表则需通过预估较长一段时间内的数据增量来预先增加桶数,缺点是随着数据的增长,文件依然会持续膨胀; 方法二:大粒度分区(推荐),如果使用分区表则需要根据数据增长情况来计算,例如使用年分区,这种方式相对麻烦些但是多年后表无需重新导入。 方法三:数据老化,按照业务逻辑分析大的维度表是否可以通过数据老化清理无效的维度数据从而降低数据规模。 数据量非常小的事实表 这种可以在预估很长一段时间的数据增长量的前提下使用非分区表预留稍宽裕一些的桶数来提升读写性能。
  • 使用DataArts创建Hudi表 DataArts支持通过Spark JDBC方式和Spark API方式操作Hudi表: Spark JDBC方式使用公用资源,不用单独起Spark作业,但是不能指定执行SQL所需要的资源以及配置参数,因此建议用来做建表操作或小数据量的查询操作。 Spark API方式执行的SQL独立起Spark作业,有一定的耗时,但是可以通过配置运行程序参数来指定作业所需要的资源等参数,建议批量导入等 作业使用API方式来指定资源运行,防止占用jdbc资源长时间阻塞其他任务。 DataArts使用Spark API方式操作Hudi表,必须要添加参数--conf spark.support.hudi=true,并且通过执行调度来运行作业。
  • Hudi表初始化 初始化导入存量数据通常有Spark作业来完成,由于初始化数据量通常较大,因此推荐使用API方式给充足资源来完成。 对于批量初始化后需要接Flink或Spark流作业实时写入的场景,一般建议通过对上有消息进行过滤,从一个指定的时间范围开始消费来控制数据的重复接入量(例如Spark初始化完成后,Flink消费Kafka时过滤掉2小时之前的数据),如果无法对kafka消息进行过滤,则可以考虑先实时接入生成offset,再truncate table ,再历史导入,再开启实时。 初始化操作流程应遵循下面的步骤: 如果批量初始化前表里已经存在数据且没有truncate table,则会导致批量数据写成非常大的log文件,对后续compaction形成很大压力需要更多资源才能完成 Hudi表在Hive元数据中,应该会存在1张内部表(手动创建),2张外部表(写入数据后自动创建)。 2张外部表,表名_ro(用户只读合并后的parquet文件,即读优化视图表),_rt(读实时写入的最新版本数据,即实时视图表)。 父主题: Bucket调优示例
  • 实时任务接入 实时作业一般由Flink Sql或Sparkstreaming来完成,流式实时任务通常配置同步生成compaction计划,异步执行计划。 Flink SQL作业中sink端Hudi表相关配置如下: create table denza_hudi_sink ( $HUDI_SINK_SQL_REPLACEABLE$ ) PARTITIONED BY ( years, months, days ) with ( 'connector' = 'hudi', //指定写入的是Hudi表 'path' = 'obs://XXXXXXXXXXXXXXXXXX/', //指定Hudi表的存储路径 'table.type' = 'MERGE_ON_READ', //Hudi表类型 'hoodie.datasource.write.recordkey.field' = 'id', //主键 'write.precombine.field' = 'vin', //合并字段 'write.tasks' = '10', //flink写入并行度 'hoodie.datasource.write.keygenerator.type' = 'COMPLEX', //指定KeyGenerator,与Spark创建的Hudi表类型一致 'hoodie.datasource.write.hive_style_partitioning' = 'true', //使用hive支持的分区格式 'read.streaming.enabled' = 'true', //开启流读 'read.streaming.check-interval' = '60', //checkpoint间隔,单位为秒 'index.type'='BUCKET', //指定Hudi表索引类型为BUCKET 'hoodie.bucket.index.num.buckets'='10', //指定bucket桶数 'compaction.delta_commits' = '3', //compaction生成的commit间隔 'compaction.async.enabled' = 'false', //compaction异步执行关闭 'compaction.schedule.enabled' = 'true', //compaction同步生成计划 'clean.async.enabled' = 'false', //异步clean关闭 'hoodie.archive.automatic' = 'false', //自动archive关闭 'hoodie.clean.automatic' = 'false', //自动clean关闭 'hive_sync.enable' = 'true', //自动同步hive表 'hive_sync.mode' = 'jdbc', //同步hive表方式为jdbc 'hive_sync.jdbc_url' = '', //同步hive表的jdbc url 'hive_sync.db' = 'hudi_cars_byd', //同步hive表的database 'hive_sync.table' = 'byd_hudi_denza_1s_mor', //同步hive表的tablename 'hive_sync.metastore.uris' = 'thrift://XXXXX:9083 ', //同步hive表的metastore uri 'hive_sync.support_timestamp' = 'true', //同步hive表支持timestamp格式 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor' //同步hive表的extractor类 ); Spark streaming写入Hudi表常用的参数如下(参数意义与上面flink类似,不再做注释): hoodie.table.name= hoodie.index.type=BUCKET hoodie.bucket.index.num.buckets=3 hoodie.datasource.write.precombine.field= hoodie.datasource.write.recordkey.field= hoodie.datasource.write.partitionpath.field= hoodie.datasource.write.table.type= MERGE_ON_READ hoodie.datasource.write.hive_style_partitioning=true hoodie.compact.inline=true hoodie.schedule.compact.only.inline=true hoodie.run.compact.only.inline=false hoodie.clean.automatic=false hoodie.clean.async=false hoodie.archive.async=false hoodie.archive.automatic=false hoodie.compact.inline.max.delta.commits=50 hoodie.datasource.hive_sync.enable=true hoodie.datasource.hive_sync.partition_fields= hoodie.datasource.hive_sync.database= hoodie.datasource.hive_sync.table= hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor 父主题: Bucket调优示例
  • 离线Compaction配置 对于MOR表的实时业务,通常设置在写入中同步生成compaction计划,因此需要额外通过DataArts或者脚本调度SparkSQL去执行已经产生的compaction计划。 执行参数 set hoodie.compact.inline = true; //打开compaction操作 set hoodie.run.compact.only.inline = true; //compaction只执行已生成的计划,不产生新计划 set hoodie.cleaner.commits.retained = 120; // 清理保留120个commit set hoodie.keep.max.commits = 140; // 归档最大保留140个commit set hoodie.keep.min.commits = 121; // 归档最小保留121个commit set hoodie.clean.async = false; // 打开异步清理 set hoodie.clean.automatic = false; // 关闭自动清理,防止compaction操作出发clean run compaction on $tablename; // 执行compaction计划 run clean on $tablename; // 执行clean操作清理冗余版本 run archivelog on $tablename; // 执行archivelog合并清理元数据文件 关于清理、归档参数的值不宜设置过大,会影响Hudi表的性能,通常建议: hoodie.cleaner.commits.retained = compaction所需要的commit数的2倍 hoodie.keep.min.commits = hoodie.cleaner.commits.retained + 1 hoodie.keep.max.commits = hoodie.keep.min.commits + 20 执行compaction后再执行clean和archive,由于clean和archivelog对资源要求较小,为避免资源浪费,使用DataArts调度的话可以compaction作为一个任务,clean、archive作为一个任务分别配置不同的资源执行来节省资源使用。 执行资源 Compaction调度的间隔应小于Compaction计划生成的间隔,例如1小时左右生成一个Compaction计划的话,执行Compaction计划的调度任务应该至少半小时调度一次。 Compaction作业配置的资源,vcore数至少要大于等于单个分区的桶数,vcore数与内存的比例应为1:4即1个vcore配4G内存。 父主题: Bucket调优示例