云日志服务 LTS-Flink消费:DynamicLtsTableFactory支持SQL作业

时间:2025-01-26 10:50:19

DynamicLtsTableFactory支持SQL作业

LtsDynamicSource和LtsDynamicSink table作业,支持LTS日志直接接入flink,支持SQL语法,示例如下:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();       StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);// enable checkpointing       Configuration configuration = tableEnv.getConfig().getConfiguration();       configuration.set(           ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);       configuration.set(           ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));          tableEnv.executeSql("CREATE TABLE source ( " +           "  collectTime varchar, " +           "  lineNum varchar, " +           "  podName varchar, " +           "  pathFile varchar, " +           "  category varchar " +           " )  " +           " with ( " +           // connector 表类型 固定值lts           "'connector' = 'lts',  " +           // LTS 日志服务所属region           "'regionName' = 'cn-north-7',  " +           // LTS 日志租户项目ID           "'projectId' = '**************',  " +           // 注意:认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险, 建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全          // LTS 租户AK           "'accessKey' = '**************',  " +           // LTS 租户SK           "'accessSecret' = '**************',  " +           // LTS 日志组ID           "'logGroupId' = '**************',  " +           // LTS 日志流ID           "'logStreamId' = '**************',  " +           // LTS 日志流的消费组           "'consumerGroup' = '**************',  " +           // LTS 日志消费起始位置           "'startTime' = '1689836602157000000', " +          // LTS 原始日志是否做JSON解析           "'jsonParse' = 'true' " +           " )");          tableEnv.executeSql("CREATE TABLE print_sink ( " +           "  collectTime varchar, " +           "  lineNum varchar, " +           "  podName varchar, " +           "  pathFile varchar, " +           "  category varchar " +           " )  " +           " with ( " +           // connector 表类型 固定值lts           "'connector' = 'lts', " +           // LTS 日志服务所属region           "'regionName' = 'cn-north-7', " +           // LTS 日志租户项目ID           "'projectId' = '2a473356cca5487f8373be891bffc1cf', " +           // LTS 租户AK           "'accessKey' = 'DADYWPUP8JMUV3UGPEI9', " +           // LTS 租户SK           "'accessSecret' = 'jUtvcc0oIIcGZGoAUvtlSi8Oz6sZdFI2ZqFKBGUZ', " +           // LTS 日志组ID           "'logGroupId' = 'e83e94db-2e29-49c9-ae15-d3a9f4c3ea1b', " +           // LTS 日志流ID           "'logStreamId' = '0a423cfc-dbf8-4cf3-8fb9-f5cf95fa1298' " +           " )");          tableEnv.executeSql("insert into print_sink select * from source ");   }
support.huaweicloud.com/usermanual-lts/lts_07_0050.html