云服务器内容精选

  • 功能描述 Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。 表1 支持类别 类别 详情 支持表类型 源表、结果表 支持数据格式 CS V JSON Apache Avro Confluent Avro Debezium CDC Canal CDC Maxwell CDC OGG CDC Raw
  • 常见问题 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 跨源未绑定或未绑定成功,或是Kafka集群安全组未配置放通 DLI 队列的网段地址。重新配置跨源,或者Kafka集群安全组放通DLI队列的网段地址。 具体操作请参考增强型跨源连接。 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword. sink表中定义了metadata类型,但是Print connector并不支持把sink表中的matadata去掉即可。
  • 注意事项 更多具体使用可参考开源社区文档:Apache Kafka SQL 连接器。 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 with参数中字段只能使用单引号,不能使用双引号。 建表时数据类型的使用请参考Format章节。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression) ) with ( 'connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'scan.startup.mode' = '', 'format' = '' );
  • Topic和Partition的探测 topic 和 topic-pattern 配置项决定了 source 消费的 topic 或 topic 的匹配规则。topic 配置项可接受使用分号间隔的 topic 列表,例如 topic-1;topic-2。 topic-pattern 配置项使用正则表达式来探测匹配的 topic。例如 topic-pattern 设置为 test-topic-[0-9],则在作业启动时,所有匹配该正则表达式的 topic(以 test-topic- 开头,以一位数字结尾)都将被 consumer 订阅。 为允许 consumer 在作业启动之后探测到动态创建的 topic,请将 scan.topic-partition-discovery.interval 配置为一个非负值。这将使 consumer 能够探测匹配名称规则的 topic 中新的 partition。 topic列表和topic匹配规则只适用于 source。对于sink端,Flink目前只支持单一topic。
  • 示例2:将json格式DMS Kafka作为源表,输出到Kafka sink中(适用于Kafka集群未开启SASL_SSL场景) 将Kafka作为源表,Kafka作为结果表,从Kafka中读取编码格式为json数据类型的数据,输出到日志文件中。 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,并提交运行。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE kafkaSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into kafkaSink select * from kafkaSource; 向Kafka的源表的topic中发送如下数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} 读取Kafka的结果表的topic,其数据结果参考如下: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
  • Connector支持列表 表1 Connector支持列表 Connector 源表 维表 结果表 BlackHole 不支持 不支持 支持 ClickHouse 不支持 不支持 支持 DataGen 支持 不支持 不支持 Doris 支持 支持 支持 DWS 支持 支持 支持 Elasticsearch 不支持 不支持 支持 FileSystem 支持 不支持 支持 Hbase 支持 支持 支持 Hive 支持 支持 支持 JDBC 支持 支持 支持 Kafka 支持 不支持 支持 Print 不支持 不支持 支持 Redis 支持 支持 支持 Upsert Kafka 支持 不支持 支持
  • 表类型 源表:源表是Flink作业的数据输入表,例如Kafka等实时流数据输入。 维表:数据源表的辅助表,用于丰富和扩展源表的数据。在Flink作业中,因为数据采集端采集到的数据往往比较有限,在做数据分析之前,就要先将所需的维度信息补全,而维表就是代表存储数据维度信息的数据源。常见的用户维表有 MySQL,Redis等。 结果表:Flink作业输出的结果数据表,将每条实时处理完的数据写入的目标存储,如 MySQL,HBase 等数据库。
  • 常见问题 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 跨源未绑定或未绑定成功,或是Kafka集群安全组未配置放通DLI队列的网段地址。重新配置跨源,或者Kafka集群安全组放通DLI队列的网段地址。 具体操作请参考增强型跨源连接。 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword. sink表中定义了metadata类型,但是Print connector并不支持把sink表中的matadata去掉即可。