数据湖探索 DLI-Hive源表:示例

时间:2024-12-27 10:34:32

示例

  1. 使用Spark SQL创建Hive语法OBS表,并插入10条数据。模拟数据源。
    CREATE TABLE IF NOT EXISTS demo.student(
      name STRING, 
     score DOUBLE) 
    PARTITIONED BY (classNo INT) 
    STORED AS PARQUET 
    LOCATION 'obs://demo/spark.db/student';
    
    INSERT INTO demo.student PARTITION(classNo=1) VALUES ('Alice', 90.0), ('Bob', 80.0), ('Charlie', 70.0), ('David', 60.0), ('Eve', 50.0), ('Frank', 40.0), ('Grace', 30.0), ('Hank', 20.0), ('Ivy', 10.0), ('Jack', 0.0);
  2. 使用Flink SQL展示使用批的方式,从Hive语法OBS表demo.student中读取数据,并打印。需要开启checkpoint。
    CREATE CATA LOG  myhive WITH (
        'type' = 'hive',
        'default-database' = 'demo',
         'hive-conf-dir' = '/opt/flink/conf'
    );
    
    USE CATALOG myhive;
    
    create table if not exists print (
        name STRING, 
        score DOUBLE, 
        classNo INT)
    with ('connector' = 'print');
    
    insert into print
    select * from student;
    结果(taskmanager的out日志):
    +I[Alice, 90.0, 1]
    +I[Bob, 80.0, 1]
    +I[Charlie, 70.0, 1]
    +I[David, 60.0, 1]
    +I[Eve, 50.0, 1]
    +I[Frank, 40.0, 1]
    +I[Grace, 30.0, 1]
    +I[Hank, 20.0, 1]
    +I[Ivy, 10.0, 1]
    +I[Jack, 0.0, 1]
  3. 使用Flink SQL展示使用流的方式,从Hive语法OBS表demo.student中读取数据,并打印。
    CREATE CATALOG myhive WITH (
        'type' = 'hive' ,
        'default-database' = 'demo',
         'hive-conf-dir' = '/opt/flink/conf'
    );
    
    USE CATALOG myhive;
    
    create table if not exists print (
        name STRING, 
        score DOUBLE, 
        classNo INT)
    with ('connector' = 'print');
    
    insert into print
    select * from student /*+ OPTIONS('streaming-source.enable' = 'true', 'streaming-source.monitor-interval' = '3 m') */;
support.huaweicloud.com/sqlref-flink-dli/dli_08_15049.html