MAPREDUCE服务 MRS-使用旧插件storm-kafka时如何正确设置offset:回答

时间:2024-06-29 14:11:13

回答

旧插件storm-kafka中的KafkaSpout使用的是Kafka的“SimpleConsumer”接口,需要自主管理offset,KafkaSpout中根据用户定义的字段将Topic中每个Patition的offset记录在ZooKeeper中,定义如下:
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
    super(hosts, topic);
    this.zkRoot = zkRoot;
    this.id = id;
}

其中“hosts”是ZooKeeper的连接串,如:192.168.0.1:2181/kafka,“topic”是待消费的Topic名,“zkRoot”表示在ZooKeeper中的存放数据的根路径,一般为:“/kafka/{topic}”“id”表示应用的标示,如:app1。读取offset会有以下两种场景:

  • 场景1

    当拓扑运行后,KafkaSpout会将offset存放在ZooKeeper路径:“/{zkRoot}/{id}/{partitionId}”下,其中“zkRoot”“id”是用户指定的,“partitionId”是自动获取的。默认情况下,拓扑在启动后会先从ZooKeeper上的offset存放路径读取历史的offset,用作本次的消费起点,因此只需要正确的指定“zkRoot”“id”,就可以继承历史记录的offset,不用从头开始消费。

  • 场景2

    没有像场景1中那样设置固定的“zkRoot”或者“id”,导致无法读取历史的offset,如此一来每次提交拓扑都会把历史已经消费过的数据再消费一遍,这时需要通过如下方式手动指定:

    SpoutConfig spoutConfig = new SpoutConfig(hosts, inputTopicName, zkRoot, appId);
    spoutConfig.ignoreZkOffsets = true;
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

    通过指定SpoutConfig中的“ignoreZkOffsets”“startOffsetTime”来强制消费最新的数据。

support.huaweicloud.com/devg3-mrs/mrs_07_420038.html