MAPREDUCE服务 MRS-structured streaming功能与可靠性介绍:Structured Streaming可靠性说明

时间:2024-10-18 17:22:19

Structured Streaming可靠性说明

Structured Streaming通过checkpoint和WAL机制,对可重放的sources,以及支持重复处理的幂等性sinks,可以提供端到端的exactly-once容错语义。

  1. 用户可在程序中设置option("checkpointLocation", "checkpoint路径")启用checkpoint。

    从checkpoint恢复时,应用程序或者配置可能发生变更,有部分变更会导致从checkpoint恢复失败,具体限制如下:

    1. 不允许source的个数或者类型发生变化。
    2. source的参数变化,这种情况是否能被支持,取决于source类型和查询语句,例如:
      • 速率控制相关参数的添加、删除和修改,此种情况能被支持,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
      • 修改消费的topic/files可能会出现不可预知的问题,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "newTopic")
    3. sink的类型发生变化:允许特定的几个sink的组合,具体场景需要验证确认,例如:
      • File sink允许变更为kafka sink,kafka中只处理新数据。
      • kafka sink不允许变更为file sink。
      • kafka sink允许变更为foreach sink,反之亦然。
    4. sink的参数变化,这种情况是否能被支持,取决于sink类型和查询语句,例如:
      • 不允许file sink的输出路径发生变更。
      • 允许Kafka sink的输出topic发生变更。
      • 允许foreach sink中的自定义算子代码发生变更,但是变更结果取决于用户代码。
    5. Projection、filter和map-like操作变更,局部场景下能够支持,例如:
      • 支持Filter的添加和删除,如:sdf.selectExpr("a")变更为sdf.where(...).selectExpr("a").filter(...)
      • Output schema相同时,projections允许变更,如:sdf.selectExpr("stringColumn AS json").writeStream变更为sdf.select(to_json(...).as("json")).writeStream
      • Output schema不相同时,projections在部分条件下允许变更,如:sdf.selectExpr("a").writeStream变更为sdf.selectExpr("b").writeStream,只有当sink支持“a”到“b”的schema转换时才不会出错。
    6. 状态操作的变更,在部分场景下会导致状态恢复失败:
      • Streaming aggregation:如sdf.groupBy("a").agg(...)操作中,不允许分组键或聚合键的类型或者数量发生变化。
      • Streaming deduplication:如:sdf.dropDuplicates("a")操作中,不允许分组键或聚合键的类型或者数量发生变化。
      • Stream-stream join:如sdf1.join(sdf2, ...)操作中,关联键的schema不允许发生变化,join类型不允许发生变化,其他join条件的变更可能导致不确定性结果。
      • 任意状态计算:如sdf.groupByKey(...).mapGroupsWithState(...)或者sdf.groupByKey(...).flatMapGroupsWithState(...)操作中,用户自定义状态的schema或者超时类型都不允许发生变化;允许用户自定义state-mapping函数变化,但是变更结果取决于用户代码;如果需要支持schema变更,用户可以将状态数据编码/解码成二进制数据以支持schema迁移。
  1. Source的容错性支持列表

    Sources

    支持的Options

    容错支持

    说明

    File source

    path:必填,文件路径

    maxFilesPerTrigger:每次trigger最大文件数(默认无限大)

    latestFirst:是否有限处理新文件(默认值: false)

    fileNameOnly:是否以文件名作为新文件校验,而不是使用完整路径进行判断(默认值: false)

    支持

    支持通配符路径,但不支持以逗号分隔的多个路径。

    文件必须以原子方式放置在给定的目录中,这在大多数文件系统中可以通过文件移动操作实现。

    Socket Source

    host:连接的节点ip,必填

    port:连接的端口,必填

    不支持

    -

    Rate Source

    rowsPerSecond:每秒产生的行数,默认值1

    rampUpTime:在达到rowsPerSecond速度之前的上升时间

    numPartitions:生成数据行的并行度

    支持

    -

    Kafka Source

    参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html

    支持

    -

  2. Sink的容错性支持列表

    Sinks

    支持的output模式

    支持Options

    容错性

    说明

    File Sink

    Append

    Path:必须指定

    指定的文件格式,参见DataFrameWriter中的相关接口

    exactly-once

    支持写入分区表,按时间分区用处较大

    Kafka Sink

    Append, Update, Complete

    参见:https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html

    at-least-once

    参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html

    Foreach Sink

    Append, Update, Complete

    None

    依赖于ForeachWriter实现

    参见https://spark.apache.org/docs/3.1.1/structured-streaming-programming-guide.html#using-foreach

    ForeachBatch Sink

    Append, Update, Complete

    None

    依赖于算子实现

    参见https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

    Console Sink

    Append, Update, Complete

    numRows:每轮打印的行数,默认20

    truncate:输出太长时是否清空,默认true

    不支持容错

    -

    Memory Sink

    Append, Complete

    None

    不支持容错,在complete模式下,重启query会重建整个表

    -

support.huaweicloud.com/devg-lts-mrs/mrs_07_200144.html