云服务器内容精选

  • 语法格式 create table hudiSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'hudi', 'path' = 'obs://xx', 'table.type' = 'xx', 'hoodie.datasource.write.recordkey.field' = 'xx', 'write.precombine.field' = 'xx', 'read.streaming.enabled' = 'xx' ... );
  • 参数说明 当下游消费Hudi过慢,上游写入端会把Hudi文件归档,导致File Not Found问题。设置合理的消费参数避免File Not Found问题。 优化建议: 调大read.tasks。 如果有限流,调大限流参数。 调大上游compaction、archive、clean参数。 表1 参数名称 参数 是否必选 默认值 数据类型 参数说明 connector 是 无 String 读取表类型。需要填写'hudi' path 是 无 String 表存储的路径。如obs://xx/xx table.type 是 COPY_ON_WRITE String Hudi表类型。 MERGE_ON_READ COPY_ON_WRITE hoodie.datasource.write.recordkey.field 是 无 String 表的主键。 write.precombine.field 是 无 String 数据合并字段。 read.tasks 否 4 Integer 读hudi表task并行度。 read.streaming.enabled 是 false Boolean 设置 true 开启流式增量模式,false批量读。建议值为true read.streaming.start-commit 否 默认从最新 commit String Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的开始消费位置(闭区间) hoodie.datasource.write.keygenerator.type 否 COMPLEX Enum 上游表主键生成类型: SIMPLE(默认值) COMPLEX TIMESTAMP CUSTOM NON_PARTITION GLOBAL_DELETE read.streaming.check-interval 否 1 Integer 流读监测上游新提交的周期(分钟),流量大时建议使用默认值,默认值:1。 read.end-commit 否 默认到最新 commit String Batch增量消费,通过参数“read.streaming.start-commit”指定起始消费位置,通过参数“read.end-commit”指定结束消费位置,为闭区间,即包含起始、结束的Commit,默认到最新Commit。 read.rate.limit 否 0 Integer 流读Hudi的限流速率,单位为每秒的条数。默认值:0,表示不限速。该值为总限速大小,每个算子的限速大小需除以读算子个数(read.tasks)。 changelog.enabled 否 false Boolean 是否写入changelog消息。CDC场景填写为 true
  • 注意事项 建议Hudi作为Source表时设置限流 Hudi表作为Source表时,为防止数据上限超过流量峰值导致作业出现异常,建议设置限流(read.rate.limit),限流上限应该为业务上线压测的峰值。 及时对Hudi表进行Compaction,防止Hudi source算子checkpoint完成时间过长 当Hudi Source算子checkpoint完成时间长时,检查该Hudi表Compaction是否正常。因为当长时间不做Compaction时list性能会变差。 流读Hudi MOR表时,建议开启log index特性提升Flink流读性能 Hudi的Mor表可以通过log index提升读写性能, Sink和Source表添加属性 'hoodie.log.index.enabled'='true' 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • Hudi简介 Apache Hudi(发音Hoodie)表示Hadoop Upserts Deletes and Incrementals。用来管理Hadoop大数据体系下存储在DFS上大型分析数据集。 Hudi不是单纯的数据格式,而是一套数据访问方法(类似 GaussDB (DWS)存储的access层),在Apache Hudi 0.9版本,大数据的Spark,Flink等组件都单独实现各自客户端。Hudi的逻辑存储如下图所示: 写入模式 COW:写时复制,适合更新少的场景。 MOR:读时复制,对于UPDATE&DELETE增量写delta log文件,分析时进行base和delta log文件合并,异步compaction合并文件。 存储格式 index:对主键进行索引,默认是file group级别的bloomfilter。 data files:base file + delta log file(主要面向对base file的update&delete)。 timeline metadata:版本log的管理。 视图 读优化视图:读取Compaction后生成的base file,未Compaction数据时效性有一定延迟(高效读取)。 实时视图:读取最新的数据,在读取时进行Base file和Delta file合并(频繁update场景)。 增量视图:类似CDC方式持续读取增量写入Hudi的数据(流批一体)。 父主题: SQL on Hudi
  • Hudi简介 Apache Hudi(发音Hoodie)表示Hadoop Upserts Deletes and Incrementals。用来管理Hadoop大数据体系下存储在DFS上大型分析数据集。 Hudi不是单纯的数据格式,而是一套数据访问方法(类似GaussDB(DWS)存储的access层),在Apache Hudi 0.9版本,大数据的Spark,Flink等组件都单独实现各自客户端。Hudi的逻辑存储如下图所示: 写入模式 COW:写时复制,适合更新少的场景。 MOR:读时复制,对于UPDATE&DELETE增量写delta log文件,分析时进行base和delta log文件合并,异步compaction合并文件。 存储格式 index:对主键进行索引,默认是file group级别的bloomfilter。 data files:base file + delta log file(主要面向对base file的update&delete)。 timeline metadata:版本log的管理。 视图 读优化视图:读取Compaction后生成的base file,未Compaction数据时效性有一定延迟(高效读取)。 实时视图:读取最新的数据,在读取时进行Base file和Delta file合并(频繁update场景)。 增量视图:类似CDC方式持续读取增量写入Hudi的数据(流批一体)。 父主题: SQL on Hudi
  • 问题 使用Spark SQL删除MOR表后重新建表写入数据不能实时同步ro、rt表,报错如下: WARN HiveSyncTool: Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set java.lang.IllegalArgumentException: Failed to get schema for table hudi_table2_ro does not exist at org.apache.hudi.hive.HoodieHiveClient.getTableSchema(HoodieHiveClient.java:183) at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:286) at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:213)
  • Schema演进支持范围 Schema演进支持范围: 支持列(包括嵌套列)相关的增、删、改、位置调整等操作。 不支持对分区列做演进。 不支持对Array类型的嵌套列进行增、删、列操作。 表1 引擎支持矩阵 引擎 DDL操作Schema 变更后的Hudi表写操作支持 变更后的Hudi表读操作支持 变更后Hudi表compaction支持 SparkSQL Y Y Y Y Flink N Y Y Y HetuEngine N N Y N Hive N N Y N
  • HoodieDeltaStreamer流式写入 Hudi自带HoodieDeltaStreamer工具支持流式写入,也可以使用SparkStreaming以微批的方式写入。HoodieDeltaStreamer提供以下功能: 支持Kafka,DFS多种数据源接入 。 支持管理检查点、回滚和恢复,保证exactly once语义。 支持自定义转换操作。 示例: 准备配置文件kafka-source.properties #hudi配置 hoodie.datasource.write.recordkey.field=id hoodie.datasource.write.partitionpath.field=age hoodie.upsert.shuffle.parallelism=100 #hive config hoodie.datasource.hive_sync.table=hudimor_deltastreamer_partition hoodie.datasource.hive_sync.partition_fields=age hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor hoodie.datasource.hive_sync.use_jdbc=false hoodie.datasource.hive_sync.support_timestamp=true # Kafka Source topic hoodie.deltastreamer.source.kafka.topic=hudimor_deltastreamer_partition #checkpoint hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/huditest/hudimor_deltastreamer_partition # Kafka props # The kafka cluster we want to ingest from bootstrap.servers= xx.xx.xx.xx:xx auto.offset.reset=earliest #auto.offset.reset=latest group.id=hoodie-delta-streamer offset.rang.limit=10000 指定HoodieDeltaStreamer执行参数(具体参数配置,请查看官网https://hudi.apache.org/ )执行如下命令: spark-submit --master yarn --jars /opt/hudi-java-examples-1.0.jar // 指定spark运行时需要的hudi jars路径 --driver-memory 1g --executor-memory 1g --executor-cores 1 --num-executors 2 --conf spark.kryoserializer.buffer.max=128m --driver-class-path /opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/*:/opt/hudi-examples-0.6.1-SNAPSHOT.jar:/opt/hudi-examples-0.6.1-SNAPSHOT-tests.jar // 指定spark driver需要的hudi jars路径 --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer spark-internal --props file:///opt/kafka-source.properties // 指定配置文件,注意:使用yarn-cluster模式提交任务时,请指定配置文件路径为HDFS路径。 --target-base-path /tmp/huditest/hudimor1_deltastreamer_partition // 指定hudi表路径 --table-type MERGE_ON_READ // 指定要写入的hudi表类型 --target-table hudimor_deltastreamer_partition // 指定hudi表名 --source-ordering-field name // 指定hudi表预合并列 --source-class org.apache.hudi.utilities.sources.JsonKafkaSource // 指定消费的数据源为JsonKafkaSource, 该参数根据不同数据源指定不同的source类 --schemaprovider-class com.huaweixxx.bigdata.hudi.examples.DataSchemaProviderExample // 指定hudi表所需要的schema --transformer-class com.huaweixxx.bigdata.hudi.examples.TransformerExample // 指定如何处理数据源拉取来的数据,可根据自身业务需求做定制 --enable-hive-sync // 开启hive同步,同步hudi表到hive --continuous // 指定流处理模式为连续模式
  • 示例 alter table h0 add columns(ext0 string); alter table h0 add columns(new_col int not null comment 'add new column' after col1); alter table complex_table add columns(col_struct.col_name string comment 'add new column to a struct col' after col_from_col_struct);
  • 命令格式 set hoodie.archive.file.cleaner.policy = KEEP_ARCHIVED_FILES_BY_SIZE; set hoodie.archive.file.cleaner.size.retained = 5368709120; run cleanarchive on tableIdentifier/tablelocation; set hoodie.archive.file.cleaner.policy = KEEP_ARCHIVED_FILES_BY_DAYS; set hoodie.archive.file.cleaner.days.retained = 30; run cleanarchive on tableIdentifier/tablelocation;
  • 参数描述 表1 参数描述 参数 描述 tableIdentifier Hudi表的名称。 tablelocation Hudi表的存储路径。 hoodie.archive.file.cleaner.policy 清理归档文件的策略:目前仅支持KEEP_ARCHIVED_FILES_BY_SIZE和KEEP_ARCHIVED_FILES_BY_DAYS两种策略,默认策略为KEEP_ARCHIVED_FILES_BY_DAYS。 KEEP_ARCHIVED_FILES_BY_SIZE策略可以设置归档文件占用的存储空间大小 KEEP_ARCHIVED_FILES_BY_DAYS策略可以清理超过某个时间点之外的归档文件 hoodie.archive.file.cleaner.size.retained 当清理策略为KEEP_ARCHIVED_FILES_BY_SIZE时,该参数可以设置保留多少字节大小的归档文件,默认值5368709120字节(5G)。 hoodie.archive.file.cleaner.days.retained 当清理策略为KEEP_ARCHIVED_FILES_BY_DAYS时,该参数可以设置保留多少天以内的归档文件,默认值30(天)。
  • 读取Hudi数据概述 Hudi的读操作,作用于Hudi的三种视图之上,可以根据需求差异选择合适的视图进行查询。 Hudi支持多种查询引擎Spark、Hive、HetuEngine,具体支持矩阵见表1和表2。 表1 cow表 查询引擎 实时视图/读优化视图 增量视图 Hive Y Y Spark(SparkSQL) Y Y Spark(SparkDataSource API) Y Y HetuEngine Y N 表2 mor表 查询引擎 实时视图 增量视图 读优化视图 Hive Y Y Y Spark(SparkSQL) Y Y Y Spark(SparkDataSource API) Y Y Y HetuEngine Y N Y 当前Hudi使用Spark datasource接口读取时,不支持分区推断能力。比如bootstrap表使用datasource接口查询时,可能出现分区字段不显示,或者显示为null的情况。 增量视图,需设置set hoodie.hudicow.consume.mode = INCREMENTAL;,但该参数仅限于增量视图查询,不能用于Hudi表的其他类型查询,和其他表的查询。 恢复配置可设置set hoodie.hudicow.consume.mode = SNAPSHOT;或任意值。 父主题: Hudi读操作
  • 参数描述 表1 参数描述 参数 描述 table_name 需要清理无效数据文件的Hudi表的表名,必选。 op_type 命令运行模式,可选,默认值为dry_run,取值:dry_run、repair、undo、query。 dry_run:显示需要清理的无效数据文件。 repair:显示并清理无效的数据文件。 undo:恢复已清理的数据文件。 query:显示已执行清零操作的备份目录。 backup_path 运行模式为undo时有效,需要恢复数据文件的备份目录,必选。 start_time 运行模式为dry_run、repair时有效,产生无效数据文件的开始时间,可选,默认不限制开始时间。 end_time 运行模式为dry_run、repair时有效,产生无效数据文件的结束时间,可选,默认不限制结束时间。
  • 注意事项 写入模式:Hudi对于设置了主键的表支持三种写入模式,用户可以设置参数hoodie.sql.insert.mode来指定Insert模式,默认为upsert。 strict模式,Insert语句将保留COW表的主键唯一性约束,不允许重复记录。如果在插入过程中已经存在记录,则会为COW表执行HoodieDuplicateKeyException;对于MOR表,该模式与upsert模式行为一致。 non-strict模式,对主键表采用insert处理。 upsert模式,对于主键表的重复值进行更新操作。 在执行spark-sql时,用户可以设置“hoodie.sql.bulk.insert.enable = true”和“hoodie.sql.insert.mode = non-strict”来开启bulk insert作为Insert语句的写入方式。 也可以通过直接设置hoodie.datasource.write.operation的方式控制insert语句的写入方式,包括bulk_insert、insert、upsert。使用这种方式控制hoodie写入,需要注意执行完SQL后,必须执行reset hoodie.datasource.write.operation;重置Hudi的写入方式,否则该参数会影响其他SQL的执行。
  • 示例 insert into h0 select 1, 'a1', 20; -- insert static partition insert into h_p0 partition(dt = '2021-01-02') select 1, 'a1'; -- insert dynamic partition insert into h_p0 select 1, 'a1', dt; -- insert dynamic partition insert into h_p1 select 1 as id, 'a1', '2021-01-03' as dt, '19' as hh; -- insert overwrite table insert overwrite table h0 select 1, 'a1', 20; -- insert overwrite table with static partition insert overwrite h_p0 partition(dt = '2021-01-02') select 1, 'a1'; -- insert overwrite table with dynamic partition insert overwrite table h_p1 select 2 as id, 'a2', '2021-01-03' as dt, '19' as hh;