云服务器内容精选

  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 Upsert Kafka 始终以upsert方式工作,并且需要在DDL中定义主键。在具有相同主键值的消息按序存储在同一个分区的前提下,在 changlog source 定义主键意味着 在物化后的 changelog 上主键具有唯一性。定义的主键将决定哪些字段出现在Kafka消息的key中。 由于该连接器以 upsert 的模式工作,该连接器作为 source 读入时,可以确保具有相同主键值下仅最后一条消息会生效。 数据类型的使用,请参考Format章节。
  • 前提条件 该场景作业需要运行在 DLI 的独享队列上,因此要与kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 语法格式 1 2 3 4 5 6 7 8 9101112 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED))with ( 'connector' = 'upsert-kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'key.format' = '', 'value.format' = '');
  • 功能描述 Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。 作为 source,upsert-kafka 连接器生产changelog流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。
  • 参数说明 表1 参数说明 参数 是否必选 默认参数 数据类型 说明 connector 是 无 String connector类型,对于upsert kafka,需配置为'upsert-kafka'。 topic 是 无 String Kafka topic名。 properties.bootstrap.servers 是 无 String Kafka brokers地址,以逗号分隔。 key.format 是 无 String 用于对Kafka消息中key部分序列化和反序列化的格式。key字段由PRIMARY KEY语法指定。支持的格式如下: csv json avro 请参考Format页面以获取更多详细信息和格式参数。 key.fields-prefix 否 无 String 为键格式的所有字段定义自定义前缀,以避免与值格式的字段发生名称冲突。 默认情况下,前缀为空。如果定义了自定义前缀,则表架构和'key.fields'都将使用前缀名称。在构造密钥格式的数据类型时,将删除前缀,并在密钥格式中使用无前缀的名称。请注意,此选项要求'value.fields-include' 必须设置为'EXCEPT_KEY'。 value.format 是 无 String 用于对 Kafka消息中 value 部分序列化和反序列化的格式。支持的格式: csv json avro 请参考Format页面以获取更多详细信息和格式参数。 value.fields-include 是 ALL String 控制哪些字段应该出现在值中。取值范围如下: ALL:消息的value部分将包含schema的所有字段,包括定义中键的字段。 EXCEPT_KEY:记录的value部分包含schema的所有内容,定义为主键的字段除外。 properties.* 否 无 String 该选项可以传递任意的Kafka参数。 “properties.”后的后缀名必须匹配定义在 kafka参数文档中的参数名。 Flink会自动移除选项名中的 "properties." 前缀,并将转换后的键名以及值传入KafkaClient。 例如:您可以通过 'properties.allow.auto.create.topics' = 'false' 来禁止自动创建 topic。 但是'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为Flink会重写这些参数的值。 ssl_auth_name 否 无 String DLI侧创建的Kafka_SSL类型的跨源认证名称。Kafka配置SSL时使用该配置。 注意:若仅使用SSL类型,则需要同时配置'properties.security.protocol '= 'SSL'; 若使用SASL_SSL类型,则需要同时配置'properties.security.protocol' = 'SASL_SSL'、'properties.sasl.mechanism' = 'GSSAPI或者PLAIN'、'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";' krb_auth_name 否 无 String DLI侧创建的Kerberos类型的跨源认证名称。Kafka配置SASL认证时使用该配置。 注意:如果使用SASL_PLAINTEXT类型,且使用Kerberos认证,则需要同时配置'properties.sasl.mechanism' = 'GSSAPI'和'properties.security.protocol' = 'SASL_PLAINTEXT'