云服务器内容精选

  • DynamicLtsTableFactory支持SQL作业 LtsDynamicSource和LtsDynamicSink table作业,支持LTS日志直接接入flink,支持SQL语法,示例如下: public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);// enable checkpointing Configuration configuration = tableEnv.getConfig().getConfiguration(); configuration.set( ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE); configuration.set( ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10)); tableEnv.executeSql("CREATE TABLE source ( " + " collectTime varchar, " + " lineNum varchar, " + " podName varchar, " + " pathFile varchar, " + " category varchar " + " ) " + " with ( " + // connector 表类型 固定值lts "'connector' = 'lts', " + // LTS 日志服务所属region "'regionName' = 'cn-north-7', " + // LTS 日志租户项目ID "'projectId' = '**************', " + // 注意:认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险, 建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全 // LTS 租户AK "'accessKey' = '**************', " + // LTS 租户SK "'accessSecret' = '**************', " + // LTS 日志组ID "'logGroupId' = '**************', " + // LTS 日志流ID "'logStreamId' = '**************', " + // LTS 日志流的消费组 "'consumerGroup' = '**************', " + // LTS 日志消费起始位置 "'startTime' = '1689836602157000000', " + // LTS 原始日志是否做JSON解析 "'jsonParse' = 'true' " + " )"); tableEnv.executeSql("CREATE TABLE print_sink ( " + " collectTime varchar, " + " lineNum varchar, " + " podName varchar, " + " pathFile varchar, " + " category varchar " + " ) " + " with ( " + // connector 表类型 固定值lts "'connector' = 'lts', " + // LTS 日志服务所属region "'regionName' = 'cn-north-7', " + // LTS 日志租户项目ID "'projectId' = '2a473356cca5487f8373be891bffc1cf', " + // LTS 租户AK "'accessKey' = 'DADYWPUP8JMUV3UGPEI9', " + // LTS 租户SK "'accessSecret' = 'jUtvcc0oIIcGZGoAUvtlSi8Oz6sZdFI2ZqFKBGUZ', " + // LTS 日志组ID "'logGroupId' = 'e83e94db-2e29-49c9-ae15-d3a9f4c3ea1b', " + // LTS 日志流ID "'logStreamId' = '0a423cfc-dbf8-4cf3-8fb9-f5cf95fa1298' " + " )"); tableEnv.executeSql("insert into print_sink select * from source "); }