检测到您已登录华为云国际站账号,为了您更好的体验,建议您访问国际站服务网站 https://www.huaweicloud.com/intl/zh-cn
不再显示此消息
Flink Opensource SQL如何解析复杂嵌套 JSON? kafka message { "id": 1234567890, "name": "swq", "date": "1997-04-25", "obj": { "time1": "12:12:12"
Kafka结果表 功能描述 DLI将Flink作业的输出数据输出到Kafka中。 Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。 前提条件 Kafka是线下集
'SASL_PLAINTEXT' 示例(适用于Kafka集群未开启SASL_SSL场景) 该示例是从Kafka的一个topic中读取数据,并使用Kafka结果表将数据写入到kafka的另一个topic中。 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
配置源端MRS Kafka的数据源连接,具体参数配置如下。 表1 MRS Kafka数据源配置 参数 值 名称 自定义MRS Kafka数据源名称。例如当前配置为“source_kafka”。 Manager IP 单击输入框旁边的“选择”按钮,选择当前MRS Kafka集群即可自动关联出来Manager
String Kafka读取数据的启动位点。 取值如下: earliest-offset:从Kafka最早分区开始读取。 latest-offset:从Kafka最新位点开始读取。 group-offsets(默认值):根据Group读取。 timestamp:从Kafka指定时间点
示例2:将json格式DMS Kafka作为源表,输出到Kafka sink中(适用于Kafka集群未开启SASL_SSL场景) 将Kafka作为源表,Kafka作为结果表,从Kafka中读取编码格式为json数据类型的数据,输出到日志文件中。 参考增强型跨源连接,根据Kafka所在的虚拟私有云
创建Kafka_SSL类型跨源认证 操作场景 通过在DLI控制台创建的Kafka_SSL类型的跨源认证,将Kafka的认证信息存储到DLI,无需在SQL作业中配置账号密码,安全访问Kafka实例。 MRS Kafka开启Kerberos认证,未开启SSL认证时,创建Kerbero
Kafka源表 功能描述 创建source流从Kafka获取数据,作为作业的输入数据。 Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。 前提条件 Kafka是
utf8mb4; 步骤4:创建增强型跨源连接 创建DLI连接Kafka的增强型跨源连接 在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。 在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取该实例
} } } 步骤4:创建增强型跨源连接 创建DLI连接Kafka的增强型跨源连接 在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。 在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取获取该
FLOAT8 ); 步骤4:创建增强型跨源连接 创建DLI连接Kafka的增强型跨源连接 在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。 在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取获取该
'SASL_PLAINTEXT' 示例 从Kafka源表获取Kafka source topic数据,通过Upsert Kafka结果表将Kafka source topic数据写入到Kafka sink topic中。 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增
引入properties中才能生效。 kafka source可以在open里引入。 图1 获取kafka source kafka sink可以在initializeState里引入。 图2 获取kafka sink 操作步骤 从Kafka实例的基本信息页面下载SSL证书,解压后将其中的clinet
connector类型,对于upsert kafka,需配置为'upsert-kafka' connector.version 否 Kafka版本,仅支持:'0.11' format.type 是 数据序列化格式,支持:'csv', 'json'及'avro'等 connector.topic 是 kafka topic名
Bob,330111) 示例2:从Kafka源表获取DMS Kafka source topic数据,通过Upsert Kafka结果表将Kafka source topic数据写入到Kafka sink topic中。 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应
connector类型,对于upsert kafka,需配置为'upsert-kafka'。 topic 是 无 String Kafka topic名。 properties.bootstrap.servers 是 无 String Kafka brokers地址,以逗号分隔。 key
示例 该示例是从kafka的一个topic中读取数据,并使用kafka sink将数据写入到kafka的另一个topic中。 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组入向规则,使其对当前将要使用的队列放开,并根据kafka的地址测试队列
CSV Format 允许我们基于CSV schema 进行解析和生成CSV 数据。目前的CSV schema 是基于table schema 推导出来的。 支持的Connector Kafka Upsert Kafka 参数说明 表1 参数 是否必选 默认值 类型 说明 format
(avro-confluent) 格式能让您读取被 io.confluent.kafka.serializers.KafkaAvroSerializer序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer反序列化的记录。
(avro-confluent) 格式能让您读取被 io.confluent.kafka.serializers.KafkaAvroSerializer 序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer