云服务器内容精选

  • 规则 Hudi表必须设置合理的主键。 Hudi表提供了数据更新和幂等写入能力,该能力要求Hudi表必须设置主键,主键设置不合理会导致数据重复。主键可以为单一主键也可以为复合主键,两种主键类型均要求主键不能有null值和空值,可以参考以下示例设置主键: SparkSQL: -- 通过primaryKey指定主键,如果是复合主键需要用逗号分隔。 create table hudi_table ( id1 int, id2 int, name string, price double ) using hudi options ( primaryKey = 'id1,id2', preCombineField = 'price' ); SparkDatasource: --通过hoodie.datasource.write.recordkey.field指定主键。 df.write.format("hudi"). option("hoodie.datasource.write.table.type", COPY_ON_WRITE). option("hoodie.datasource.write.precombine.field", "price"). option("hoodie.datasource.write.recordkey.field", "id1,id2"). FlinkSQL: --通过hoodie.datasource.write.recordkey.field指定主键。 create table hudi_table( id1 int, id2 int, name string, price double ) partitioned by (name) with ( 'connector' = 'hudi', 'hoodie.datasource.write.recordkey.field' = 'id1,id2', 'write.precombine.field' = 'price') Hudi表必须配置precombine字段。 在数据同步过程中不可避免会出现数据重复写入、数据乱序问题,例如:异常数据恢复、写入程序异常重启等场景。通过设置合理precombine字段值可以保证数据的准确性,老数据不会覆盖新数据,也就是幂等写入能力。该字段可用选择的类型包括:业务表中更新时间戳、数据库的提交时间戳等。precombine字段不能有null值和空值,可以参考以下示例设置precombine字段: SparkSQL: --通过preCombineField指定precombine字段。 create table hudi_table ( id1 int, id2 int, name string, price double ) using hudi options ( primaryKey = 'id1,id2', preCombineField = 'price' ); SparkDatasource: --通过hoodie.datasource.write.precombine.field指定precombine字段。 df.write.format("hudi"). option("hoodie.datasource.write.table.type", COPY_ON_WRITE). option("hoodie.datasource.write.precombine.field", "price"). option("hoodie.datasource.write.recordkey.field", "id1,id2"). Flink: --通过write.precombine.field指定precombine字段。 create table hudi_table( id1 int, id2 int, name string, price double ) partitioned by (name) with ( 'connector' = 'hudi', 'hoodie.datasource.write.recordkey.field' = 'id1,id2', 'write.precombine.field' = 'price') 流式计算采用MOR表。 流式计算为低时延的实时计算,需要高性能的流式读写能力,在Hudi表中存在的MOR和COW两种模型中,MOR表的流式读写性能相对较好,因此在流式计算场景下采用MOR表模型。关于MOR表在读写性能的对比关系如下: 对比维度 MOR表 COW表 流式写 高 低 流式读 高 低 批量写 高 低 批量读 低 高 实时入湖,表模型采用MOR表。 实时入湖一般的性能要求都在分钟内或者分钟级,结合Hudi两种表模型的对比,因此在实时入湖场景中需要选择MOR表模型。 Hudi表名以及列名采用小写字母。 多引擎读写同一张Hudi表时,为了规避引擎之间大小写的支持不同,统一采用小写字母。
  • 建议 Spark批处理场景,对写入时延要求不高的场景,采用COW表。 COW表模型中,写入数据存在写放大问题,因此写入速度较慢;但COW具有非常好的读取性能力。而且批量计算对写入时延不是很敏感,因此可以采用COW表。 Hudi表的写任务要开启Hive元数据同步功能。 SparkSQL天然与Hive集成,无需考虑元数据问题。该条建议针对的是通过Spark Datasource API或者Flin写Hudi表的场景,通过这两种方式写Hudi时需要增加向Hive同步元数据的配置项;该配置的目的是将Hudi表的元数据统一托管到Hive元数据服务中,为后续的跨引擎操作数据以及数据管理提供便利。
  • 规则 有数据持续写入的表,24小时内至少执行一次compaction。 对于MOR表,不管是流式写入还是批量写入,需要保证每天至少完成1次Compaction操作。如果长时间不做compaction,Hudi表的log将会越来越大,这必将会出现以下问题: Hudi表读取很慢,且需要很大的资源。 这是由于读MOR表涉及到log合并,大log合并需要消耗大量的资源并且速度很慢。 长时间进行一次Compaction需要耗费很多资源才能完成,且容易出现OOM。 阻塞Clean,如果没有Compaction操作来产生新版本的Parquet文件,那旧版本的文件就不能被Clean清理,增加存储压力。 CPU与内存比例为1:4~1:8。 Compaction作业是将存量的parquet文件内的数据与新增的log中的数据进行合并,需要消耗较高的内存资源,按照之前的表设计规范以及实际流量的波动结合考虑,建议Compaction作业CPU与内存的比例按照1:4~1:8配置,保证Compaction作业稳定运行。当Compaction出现OOM问题,可以通过调大内存占比解决。
  • 优化shuffle并行度,提升Spark加工效率 所谓的shuffle并发度如下图所示: 集群默认是200,作业可以单独设置。如果发现瓶颈stage(执行时间长),且分配给当前作业的核数大于当前的并发数,说明并发度不足。通过以下配置优化。 场景 配置项 集群默认值 调整后 Jar作业 spark.default.parallelism 200 按实际作业可用资源2倍设置 SQL作业 spark.sql.shuffle.partitions 200 按实际作业可用资源2倍设置 hudi入库作业 hoodie.upsert.shuffle.parallelism 200 非bucket表使用,按实际作业可用资源2倍设置 动态资源调度情况下(spark.dynamicAllocation.enabled= true)时,资源按照spark.dynamicAllocation.maxExecutors评估。
  • 调整Spark调度参数优化OBS场景下Spark调度时延 开启对于OBS存储,可以关闭Spark的本地性进行优化,尽可能提升Spark调度效率 配置项 集群默认值 调整后 --conf spark.locality.wait 3s 0s --conf spark.locality.wait.process 3s 0s --conf spark.locality.wait.node 3s 0s --conf spark.locality.wait.rack 3s 0s
  • Spark加工Hudi表时其他参数优化 设置spark.sql.enableToString=false,降低Spark解析复杂SQL时候内存使用,提升解析效率。 设置spark.speculation=false,关闭推测执行,开启该参数会带来额外的cpu消耗,同时Hudi不支持启动该参数,启用该参数写Hudi有概率导致文件损坏。 配置项 集群默认值 调整后 --conf spark.sql.enableToString true false --conf spark.speculation false false
  • 初始化Hudi表时,可以使用BulkInsert方式快速写入数据 示例: set hoodie.combine.before.insert=true; --入库前去重,如果数据没有重复 该参数无需设置。 set hoodie.datasource.write.operation = bulk_insert; --指定写入方式为bulk insert方式。 set hoodie.bulkinsert.shuffle.parallelism = 4; --指定bulk_insert写入时的并行度,等于写入完成后保存的分区parquet文件数。 insert into dsrTable select * from srcTabble
  • 优化Spark Shuffle参数提升Hudi写入效率 开启spark.shuffle.readHostLocalDisk=true,本地磁盘读取shuffle数据,减少网络传输的开销。 开启spark.io.encryption.enabled=false,关闭shuffle过程写加密磁盘,提升shuffle效率。 开启spark.shuffle.service.enabled=true,启动shuffle服务,提升任务shuffle的稳定性。 配置项 集群默认值 调整后 --conf spark.shuffle.readHostLocalDisk false true --conf spark.io.encryption.enabled true false --conf spark.shuffle.service.enabled false true
  • 实时任务接入 实时作业一般由Flink Sql或Sparkstreaming来完成,流式实时任务通常配置同步生成compaction计划,异步执行计划。 Flink SQL作业中sink端Hudi表相关配置如下: create table denza_hudi_sink ( $HUDI_SINK_SQL_REPLACEABLE$ ) PARTITIONED BY ( years, months, days ) with ( 'connector' = 'hudi', --指定写入的是Hudi表。 'path' = 'obs://XXXXXXXXXXXXXXXXXX/', --指定Hudi表的存储路径。 'table.type' = 'MERGE_ON_READ', --Hudi表类型。 'hoodie.datasource.write.recordkey.field' = 'id', --主键。 'write.precombine.field' = 'vin', --合并字段。 'write.tasks' = '10', --flink写入并行度。 'hoodie.datasource.write.keygenerator.type' = 'COMPLEX', --指定KeyGenerator,与Spark创建的Hudi表类型一致。 'hoodie.datasource.write.hive_style_partitioning' = 'true', --使用hive支持的分区格式。 'read.streaming.enabled' = 'true', --开启流读。 'read.streaming.check-interval' = '60', --checkpoint间隔,单位为秒。 'index.type'='BUCKET', --指定Hudi表索引类型为BUCKET。 'hoodie.bucket.index.num.buckets'='10', --指定bucket桶数。 'compaction.delta_commits' = '3', --compaction生成的commit间隔。 'compaction.async.enabled' = 'false', --compaction异步执行关闭。 'compaction.schedule.enabled' = 'true', --compaction同步生成计划。 'clean.async.enabled' = 'false', --异步clean关闭。 'hoodie.archive.automatic' = 'false', --自动archive关闭。 'hoodie.clean.automatic' = 'false', --自动clean关闭。 'hive_sync.enable' = 'true', --自动同步hive表。 'hive_sync.mode' = 'jdbc', --同步hive表方式为jdbc。 'hive_sync.jdbc_url' = '', --同步hive表的jdbc url。 'hive_sync.db' = 'hudi_cars_byd', --同步hive表的database。 'hive_sync.table' = 'byd_hudi_denza_1s_mor', --同步hive表的tablename。 'hive_sync.metastore.uris' = 'thrift://XXXXX:9083 ', --同步hive表的metastore uri。 'hive_sync.support_timestamp' = 'true', --同步hive表支持timestamp格式。 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor' --同步hive表的extractor类。 ); Spark streaming写入Hudi表常用的参数如下(参数意义与上面flink类似,不再做注释): hoodie.table.name= hoodie.index.type=BUCKET hoodie.bucket.index.num.buckets=3 hoodie.datasource.write.precombine.field= hoodie.datasource.write.recordkey.field= hoodie.datasource.write.partitionpath.field= hoodie.datasource.write.table.type= MERGE_ON_READ hoodie.datasource.write.hive_style_partitioning=true hoodie.compact.inline=true hoodie.schedule.compact.only.inline=true hoodie.run.compact.only.inline=false hoodie.clean.automatic=false hoodie.clean.async=false hoodie.archive.async=false hoodie.archive.automatic=false hoodie.compact.inline.max.delta.commits=50 hoodie.datasource.hive_sync.enable=true hoodie.datasource.hive_sync.partition_fields= hoodie.datasource.hive_sync.database= hoodie.datasource.hive_sync.table= hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor 父主题: Bucket调优示例
  • 离线Compaction配置 对于MOR表的实时业务,通常设置在写入中同步生成compaction计划,因此需要额外通过DataArts或者脚本调度SparkSQL去执行已经产生的compaction计划。 执行参数 set hoodie.compact.inline = true; --打开compaction操作。 set hoodie.run.compact.only.inline = true; --compaction只执行已生成的计划,不产生新计划。 set hoodie.cleaner.commits.retained = 120; --清理保留120个commit。 set hoodie.keep.max.commits = 140; --归档最大保留140个commit。 set hoodie.keep.min.commits = 121; --归档最小保留121个commit。 set hoodie.clean.async = false; --打开异步清理。 set hoodie.clean.automatic = false; --关闭自动清理,防止compaction操作触发clean。 run compaction on $tablename; --执行compaction计划。 run clean on $tablename; --执行clean操作清理冗余版本。 run archivelog on $tablename; --执行archivelog合并清理元数据文件。 关于清理、归档参数的值不宜设置过大,会影响Hudi表的性能,通常建议: hoodie.cleaner.commits.retained = compaction所需要的commit数的2倍 hoodie.keep.min.commits = hoodie.cleaner.commits.retained + 1 hoodie.keep.max.commits = hoodie.keep.min.commits + 20 执行compaction后再执行clean和archive,由于clean和archivelog对资源要求较小,为避免资源浪费,使用DataArts调度的话可以compaction作为一个任务,clean、archive作为一个任务分别配置不同的资源执行来节省资源使用。 执行资源 Compaction调度的间隔应小于Compaction计划生成的间隔,例如1小时左右生成一个Compaction计划的话,执行Compaction计划的调度任务应该至少半小时调度一次。 Compaction作业配置的资源,vcore数至少要大于等于单个分区的桶数,vcore数与内存的比例应为1:4即1个vcore配4G内存。 父主题: Bucket调优示例
  • Spark写Hudi各种写入模式参数规范说明 类型 说明 开启参数 场景选择 特点 upsert update + insert Hudi默认写入类型,写入具有更新能力。 默认,无需参数开启。 SparkSQL: set hoodie.datasource.write.operation=upsert; DataSource Api: df.write .format("hudi") .options(xxx) .option("hoodie.datasource.write.operation", "upsert") .mode("append") .save("/tmp/tablePath") 默认选择。 优点: 支持小文件合并。 支持更新。 缺点: 写入速度中规中矩。 append 数据无更新直接写入 Spark:Spark侧没有纯append模式可使用bulk insert模式替代。 SparkSQL: set hoodie.datasource.write.operation = bulk_insert; set hoodie.datasource.write.row.writer.enable = true; DataSource Api: df.write .format("hudi") .options(xxx) .option("hoodie.datasource.write.operation", "bulk_insert") .option("hoodie.datasource.write.row.writer.enable", "true") .mode("append") .save("/tmp/tablePath") 追求高吞吐,无数据更新场景。 优点: 写入速度最快。 缺点: 无小文件合并能力。 无更新能力。 需要clustering合并小文件。 delete 删除操作 无需参数,直接使用delete语法即可: delete from tableName where primaryKey='id1'; SQL删除数据数据场景。 和upsert类型一样。 Insert overwrite 覆写分区 无需参数,直接使用insert overwrite语法即可: insert overwrite table tableName partition(dt ='2021-01-04') select * from srcTable; 分区级别重新。 覆写分区。 Insert overwrite table 覆写全表 无需参数,直接使用insert overwrite语法即可: insert overwrite table tableName select * from srcTable; 全部重写。 覆写全表。 Bulk_insert 批量导入 SparkSQL: set hoodie.datasource.write.operation = bulk_insert; set hoodie.datasource.write.row.writer.enable = true; DataSource Api: df.write .format("hudi") .options(xxx) .option("hoodie.datasource.write.operation", "bulk_insert") .option("hoodie.datasource.write.row.writer.enable", "true") .mode("append") .save("/tmp/tablePath") 建议表初始化搬迁的时候使用。 和append模式一样。
  • 编辑器使用介绍 访问Hue WebUI,请参考访问Hue WebUI界面。 在左侧导航栏单击,然后选择“Workflow”。 支持创建Workflow、计划和Bundles的操作。支持提交运行、共享、复制和导出已创建的应用。 每个Workflow可以包含一个或多个作业,形成完整的工作流,用于实现指定的业务。 创建Workflow时,可直接在Hue的编辑器设计作业,并添加到Workflow中。 每个计划可定义一个时间触发器,用于定时触发执行一个指定的Workflow。不支持多个Workflow。 每个Bundles可定义一个集合,用于触发执行多个计划,使不同Workflow批量执行。
  • 操作场景 用户需要使用图形化界面查看集群中所有作业时,可以通过Hue完成任务。 Hue提供了Oozie作业管理器功能,使用户可以通过界面图形化的方式使用Oozie。 Hue界面主要用于文件、表等数据的查看与分析,禁止通过Hue界面对操作对象进行删除等高危管理操作。如需操作,建议在确认对业务没有影响后通过各组件的相应操作方法进行处理,例如使用HDFS客户端对HDFS文件进行操作,使用Hive客户端对Hive表进行操作。
  • 访问作业浏览器 访问Hue WebUI,请参考访问Hue WebUI界面。 单击作业,进入“作业浏览器”。 默认显示当前集群的所有作业。支持查看Workflow、Coordinator和Bundles作业的运行情况。 作业浏览器显示的数字表示集群中所有作业的总数。 “作业浏览器”将显示作业以下信息: 表1 MRS 作业属性介绍 属性名 描述 名称 表示作业的名称。 用户 表示启动该作业的用户。 类型 表示作业的类型。 状态 表示作业的状态,包含“成功”、“正在运行”、“失败”。 进度 表示作业运行进度。 组 表示作业所属组。 开始 表示作业开始时间。 持续时间 表示作业运行使用的时间。 Id 表示作业的编号,由系统自动生成。 如果MRS集群安装了Spark组件,则默认会启动一个作业“Spark-JD BCS erver”,用于执行任务。
  • 管理文件或目录 在“文件浏览器”界面,勾选一个或多个目录或文件。 单击“操作”,在弹出菜单选择一个操作。 重命名:表示重新命名一个目录或文件。 移动:表示移动文件,在“移至”页面选择新的目录并单击“移动”完成移动。 复制:表示复制选中的文件或目录。 更改权限:表示修改选中目录或文件的访问权限。 可以为属主、属组和其他用户设置“读取”、“写”和“执行”权限。 “易贴”表示禁止HDFS的管理员、目录属主或文件属主以外的用户在目录中移动文件。 “递归”表示递归设置权限到子目录。 存储策略:表示设置目录或文件在HDFS中的存储策略。 摘要:表示查看选中的文件或目录的HDFS存储信息。