数据湖探索 DLI-Kafka源表:示例

时间:2024-11-16 13:21:40

示例

  • 从Kafka中读取编码格式为csv,对象为kafkaSource的表。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    create table kafkaSource(
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_speed INT)
    with (
      'connector.type' = 'kafka',
      'connector.version' = '0.11',
      'connector.topic' = 'test-topic',
      'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
      'connector.properties.group.id' = 'test-group',
      'connector.startup-mode' = 'latest-offset',
      'format.type' = 'csv'
    );
    
  • 从Kafka中读取编码格式为不含嵌套的json数据,对象为kafkaSource的表。
    例如不含嵌套的json数据格式为:
    {"car_id": 312, "car_owner": "wang", "car_brand": "tang"}
    {"car_id": 313, "car_owner": "li", "car_brand": "lin"}
    {"car_id": 314, "car_owner": "zhao", "car_brand": "han"}
    则创建表语句为:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    create table kafkaSource(
      car_id STRING,
      car_owner STRING,
      car_brand STRING
    )
    with (
      'connector.type' = 'kafka',
      'connector.version' = '0.11',
      'connector.topic' = 'test-topic',
      'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
      'connector.properties.group.id' = 'test-group',
      'connector.startup-mode' = 'latest-offset',
      'format.type' = 'json'
    );
    
  • 从Kafka中读取编码格式包含嵌套的json数据,对象为kafkaSource的表。

    例如包含嵌套的json数据格式为:

    {
        "id":"1",
        "type":"online",
        "data":{
            "patient_id":1234,
            "name":"bob1234",
            "age":"Bob",
            "gmt_create":"Bob",
            "gmt_modify":"Bob"
        }
    }
    则创建表语句为:
    CREATE table kafkaSource(
      id STRING,
      type STRING,
      data ROW(
        patient_id STRING, 
        name STRING, 
        age STRING, 
        gmt_create STRING, 
        gmt_modify STRING)
    ) 
    with (
      'connector.type' = 'kafka',
      'connector.version' = '0.11',
      'connector.topic' = 'test-topic',
      'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
      'connector.properties.group.id' = 'test-group',
      'connector.startup-mode' = 'latest-offset',
      'format.type' = 'json'
    );
support.huaweicloud.com/sqlref-flink-dli/dli_08_0301.html