云日志服务 LTS-使用Java SDK管理消费组:按照不同的消费需求,创建不同的LogConsumerConfig

时间:2025-03-10 20:03:28

按照不同的消费需求,创建不同的LogConsumerConfig

  1. 指定消费开始时间,不指定结束时间。
    // 设置batchSize = 500,即每次拉取500条。
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_ LOG _GROUP_ID","TEST_LOG_STREAM_ID","AC CES S_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", 1685444710000000000L, 500);
    
    // 如果需要按照日志组/日志流名称消费
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_NAME","TEST_LOG_STREAM_NAME", true ,"ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", 1685444710000000000L, 500); 
  2. 指定消费开始时间,且指定结束时间。
    // 设置batchSize = 500,即每次拉取500条
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME",1685444710000000000L, 1685445470192043318L, 500);
    
    // 如果需要按照日志组/日志流名称消费
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_NAME","TEST_LOG_STREAM_NAME", true ,"ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME",1685444710000000000L, 1685445470192043318L, 500); 
  3. 指定ConsumerPosition(BEGIN_CURSOR、END_CURSOR)
    // 指定 BEGIN_CURSOR 开始消费
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", "ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", ConsumerPosition.BEGIN_CURSOR);
    
    // 指定 END_CURSOR 开始消费
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", "ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", ConsumerPosition.END_CURSOR);
  4. 支持使用临时AK、SK、STS-Token消费数据(若使用临时STS-Token, 则AK、SK需要填写与STS-Token配套的临时AK、SK)。
    1. 创建STS-Token更新类DemoLogConsumerSTSToken.java (由于临时凭证会存在过期,为了不影响您的消费业务,需要实时更新临时凭证)
      import com.huaweicloud.lts.consumer.interfaces.ILogConsumerSTSToken;
      import com.huaweicloud.lts.producer.STSTokenConfig;
      
      public class DemoLogConsumerSTSToken implements ILogConsumerSTSToken {
      
          @Override
          // SDK会定期从此方法中获取临时AK, 临时SK, 临时securityToken. 如果临时认证信息有变化, 在此方法中实现即可
          public STSTokenConfig getSTSTokenConfig() {
              String accessKeyId = "";
              String accessKeySecret = "";
              String securityToken = "";
              return new STSTokenConfig(accessKeyId, accessKeySecret, securityToken);
          }
      }
    2. 创建LogConsumerConfig
      // 指定消费开始时间,不指定结束时间
      LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", new DemoLogConsumerSTSToken("ACCESS_KEY_ID","ACCESS_KEY_SECRET","STS-TOKEN"), "CONSUMER_GROUP_NAME", 1685444710000000000L);
      
      // 指定消费开始时间,且指定结束时间
      LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", new DemoLogConsumerSTSToken("ACCESS_KEY_ID","ACCESS_KEY_SECRET","STS-TOKEN"), "CONSUMER_GROUP_NAME", 1685444710000000000L, 1685445470192043318L);
      
      // 指定 BEGIN_CURSOR
      LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", new DemoLogConsumerSTSToken("ACCESS_KEY_ID","ACCESS_KEY_SECRET","STS-TOKEN"), "CONSUMER_GROUP_NAME", ConsumerPosition.BEGIN_CURSOR);
      
      // 指定 END_CURSOR
      LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", new DemoLogConsumerSTSToken("ACCESS_KEY_ID","ACCESS_KEY_SECRET","STS-TOKEN"), "CONSUMER_GROUP_NAME", ConsumerPosition.END_CURSOR);
  5. 开启跨云消费日志。此功能仅适用于开发调测,对于消费性能不做保障
    LogConsumerConfig config = new LogConsumerConfig(....);
    config.setEnableLocalTest(true);
  6. 开启消费历史数据。
    LogConsumerConfig config = new LogConsumerConfig(....);
    config.setHistoryQuery(true);
  7. 使用自定义IP消费日志

    LogConsumerConfig config = new LogConsumerConfig(....);
    config.setProxyIp("127.0.0.1:8102");

support.huaweicloud.com/usermanual-lts/lts_07_0046.html