kafka 重放 重播 從某個時間點或者offset開始消費

轉自: https://www.jianshu.com/p/932663e9a226

 

consumer.subscribe(topicA);
consumer.poll(100);//正常訂閱topic和poll消息

Set<TopicPartition> assignments = consumer.assignment();//獲取consumer所分配的分區信息
Map<TopicPartition, Long> query = new HashMap<>();//構造offsetsForTimes參數,通過時間戳找到offset
for (TopicPartition topicPartition : assignments) {
    System.out.println(topicPartition);
    query.put(topicPartition, 1550804131000L);
}
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : result.entrySet()) {
    System.out.println(entry);
    consumer.seek(entry.getKey(), entry.getValue().offset());//每個topic的partition都seek到執行的offset
}
  

 

posted @ 2019-10-12 10:04  leon0  閱讀(683)  評論(0編輯  收藏
七乐彩2011年走势图南方双彩