数据湖探索 DLI-Upsert Kafka:参数说明

时间:2024-12-27 10:34:32

参数说明

表2 参数说明

参数

是否必选

默认参数

数据类型

说明

connector

String

connector类型,对于upsert kafka连接器,需配置为'upsert-kafka'。

topic

String

Kafka topic名。

properties.bootstrap.servers

String

Kafka brokers地址,以逗号分隔。

key.format

String

用于对Kafka消息中key部分序列化和反序列化的格式。key字段由PRIMARY KEY语法指定。支持的格式如下:

  • csv
  • json
  • avro

请参考Format页面以获取更多详细信息和格式参数。

key.fields-prefix

String

为键格式的所有字段定义自定义前缀,以避免与值格式的字段发生名称冲突。

默认情况下,前缀为空。如果定义了自定义前缀,则表架构和'key.fields'都将使用前缀名称。在构造密钥格式的数据类型时,将删除前缀,并在密钥格式中使用无前缀的名称。请注意,此选项要求'value.fields-include' 必须设置为'EXCEPT_KEY'。

value.format

String

用于对 Kafka消息中 value 部分序列化和反序列化的格式。支持的格式:

  • csv
  • json
  • avro

请参考Format页面以获取更多详细信息和格式参数。

value.fields-include

ALL

String

控制哪些字段应该出现在值中。取值范围如下:

  • ALL:消息的value部分将包含schema的所有字段,包括定义中键的字段。
  • EXCEPT_KEY:记录的value部分包含schema的所有内容,定义为主键的字段除外。

properties.*

String

该选项可以传递任意的Kafka参数。

“properties.”后的后缀名必须匹配定义在 kafka参数文档中的参数名。 Flink会自动移除选项名中的 "properties." 前缀,并将转换后的键名以及值传入KafkaClient。

例如:您可以通过 'properties.allow.auto.create.topics' = 'false' 来禁止自动创建 topic。

但是'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为Flink会重写这些参数的值。

sink.parallelism

Integer

定义upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游连接算子的并行度保持一致。

sink.buffer-flush.max-rows

0

Integer

缓存刷新前,最多能缓存的记录条数。

当sink收到很多同key上的更新时,缓存将保留同 key 的最后一条记录,因此sink缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。可以通过设置为'0'来禁用它。

默认情况下,该选项是未开启的。如果要开启 sink 缓存,需要同时设置'sink.buffer-flush.max-rows'和'sink.buffer-flush.interval'两个选项为大于零的值。

sink.buffer-flush.interval

0

Duration

缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。单位可以为毫秒(ms)、秒(s)、分钟(min)或小时(h)。例如'sink.buffer-flush.interval'='10 ms'。

默认情况下,该选项是未开启的。如果要开启 sink 缓存,需要同时设置'sink.buffer-flush.max-rows'和'sink.buffer-flush.interval'两个选项为大于零的值。

support.huaweicloud.com/sqlref-flink-dli/dli_08_15065.html