MAPREDUCE服务 MRS-使用旧插件storm-kafka时如何正确设置offset:回答
回答
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { super(hosts, topic); this.zkRoot = zkRoot; this.id = id; }
其中“hosts”是ZooKeeper的连接串,如:192.168.0.1:2181/kafka,“topic”是待消费的Topic名,“zkRoot”表示在ZooKeeper中的存放数据的根路径,一般为:“/kafka/{topic}”,“id”表示应用的标示,如:app1。读取offset会有以下两种场景:
- 场景1
当拓扑运行后,KafkaSpout会将offset存放在ZooKeeper路径:“/{zkRoot}/{id}/{partitionId}”下,其中“zkRoot”和“id”是用户指定的,“partitionId”是自动获取的。默认情况下,拓扑在启动后会先从ZooKeeper上的offset存放路径读取历史的offset,用作本次的消费起点,因此只需要正确的指定“zkRoot”和“id”,就可以继承历史记录的offset,不用从头开始消费。
- 场景2
没有像场景1中那样设置固定的“zkRoot”或者“id”,导致无法读取历史的offset,如此一来每次提交拓扑都会把历史已经消费过的数据再消费一遍,这时需要通过如下方式手动指定:
SpoutConfig spoutConfig = new SpoutConfig(hosts, inputTopicName, zkRoot, appId); spoutConfig.ignoreZkOffsets = true; spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
通过指定SpoutConfig中的“ignoreZkOffsets”和“startOffsetTime”来强制消费最新的数据。
- MapReduce服务_如何使用MapReduce服务_MRS集群客户端安装与使用
- MapReduce服务_什么是Hue_如何使用Hue
- MapReduce服务_什么是HetuEngine_如何使用HetuEngine
- MapReduce服务_什么是Kafka_如何使用Kafka
- MapReduce服务_什么是ZooKeeper_如何使用ZooKeeper
- MapReduce服务_什么是ClickHouse_如何使用ClickHouse
- MapReduce服务_什么是Yarn_如何使用Yarn
- MapReduce服务_什么是Flink_如何使用Flink
- MapReduce服务_什么是Hive_如何使用Hive
- 登录企业邮箱如何开通使用设置