云服务器内容精选

  • 建议 Spark批处理场景,对写入时延要求不高的场景,采用COW表。 COW表模型中,写入数据存在写放大问题,因此写入速度较慢;但COW具有非常好的读取性能力。而且批量计算对写入时延不是很敏感,因此可以采用COW表。 Hudi表的写任务要开启Hive元数据同步功能。 SparkSQL天然与Hive集成,无需考虑元数据问题。该条建议针对的是通过Spark Datasource API或者Flin写Hudi表的场景,通过这两种方式写Hudi时需要增加向Hive同步元数据的配置项;该配置的目的是将Hudi表的元数据统一托管到Hive元数据服务中,为后续的跨引擎操作数据以及数据管理提供便利。
  • 规则 Hudi表必须设置合理的主键。 Hudi表提供了数据更新和幂等写入能力,该能力要求Hudi表必须设置主键,主键设置不合理会导致数据重复。主键可以为单一主键也可以为复合主键,两种主键类型均要求主键不能有null值和空值,可以参考以下示例设置主键: SparkSQL: -- 通过primaryKey指定主键,如果是复合主键需要用逗号分隔。 create table hudi_table ( id1 int, id2 int, name string, price double ) using hudi options ( primaryKey = 'id1,id2', preCombineField = 'price' ); SparkDatasource: --通过hoodie.datasource.write.recordkey.field指定主键。 df.write.format("hudi"). option("hoodie.datasource.write.table.type", COPY_ON_WRITE). option("hoodie.datasource.write.precombine.field", "price"). option("hoodie.datasource.write.recordkey.field", "id1,id2"). FlinkSQL: --通过hoodie.datasource.write.recordkey.field指定主键。 create table hudi_table( id1 int, id2 int, name string, price double ) partitioned by (name) with ( 'connector' = 'hudi', 'hoodie.datasource.write.recordkey.field' = 'id1,id2', 'write.precombine.field' = 'price') Hudi表必须配置precombine字段。 在数据同步过程中不可避免会出现数据重复写入、数据乱序问题,例如:异常数据恢复、写入程序异常重启等场景。通过设置合理precombine字段值可以保证数据的准确性,老数据不会覆盖新数据,也就是幂等写入能力。该字段可用选择的类型包括:业务表中更新时间戳、数据库的提交时间戳等。precombine字段不能有null值和空值,可以参考以下示例设置precombine字段: SparkSQL: --通过preCombineField指定precombine字段。 create table hudi_table ( id1 int, id2 int, name string, price double ) using hudi options ( primaryKey = 'id1,id2', preCombineField = 'price' ); SparkDatasource: --通过hoodie.datasource.write.precombine.field指定precombine字段。 df.write.format("hudi"). option("hoodie.datasource.write.table.type", COPY_ON_WRITE). option("hoodie.datasource.write.precombine.field", "price"). option("hoodie.datasource.write.recordkey.field", "id1,id2"). Flink: --通过write.precombine.field指定precombine字段。 create table hudi_table( id1 int, id2 int, name string, price double ) partitioned by (name) with ( 'connector' = 'hudi', 'hoodie.datasource.write.recordkey.field' = 'id1,id2', 'write.precombine.field' = 'price') 流式计算采用MOR表。 流式计算为低时延的实时计算,需要高性能的流式读写能力,在Hudi表中存在的MOR和COW两种模型中,MOR表的流式读写性能相对较好,因此在流式计算场景下采用MOR表模型。关于MOR表在读写性能的对比关系如下: 对比维度 MOR表 COW表 流式写 高 低 流式读 高 低 批量写 高 低 批量读 低 高 实时入湖,表模型采用MOR表。 实时入湖一般的性能要求都在分钟内或者分钟级,结合Hudi两种表模型的对比,因此在实时入湖场景中需要选择MOR表模型。 Hudi表名以及列名采用小写字母。 多引擎读写同一张Hudi表时,为了规避引擎之间大小写的支持不同,统一采用小写字母。
  • 建议 基于Flink的流式写入的表,在数据量超过2亿条记录,采用Bucket索引,2亿以内可以采用Flink状态索引。 参照Flink状态索引的特点,Hudi表超过一定数据量后,Flink作业状态后端压力很大,需要优化状态后端参数才能维持性能;同时由于Flink冷启动的时候需要遍历全表数据,大数据量也会导致Flink作业启动缓慢。因此基于简化使用的角度,针对大数据量的表,可以通过采用Bucket索引来避免状态后端的复杂调优。 如果Bucket索引+分区表的模式无法平衡Bueckt桶过大的问题,还是可以继续采用Flink状态索引,按照规范去优化对应的配置参数即可。 基于Bucket索引的表,按照单个Bucket 2GB数据量进行设计。 为了规避单个Bucket过大,建议单个Bucket的数据量不要超过2GB(该2GB是指数据内容大小,不是指数据行数也不是parquet的数据文件大小),目的是将对应的桶的Parquet文件大小控制在256MB范围内(平衡读写内存消耗和HDFS存储有效利用),因此可以看出2GB的这个限制只是一个经验值,因为不同的业务数据经过列存压缩后大小是不一样的。 为什么建议是2GB? 2GB的数据存储成列存Parquet文件后,大概的数据文件大小是150MB ~ 256MB左右。不同业务数据会有出入。而HDFS单个数据块一般会是128MB,这样可以有效的利用存储空间。 数据读写占用的内存空间都是原始数据大小(包括空值也是会占用内存的),2GB在大数据计算过程中,处于单task读写可接受范围之内。 如果是单个Bucket的数据量超过了该值范围,可能会有什么影响? 读写任务可能会出现OOM的问题,解决方法就是提升单个task的内存占比。 读写性能下降,因为单个task的处理的数据量变大,导致处理耗时变大。
  • 规则 禁止修改表索引类型。 Hudi表的索引会决定数据存储方式,随意修改索引类型会导致表中已有的存量数据与新增数据之间出现数据重复和数据准确性问题。常见的索引类型如下: 布隆索引:Spark引擎独有索引,采用bloomfiter机制,将布隆索引内容写入到Parquet文件的footer中。 Bucket索引:在写入数据过程中,通过主键进行Hash计算,将数据进行分桶写入;该索引写入速度最快,但是需要合理配置分桶数目;Flink、Spark均支持该索引写入。 状态索引:Flink引擎独有索引,是将行记录的存储位置记录到状态后端的一种索引形式,在作业冷启动过程中会遍历所有数据存储文件生成索引信息。 用Flink状态索引,Flink写入后,不支持Spark继续写入。 Flink在写Hudi的MOR表只会生成log文件,后续通过compaction操作,将log文件转为parquet文件。Spark在更新Hudi表时严重依赖parquet文件是否存在,如果当前Hudi表写的是log文件,采用Spark写入就会导致重复数据的产生。在批量初始化阶段 ,先采用Spark批量写入Hudi表,在用Flink基于Flink状态索引写入不会有问题,原因是Flink冷启动的时候会遍历所有的数据文件生成状态索引。 实时入湖场景中,Spark引擎采用Bucket索引,Flink引擎可以用Bucket索引或者状态索引。 实时入湖都是需要分钟内或者分钟级的高性能入湖,索引的选择会影响到写Hudi表的性能。在性能方面各个索引的区别如下: Bucket索引 优点:写入过程中对主键进行hash分桶写入,性能比较高,不受表的数据量限制。Flink和Spark引擎都支持,Flink和Spark引擎可以实现交叉混写同一张表。 缺点:Bucket个数不能动态调整,数据量波动和整表数据量持续上涨会导致单个Bucket数据量过大出现大数据文件。需要结合分区表来进行平衡改善。 Flink状态索引 优点:主键的索引信息存在状态后端,数据更新只需要点查状态后端即可,速度较快;同时生成的数据文件大小稳定,不会产生小文件、超大文件问题。 缺点:该索引为Flink特有索引。在表的总数据行数达到数亿级别,需要优化状态后端参数来保持写入的性能。使用该索引无法支持Flink和Spark交叉混写。 对于数据总量持续上涨的表,采用Bucket索引时,须使用时间分区,分区键采用数据创建时间。 参照Flink状态索引的特点,Hudi表超过一定数据量后,Flink作业状态后端压力很大,需要优化状态后端参数才能维持性能;同时由于Flink冷启动的时候需要遍历全表数据,大数据量也会导致Flink作业启动缓慢。因此基于简化使用的角度,针对大数据量的表,可以通过采用Bucket索引来避免状态后端的复杂调优。 如果Bucket索引+分区表的模式无法平衡Bueckt桶过大的问题,还是可以继续采用Flink状态索引,按照规范去优化对应的配置参数即可。