云服务器内容精选

  • 配置描述 提供两种不同的数据汇聚功能配置选项,两者在Spark JD BCS erver服务端的tunning选项中进行设置,设置完后需要重启JDB CS erver。 表1 参数说明 参数 说明 默认值 spark.sql.bigdata.thriftServer.useHdfsCollect 是否将结果数据保存到HDFS中而不是内存中。 优点:由于查询结果保存在hdfs端,因此基本不会造成JDBCServer的OOM。 缺点:速度慢。 true:保存至HDFS中 false:不使用该功能 须知: spark.sql.bigdata.thriftServer.useHdfsCollect参数设置为true时,将结果数据保存到HDFS中,但JobHistory原生页面上Job的描述信息无法正常关联到对应的SQL语句,同时spark-beeline命令行中回显的Execution ID为null,为解决JDBCServer OOM问题,同时显示信息正确,建议选择 spark.sql.userlocalFileCollect参数进行配置。 false spark.sql.uselocalFileCollect 是否将结果数据保存在本地磁盘中而不是内存里面。 优点:结果数据小数据量情况下和原生内存的方式相比性能损失可以忽略,大数据情况下(亿级数据)性能远比使用hdfs,以及原生内存方式好。 缺点:需要调优。大数据情况下建议JDBCServer driver端内存10G,executor端每个核心分配3G内存。 true:使用该功能 false: 不使用该功能 false spark.sql.collect.Hive 该参数在spark.sql.uselocalFileCollect开启的情况下生效。直接序列化的方式,还是间接序列化的方式保存结果数据到磁盘。 优点:针对分区数特别多的表查询结果汇聚性能优于直接使用结果数据保证在磁盘的方式。 缺点:和spark.sql.uselocalFileCollect开启时候的缺点一样。 true:使用该功能 false:不使用该功能 false spark.sql.collect.serialize 该参数在spark.sql.uselocalFileCollect, spark.sql.collect.Hive同时开启的情况下生效。 作用是进一步提升性能 java:采用java序列化方式收集数据。 kryo:采用kryo序列化方式收集数据,性能要比采用java好。 java 参数spark.sql.bigdata.thriftServer.useHdfsCollect和spark.sql.uselocalFileCollect不能同时设置为true。
  • 配置描述 提供两种不同的数据汇聚功能配置选项,两者在Spark JDBCServer服务端的tunning选项中进行设置,设置完后需要重启JDBCServer。 表1 参数说明 参数 说明 默认值 spark.sql.bigdata.thriftServer.useHdfsCollect 是否将结果数据保存到HDFS中而不是内存中。 优点:由于查询结果保存在hdfs端,因此基本不会造成JDBCServer的OOM。 缺点:速度慢。 true:保存至HDFS中。 false:不使用该功能。 须知: spark.sql.bigdata.thriftServer.useHdfsCollect参数设置为true时,将结果数据保存到HDFS中,但JobHistory原生页面上Job的描述信息无法正常关联到对应的SQL语句,同时spark-beeline命令行中回显的Execution ID为null,为解决JDBCServer OOM问题,同时显示信息正确,建议选择spark.sql.userlocalFileCollect参数进行配置。 false spark.sql.uselocalFileCollect 是否将结果数据保存在本地磁盘中而不是内存里面。 优点:结果数据小数据量情况下和原生内存的方式相比性能损失可以忽略,大数据情况下(亿级数据)性能远比使用HDFS,以及原生内存方式好。 缺点:需要调优。大数据情况下建议JDBCServer driver端内存10G,executor端每个核心分配3G内存。 true:使用该功能。 false: 不使用该功能。 false spark.sql.collect.Hive 该参数在spark.sql.uselocalFileCollect开启的情况下生效。直接序列化的方式,还是间接序列化的方式保存结果数据到磁盘。 优点:针对分区数特别多的表查询结果汇聚性能优于直接使用结果数据保证在磁盘的方式。 缺点:和spark.sql.uselocalFileCollect开启时候的缺点一样。 true:使用该功能。 false:不使用该功能。 false spark.sql.collect.serialize 该参数在spark.sql.uselocalFileCollect, spark.sql.collect.Hive同时开启的情况下生效。 作用是进一步提升性能 java:采用java序列化方式收集数据。 kryo:采用kryo序列化方式收集数据,性能要比采用java好。 java 参数spark.sql.bigdata.thriftServer.useHdfsCollect和spark.sql.uselocalFileCollect不能同时设置为true。
  • 问题排查步骤 登录ClickHouse客户端,需要排查是否存在异常的Merge。 select database, table, elapsed, progress, merge_type from system.merges; 业务上建议insert频率不要太快,不要小批量数据的插入,适当增大每次插入的时间间隔。 数据表分区分配不合理,导致产生太多的区分,需要重新划分分区。 如果没有触发Merge,或者Merge较慢,需要调整参数加快Merge。 加速Merge,需要调整如下参数,请参考加速Merge操作: 配置项 参考值 max_threads CPU核数*2 background_pool_size CPU核数 merge_max_block_size 8192的整数倍,根据CPU内存资源大小调整 cleanup_delay_period 适当小于默认值 30
  • 建议 Spark批处理场景,对写入时延要求不高的场景,采用COW表。 COW表模型中,写入数据存在写放大问题,因此写入速度较慢;但COW具有非常好的读取性能力。而且批量计算对写入时延不是很敏感,因此可以采用COW表。 Hudi表的写任务要开启Hive元数据同步功能。 SparkSQL天然与Hive集成,无需考虑元数据问题。该条建议针对的是通过Spark Datasource API或者Flin写Hudi表的场景,通过这两种方式写Hudi时需要增加向Hive同步元数据的配置项;该配置的目的是将Hudi表的元数据统一托管到Hive元数据服务中,为后续的跨引擎操作数据以及数据管理提供便利。
  • 规则 Hudi表必须设置合理的主键。 Hudi表提供了数据更新和幂等写入能力,该能力要求Hudi表必须设置主键,主键设置不合理会导致数据重复。主键可以为单一主键也可以为复合主键,两种主键类型均要求主键不能有null值和空值,可以参考以下示例设置主键: SparkSQL: -- 通过primaryKey指定主键,如果是复合主键需要用逗号分隔。 create table hudi_table ( id1 int, id2 int, name string, price double ) using hudi options ( primaryKey = 'id1,id2', preCombineField = 'price' ); SparkDatasource: --通过hoodie.datasource.write.recordkey.field指定主键。 df.write.format("hudi"). option("hoodie.datasource.write.table.type", COPY_ON_WRITE). option("hoodie.datasource.write.precombine.field", "price"). option("hoodie.datasource.write.recordkey.field", "id1,id2"). FlinkSQL: --通过hoodie.datasource.write.recordkey.field指定主键。 create table hudi_table( id1 int, id2 int, name string, price double ) partitioned by (name) with ( 'connector' = 'hudi', 'hoodie.datasource.write.recordkey.field' = 'id1,id2', 'write.precombine.field' = 'price') Hudi表必须配置precombine字段。 在数据同步过程中不可避免会出现数据重复写入、数据乱序问题,例如:异常数据恢复、写入程序异常重启等场景。通过设置合理precombine字段值可以保证数据的准确性,老数据不会覆盖新数据,也就是幂等写入能力。该字段可用选择的类型包括:业务表中更新时间戳、数据库的提交时间戳等。precombine字段不能有null值和空值,可以参考以下示例设置precombine字段: SparkSQL: --通过preCombineField指定precombine字段。 create table hudi_table ( id1 int, id2 int, name string, price double ) using hudi options ( primaryKey = 'id1,id2', preCombineField = 'price' ); SparkDatasource: --通过hoodie.datasource.write.precombine.field指定precombine字段。 df.write.format("hudi"). option("hoodie.datasource.write.table.type", COPY_ON_WRITE). option("hoodie.datasource.write.precombine.field", "price"). option("hoodie.datasource.write.recordkey.field", "id1,id2"). Flink: --通过write.precombine.field指定precombine字段。 create table hudi_table( id1 int, id2 int, name string, price double ) partitioned by (name) with ( 'connector' = 'hudi', 'hoodie.datasource.write.recordkey.field' = 'id1,id2', 'write.precombine.field' = 'price') 流式计算采用MOR表。 流式计算为低时延的实时计算,需要高性能的流式读写能力,在Hudi表中存在的MOR和COW两种模型中,MOR表的流式读写性能相对较好,因此在流式计算场景下采用MOR表模型。关于MOR表在读写性能的对比关系如下: 对比维度 MOR表 COW表 流式写 高 低 流式读 高 低 批量写 高 低 批量读 低 高 实时入湖,表模型采用MOR表。 实时入湖一般的性能要求都在分钟内或者分钟级,结合Hudi两种表模型的对比,因此在实时入湖场景中需要选择MOR表模型。 Hudi表名以及列名采用小写字母。 多引擎读写同一张Hudi表时,为了规避引擎之间大小写的支持不同,统一采用小写字母。
  • 规则 有数据持续写入的表,24小时内至少执行一次compaction。 对于MOR表,不管是流式写入还是批量写入,需要保证每天至少完成1次Compaction操作。如果长时间不做compaction,Hudi表的log将会越来越大,这必将会出现以下问题: Hudi表读取很慢,且需要很大的资源。 这是由于读MOR表涉及到log合并,大log合并需要消耗大量的资源并且速度很慢。 长时间进行一次Compaction需要耗费很多资源才能完成,且容易出现OOM。 阻塞Clean,如果没有Compaction操作来产生新版本的Parquet文件,那旧版本的文件就不能被Clean清理,增加存储压力。 CPU与内存比例为1:4~1:8。 Compaction作业是将存量的parquet文件内的数据与新增的log中的数据进行合并,需要消耗较高的内存资源,按照之前的表设计规范以及实际流量的波动结合考虑,建议Compaction作业CPU与内存的比例按照1:4~1:8配置,保证Compaction作业稳定运行。当Compaction出现OOM问题,可以通过调大内存占比解决。
  • 优化Spark Shuffle参数提升Hudi写入效率 开启spark.shuffle.readHostLocalDisk=true,本地磁盘读取shuffle数据,减少网络传输的开销。 开启spark.io.encryption.enabled=false,关闭shuffle过程写加密磁盘,提升shuffle效率。 开启spark.shuffle.service.enabled=true,启动shuffle服务,提升任务shuffle的稳定性。 配置项 集群默认值 调整后 --conf spark.shuffle.readHostLocalDisk false true --conf spark.io.encryption.enabled true false --conf spark.shuffle.service.enabled false true
  • 优化shuffle并行度,提升Spark加工效率 所谓的shuffle并发度如下图所示: 集群默认是200,作业可以单独设置。如果发现瓶颈stage(执行时间长),且分配给当前作业的核数大于当前的并发数,说明并发度不足。通过以下配置优化。 场景 配置项 集群默认值 调整后 Jar作业 spark.default.parallelism 200 按实际作业可用资源2倍设置 SQL作业 spark.sql.shuffle.partitions 200 按实际作业可用资源2倍设置 hudi入库作业 hoodie.upsert.shuffle.parallelism 200 非bucket表使用,按实际作业可用资源2倍设置 动态资源调度情况下(spark.dynamicAllocation.enabled= true)时,资源按照spark.dynamicAllocation.maxExecutors评估。
  • 初始化Hudi表时,可以使用BulkInsert方式快速写入数据 示例: set hoodie.combine.before.insert=true; --入库前去重,如果数据没有重复 该参数无需设置。 set hoodie.datasource.write.operation = bulk_insert; --指定写入方式为bulk insert方式。 set hoodie.bulkinsert.shuffle.parallelism = 4; --指定bulk_insert写入时的并行度,等于写入完成后保存的分区parquet文件数。 insert into dsrTable select * from srcTabble
  • 调整Spark调度参数优化OBS场景下Spark调度时延 开启对于OBS存储,可以关闭Spark的本地性进行优化,尽可能提升Spark调度效率 配置项 集群默认值 调整后 --conf spark.locality.wait 3s 0s --conf spark.locality.wait.process 3s 0s --conf spark.locality.wait.node 3s 0s --conf spark.locality.wait.rack 3s 0s
  • Spark加工Hudi表时其他参数优化 设置spark.sql.enableToString=false,降低Spark解析复杂SQL时候内存使用,提升解析效率。 设置spark.speculation=false,关闭推测执行,开启该参数会带来额外的cpu消耗,同时Hudi不支持启动该参数,启用该参数写Hudi有概率导致文件损坏。 配置项 集群默认值 调整后 --conf spark.sql.enableToString true false --conf spark.speculation false false
  • 实时任务接入 实时作业一般由Flink Sql或Sparkstreaming来完成,流式实时任务通常配置同步生成compaction计划,异步执行计划。 Flink SQL作业中sink端Hudi表相关配置如下: create table denza_hudi_sink ( $HUDI_SINK_SQL_REPLACEABLE$ ) PARTITIONED BY ( years, months, days ) with ( 'connector' = 'hudi', --指定写入的是Hudi表。 'path' = 'obs://XXXXXXXXXXXXXXXXXX/', --指定Hudi表的存储路径。 'table.type' = 'MERGE_ON_READ', --Hudi表类型。 'hoodie.datasource.write.recordkey.field' = 'id', --主键。 'write.precombine.field' = 'vin', --合并字段。 'write.tasks' = '10', --flink写入并行度。 'hoodie.datasource.write.keygenerator.type' = 'COMPLEX', --指定KeyGenerator,与Spark创建的Hudi表类型一致。 'hoodie.datasource.write.hive_style_partitioning' = 'true', --使用hive支持的分区格式。 'read.streaming.enabled' = 'true', --开启流读。 'read.streaming.check-interval' = '60', --checkpoint间隔,单位为秒。 'index.type'='BUCKET', --指定Hudi表索引类型为BUCKET。 'hoodie.bucket.index.num.buckets'='10', --指定bucket桶数。 'compaction.delta_commits' = '3', --compaction生成的commit间隔。 'compaction.async.enabled' = 'false', --compaction异步执行关闭。 'compaction.schedule.enabled' = 'true', --compaction同步生成计划。 'clean.async.enabled' = 'false', --异步clean关闭。 'hoodie.archive.automatic' = 'false', --自动archive关闭。 'hoodie.clean.automatic' = 'false', --自动clean关闭。 'hive_sync.enable' = 'true', --自动同步hive表。 'hive_sync.mode' = 'jdbc', --同步hive表方式为jdbc。 'hive_sync.jdbc_url' = '', --同步hive表的jdbc url。 'hive_sync.db' = 'hudi_cars_byd', --同步hive表的database。 'hive_sync.table' = 'byd_hudi_denza_1s_mor', --同步hive表的tablename。 'hive_sync.metastore.uris' = 'thrift://XXXXX:9083 ', --同步hive表的metastore uri。 'hive_sync.support_timestamp' = 'true', --同步hive表支持timestamp格式。 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor' --同步hive表的extractor类。 ); Spark streaming写入Hudi表常用的参数如下(参数意义与上面flink类似,不再做注释): hoodie.table.name= hoodie.index.type=BUCKET hoodie.bucket.index.num.buckets=3 hoodie.datasource.write.precombine.field= hoodie.datasource.write.recordkey.field= hoodie.datasource.write.partitionpath.field= hoodie.datasource.write.table.type= MERGE_ON_READ hoodie.datasource.write.hive_style_partitioning=true hoodie.compact.inline=true hoodie.schedule.compact.only.inline=true hoodie.run.compact.only.inline=false hoodie.clean.automatic=false hoodie.clean.async=false hoodie.archive.async=false hoodie.archive.automatic=false hoodie.compact.inline.max.delta.commits=50 hoodie.datasource.hive_sync.enable=true hoodie.datasource.hive_sync.partition_fields= hoodie.datasource.hive_sync.database= hoodie.datasource.hive_sync.table= hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor 父主题: Bucket调优示例
  • 离线Compaction配置 对于MOR表的实时业务,通常设置在写入中同步生成compaction计划,因此需要额外通过DataArts或者脚本调度SparkSQL去执行已经产生的compaction计划。 执行参数 set hoodie.compact.inline = true; --打开compaction操作。 set hoodie.run.compact.only.inline = true; --compaction只执行已生成的计划,不产生新计划。 set hoodie.cleaner.commits.retained = 120; --清理保留120个commit。 set hoodie.keep.max.commits = 140; --归档最大保留140个commit。 set hoodie.keep.min.commits = 121; --归档最小保留121个commit。 set hoodie.clean.async = false; --打开异步清理。 set hoodie.clean.automatic = false; --关闭自动清理,防止compaction操作触发clean。 run compaction on $tablename; --执行compaction计划。 run clean on $tablename; --执行clean操作清理冗余版本。 run archivelog on $tablename; --执行archivelog合并清理元数据文件。 关于清理、归档参数的值不宜设置过大,会影响Hudi表的性能,通常建议: hoodie.cleaner.commits.retained = compaction所需要的commit数的2倍 hoodie.keep.min.commits = hoodie.cleaner.commits.retained + 1 hoodie.keep.max.commits = hoodie.keep.min.commits + 20 执行compaction后再执行clean和archive,由于clean和archivelog对资源要求较小,为避免资源浪费,使用DataArts调度的话可以compaction作为一个任务,clean、archive作为一个任务分别配置不同的资源执行来节省资源使用。 执行资源 Compaction调度的间隔应小于Compaction计划生成的间隔,例如1小时左右生成一个Compaction计划的话,执行Compaction计划的调度任务应该至少半小时调度一次。 Compaction作业配置的资源,vcore数至少要大于等于单个分区的桶数,vcore数与内存的比例应为1:4即1个vcore配4G内存。 父主题: Bucket调优示例
  • Spark写Hudi各种写入模式参数规范说明 类型 说明 开启参数 场景选择 特点 upsert update + insert Hudi默认写入类型,写入具有更新能力。 默认,无需参数开启。 SparkSQL: set hoodie.datasource.write.operation=upsert; DataSource Api: df.write .format("hudi") .options(xxx) .option("hoodie.datasource.write.operation", "upsert") .mode("append") .save("/tmp/tablePath") 默认选择。 优点: 支持小文件合并。 支持更新。 缺点: 写入速度中规中矩。 append 数据无更新直接写入 Spark:Spark侧没有纯append模式可使用bulk insert模式替代。 SparkSQL: set hoodie.datasource.write.operation = bulk_insert; set hoodie.datasource.write.row.writer.enable = true; DataSource Api: df.write .format("hudi") .options(xxx) .option("hoodie.datasource.write.operation", "bulk_insert") .option("hoodie.datasource.write.row.writer.enable", "true") .mode("append") .save("/tmp/tablePath") 追求高吞吐,无数据更新场景。 优点: 写入速度最快。 缺点: 无小文件合并能力。 无更新能力。 需要clustering合并小文件。 delete 删除操作 无需参数,直接使用delete语法即可: delete from tableName where primaryKey='id1'; SQL删除数据数据场景。 和upsert类型一样。 Insert overwrite 覆写分区 无需参数,直接使用insert overwrite语法即可: insert overwrite table tableName partition(dt ='2021-01-04') select * from srcTable; 分区级别重新。 覆写分区。 Insert overwrite table 覆写全表 无需参数,直接使用insert overwrite语法即可: insert overwrite table tableName select * from srcTable; 全部重写。 覆写全表。 Bulk_insert 批量导入 SparkSQL: set hoodie.datasource.write.operation = bulk_insert; set hoodie.datasource.write.row.writer.enable = true; DataSource Api: df.write .format("hudi") .options(xxx) .option("hoodie.datasource.write.operation", "bulk_insert") .option("hoodie.datasource.write.row.writer.enable", "true") .mode("append") .save("/tmp/tablePath") 建议表初始化搬迁的时候使用。 和append模式一样。
  • 编辑器使用介绍 访问Hue WebUI,请参考访问Hue WebUI界面。 在左侧导航栏单击,然后选择“Workflow”。 支持创建Workflow、计划和Bundles的操作。支持提交运行、共享、复制和导出已创建的应用。 每个Workflow可以包含一个或多个作业,形成完整的工作流,用于实现指定的业务。 创建Workflow时,可直接在Hue的编辑器设计作业,并添加到Workflow中。 每个计划可定义一个时间触发器,用于定时触发执行一个指定的Workflow。不支持多个Workflow。 每个Bundles可定义一个集合,用于触发执行多个计划,使不同Workflow批量执行。