云服务器内容精选

  • 示例 从Kafka中读取编码格式为csv,对象为kafkaSource的表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test-group', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'csv' ); 从Kafka中读取编码格式为不含嵌套的json数据,对象为kafkaSource的表。 例如不含嵌套的json数据格式为: {"car_id": 312, "car_owner": "wang", "car_brand": "tang"} {"car_id": 313, "car_owner": "li", "car_brand": "lin"} {"car_id": 314, "car_owner": "zhao", "car_brand": "han"} 则创建表语句为: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( car_id STRING, car_owner STRING, car_brand STRING ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test-group', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' ); 从Kafka中读取编码格式包含嵌套的json数据,对象为kafkaSource的表。 例如包含嵌套的json数据格式为: { "id":"1", "type":"online", "data":{ "patient_id":1234, "name":"bob1234", "age":"Bob", "gmt_create":"Bob", "gmt_modify":"Bob" } } 则创建表语句为: CREATE table kafkaSource( id STRING, type STRING, data ROW( patient_id STRING, name STRING, age STRING, gmt_create STRING, gmt_modify STRING) ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test-group', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于kafka,需配置为'kafka'。 connector.version 是 Kafka版本,支持:'0.10'、 '0.11'。0.10或0.11版本号对应kafka版本号2.11-2.4.0及其他历史版本。 format.type 是 数据反序列化格式,支持:'csv', 'json'及'avro'等。 format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 connector.topic 是 kafka topic名。该参数和“connector.topic-pattern”两个参数只能使用其中一个。 connector.topic-pattern 否 匹配读取kafka topic名称的正则表达式。该参数和“connector.topic”两个参数只能使用其中一个。 例如: 'topic.*' '(topic-c|topic-d)' '(topic-a|topic-b|topic-\\d*)' '(topic-a|topic-b|topic-[0-9]*)' connector.properties.bootstrap.servers 是 kafka brokers地址,以逗号分隔。 connector.properties.group.id 否 消费组名称 connector.startup-mode 否 consumer启动模式,支持:'earliest-offset', 'latest-offset', 'group-offsets', 'specific-offsets'及'timestamp'。默认值为'group-offsets'。 connector.specific-offsets 否 指定消费offset,'startup-mode'为'specific-offsets'时需配置,格式为: 'partition:0,offset:42;partition:1,offset:300'。 connector.startup-timestamp-millis 否 指定起始消费时间戳,'startup-mode'为'timestamp'时需配置。 connector.properties.* 否 配置kafka任意原生属性。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression) ) with ( 'connector.type' = 'kafka', 'connector.version' = '', 'connector.topic' = '', 'connector.properties.bootstrap.servers' = '', 'connector.properties.group.id' = '', 'connector.startup-mode' = '', 'format.type' = '' );