MAPREDUCE服务 MRS-structured streaming功能与可靠性介绍:Structured Streaming可靠性说明
Structured Streaming可靠性说明
Structured Streaming通过checkpoint和WAL机制,对可重放的sources,以及支持重复处理的幂等性sinks,可以提供端到端的exactly-once容错语义。
- 用户可在程序中设置option("checkpointLocation", "checkpoint路径")启用checkpoint。
从checkpoint恢复时,应用程序或者配置可能发生变更,有部分变更会导致从checkpoint恢复失败,具体限制如下:
- 不允许source的个数或者类型发生变化。
- source的参数变化,这种情况是否能被支持,取决于source类型和查询语句,例如:
- 速率控制相关参数的添加、删除和修改,此种情况能被支持,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
- 修改消费的topic/files可能会出现不可预知的问题,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "newTopic")
- sink的类型发生变化:允许特定的几个sink的组合,具体场景需要验证确认,例如:
- File sink允许变更为kafka sink,kafka中只处理新数据。
- kafka sink不允许变更为file sink。
- kafka sink允许变更为foreach sink,反之亦然。
- sink的参数变化,这种情况是否能被支持,取决于sink类型和查询语句,例如:
- 不允许file sink的输出路径发生变更。
- 允许Kafka sink的输出topic发生变更。
- 允许foreach sink中的自定义算子代码发生变更,但是变更结果取决于用户代码。
- Projection、filter和map-like操作变更,局部场景下能够支持,例如:
- 支持Filter的添加和删除,如:sdf.selectExpr("a")变更为sdf.where(...).selectExpr("a").filter(...)
- Output schema相同时,projections允许变更,如:sdf.selectExpr("stringColumn AS json").writeStream变更为sdf.select(to_json(...).as("json")).writeStream
- Output schema不相同时,projections在部分条件下允许变更,如:sdf.selectExpr("a").writeStream变更为sdf.selectExpr("b").writeStream,只有当sink支持“a”到“b”的schema转换时才不会出错。
- 状态操作的变更,在部分场景下会导致状态恢复失败:
- Streaming aggregation:如sdf.groupBy("a").agg(...)操作中,不允许分组键或聚合键的类型或者数量发生变化。
- Streaming deduplication:如:sdf.dropDuplicates("a")操作中,不允许分组键或聚合键的类型或者数量发生变化。
- Stream-stream join:如sdf1.join(sdf2, ...)操作中,关联键的schema不允许发生变化,join类型不允许发生变化,其他join条件的变更可能导致不确定性结果。
- 任意状态计算:如sdf.groupByKey(...).mapGroupsWithState(...)或者sdf.groupByKey(...).flatMapGroupsWithState(...)操作中,用户自定义状态的schema或者超时类型都不允许发生变化;允许用户自定义state-mapping函数变化,但是变更结果取决于用户代码;如果需要支持schema变更,用户可以将状态数据编码/解码成二进制数据以支持schema迁移。
- Source的容错性支持列表
Sources
支持的Options
容错支持
说明
File source
path:必填,文件路径
maxFilesPerTrigger:每次trigger最大文件数(默认无限大)
latestFirst:是否有限处理新文件(默认值: false)
fileNameOnly:是否以文件名作为新文件校验,而不是使用完整路径进行判断(默认值: false)
支持
支持通配符路径,但不支持以逗号分隔的多个路径。
文件必须以原子方式放置在给定的目录中,这在大多数文件系统中可以通过文件移动操作实现。
Socket Source
host:连接的节点ip,必填
port:连接的端口,必填
不支持
-
Rate Source
rowsPerSecond:每秒产生的行数,默认值1
rampUpTime:在达到rowsPerSecond速度之前的上升时间
numPartitions:生成数据行的并行度
支持
-
Kafka Source
参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html
支持
-
- Sink的容错性支持列表
Sinks
支持的output模式
支持Options
容错性
说明
File Sink
Append
Path:必须指定
指定的文件格式,参见DataFrameWriter中的相关接口
exactly-once
支持写入分区表,按时间分区用处较大
Kafka Sink
Append, Update, Complete
参见:https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html
at-least-once
参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html
Foreach Sink
Append, Update, Complete
None
依赖于ForeachWriter实现
参见https://spark.apache.org/docs/3.1.1/structured-streaming-programming-guide.html#using-foreach
ForeachBatch Sink
Append, Update, Complete
None
依赖于算子实现
Console Sink
Append, Update, Complete
numRows:每轮打印的行数,默认20
truncate:输出太长时是否清空,默认true
不支持容错
-
Memory Sink
Append, Complete
None
不支持容错,在complete模式下,重启query会重建整个表
-
- 什么是Spark_如何使用Spark_Spark的功能是什么
- 函数工作流FunctionGraph支持毫秒级响应文件处理_函数工作流_华为云FunctionGraph-华为云
- RabbitMQ如何保证消息的可靠性_分布式消息系统_分布式消息RabbitMQ-华为云
- 分布式消息系统Kafka_分布式消息系统_分布式消息kafka可以解决什么问题-华为云
- MapReduce服务_什么是Flume_如何使用Flume
- 华为云数据库 RDS for PostgreSQL 实例规格介绍
- CDN加速服务应用场景和案例效果对比
- CDN加速服务应用场景和案例效果对比
- MapReduce服务_什么是MapReduce服务_什么是HBase
- 函数流管理_编排无服务器_函数工作流 FunctionGraph-华为云