云服务器内容精选

  • 对有更新操作的数据流进行聚合计算时要注意数据准确性问题 在针对更新数据进行聚合需要选择合适的解决方案,否则聚合结果会是错误的。 例如: Create table t1( id int, partid int, value int ); select partid,sum(value) from t1 group by partid; 第一批数据:[1,1,10],[2,1,11],[3,2,8] 聚合结果:[1,21],[2,8] 第二批数据:[2,1,12] //对ID=2的记录进行更新。 错误结果:[1,33],[2,8] //若是无法识别是对ID=2的数据进行了更新。 聚合结果:[1,22],[2,8] //识别为更新操作可以得到正确结果。 对于如何识别是更新数据有三种方式: 通过状态后端解决 通过状态后端存储所有原始数据,新来的数据根据状态来判断是否是更新操作,进而通过Flink聚合回撤机制实现聚合结果数据的更新。 优点:可以解决聚合准确性问题,而且对用户友好,对数据没有要求。 缺点:大数据量情况下状态后端存储的数据比较多。 通过CDC格式数据解决 CDC格式数据是指更新操作记录中会同时包含更新前数据和更新后数据。通过更新前的内容来回撤掉之前的聚合结果,通过更新后的数据更新最新的计算结果。 优点:不需要有大的状态后端存储,整体计算资源压力要小于基于状态后端的方案。 缺点:需要依赖于数据格式,常见的方式通过CDC采集工具,将数据采集到Kafka,然后Flink读Kafka数据进行计算。 通过changelog数据解决 changelog与CDC格式的数据类似,只不过存储的方式不同,CDC格式数据会将更新前和更新后的数据在一行记录,而changelog数据会将更新数据拆分成两行,一行是对更新前数据的删除操作,一行是更新后的数据插入操作记录。Flink在计算的时候会将基于更新数据的聚合结果删除,在将基于更新后数据的计算结果插入。changelog可以基于Hudi表实现,基于CDC格式的数据可以转为changelog数据存储到Hudi的MOR表的log文件中,也可以基于状态后端生成Hudi的changelog数据。 优点:可以基于湖存储实现更新数据聚合一致性保证。 缺点: Hudi的MOR表中仅在log文件中存在changelog数据,如果Flink作业计算延迟导致上游数据积压,而Hudi又清理了log文件,就会导致changelog丢失。针对这种情况需要保留版本数多一点,且给Flink作业合理的资源配置避免数据积压周期超过了清理周期。 基于状态后端生成changelog也是依赖于状态后端的,状态后端通常是会配置TTL时间的,不会永久保留。这种场景下更新操作是任意更新,没有一定时间周期限制。例如更新近一个月的数据,TTL设置大于一个月即可;若更新全部数据,就需要设置TTL为永久,不适用于大表。 目前changelog的MOR表,仅支持Flink引擎进行compaction处理,不支持Spark引擎。