[Tech Share] Notes on a Kafka Pause Pitfall
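When using Kafka, a business requirement may call for actively pausing consumption and resuming it only after some signal arrives. In practice, though, the pause feature provided by the client may not behave the way you expect. If it is used carelessly, the pause can stop taking effect and consumption resumes automatically, so pay special attention to this!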
[Conditions that trigger the pitfall]
- The pause feature provided by the Kafka client is used
- A rebalance occurs in the Kafka consumer group
[Reproducing the pitfall]
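1. Create a topic named test with a single partition.
2. Start process A, subscribe to test, and explicitly call KafkaConsumer.pause.
3. Start process B, subscribe to test in the same consumer group (same group.id), which triggers a consumer-group rebalance.
4. Observe process A: once the partition is assigned back to it, A keeps consuming messages from the test topic, so the pause no longer takes effect. (If the partition lands on B instead, B consumes it, since B never paused it.)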
[How the Kafka client implements pause]
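1. Pause is implemented in the Kafka client. The broker keeps no state about whether a partition's consumption is paused.
2. The Kafka client locally maintains a state map for the consumer's assigned partitions: the key is the partition, and the value is that partition's consumption state, whose paused attribute is either true or false.
3. Whenever a consumer joins or leaves the consumer group, the group rebalances.
4. After a rebalance, the Kafka client rebuilds the state of the partitions it is assigned. The previous state flags are lost, so the pause stops taking effect.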
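The four points above can be condensed into a small, stdlib-only Java model. This is a hypothetical sketch, not Kafka's code. Partitions are plain strings such as "test-0", the class name ClientPauseModel and its methods are invented for illustration, and only the paused flag is tracked. The model shows that the flag lives in a client-local map and that rebuilding the map on a rebalance silently drops it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the client-side pause bookkeeping (not Kafka's real code).
public class ClientPauseModel {
    // Point 2: local map, key = partition, value = whether the user paused it.
    // Point 1: this map exists only inside the client; nothing here is sent to a broker.
    private Map<String, Boolean> pausedByPartition = new HashMap<>();

    // Simulates applying the assignment received after (re)joining the group.
    public void onRebalance(List<String> assignedPartitions) {
        // Point 4: the map is rebuilt from scratch, so every partition starts unpaused.
        Map<String, Boolean> fresh = new HashMap<>();
        for (String tp : assignedPartitions) {
            fresh.put(tp, false);
        }
        pausedByPartition = fresh;
    }

    // Only partitions currently assigned to this client can be paused in this model.
    public void pause(String tp) {
        if (!pausedByPartition.containsKey(tp)) {
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
        pausedByPartition.put(tp, true);
    }

    public boolean isPaused(String tp) {
        return pausedByPartition.getOrDefault(tp, false);
    }

    public static void main(String[] args) {
        ClientPauseModel consumerA = new ClientPauseModel();
        consumerA.onRebalance(List.of("test-0")); // process A joins and gets test-0
        consumerA.pause("test-0");                // explicit pause
        System.out.println("before rebalance: " + consumerA.isPaused("test-0"));
        consumerA.onRebalance(List.of("test-0")); // process B joins; test-0 lands on A again
        System.out.println("after rebalance:  " + consumerA.isPaused("test-0"));
    }
}
```

Running main prints true before the simulated rebalance and false after it, even though test-0 stays with the same consumer.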
[Source code analysis]
The client keeps the state of each partition in TopicPartitionState. This state includes the paused field, which becomes true after pause is called explicitly:
private static class TopicPartitionState {
    private Long position; // last consumed position
    private Long highWatermark; // the high watermark from last fetch
    private Long lastStableOffset;
    private OffsetAndMetadata committed; // last committed position
    private boolean paused; // whether this partition has been paused by the user
    private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting
    ......
}
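Why does a single boolean matter? The sketch below assumes, in simplified form, that a poll loop only fetches from partitions that are not paused and already have a position. This roughly mirrors the fetchability check in the client, but treat the exact condition as an assumption. The names PartitionStateSketch, State and fetchable are illustrative stand-ins, not the library's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical mirror of TopicPartitionState: only the fields needed to show
// how the paused flag gates fetching. Names are illustrative, not Kafka's API.
public class PartitionStateSketch {
    static class State {
        Long position;   // null until a starting offset is known
        boolean paused;  // Java default is false: a newly created state is never paused

        boolean isFetchable() {
            // Assumption: fetch only when not paused and a position is known.
            return !paused && position != null;
        }
    }

    // Returns the partitions a poll loop would fetch from, in insertion order.
    static List<String> fetchable(Map<String, State> states) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, State> e : states.entrySet()) {
            if (e.getValue().isFetchable()) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, State> states = new LinkedHashMap<>();
        State s = new State();
        s.position = 0L;
        states.put("test-0", s);
        System.out.println(fetchable(states)); // [test-0]
        s.paused = true;                        // what an explicit pause amounts to here
        System.out.println(fetchable(states)); // []
    }
}
```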
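The Kafka consumer uses the AbstractCoordinator class to maintain heartbeats, partition assignment and related group information.
Here we focus on the onJoinComplete method, which is called after the consumer has successfully joined the group: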
/**
 * Invoked when a group member has successfully joined a group.
 * @param generation The generation that was joined
 * @param memberId The identifier for the local member in the group
 * @param protocol The protocol selected by the coordinator
 * @param memberAssignment The assignment propagated from the group leader
 */
protected abstract void onJoinComplete(int generation,
                                       String memberId,
                                       String protocol,
                                       ByteBuffer memberAssignment);
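The relationship between AbstractCoordinator and its subclass follows the template method pattern. The base class drives the join protocol and calls onJoinComplete each time a join succeeds, including every rejoin after a rebalance. The sketch below is a hypothetical, heavily simplified illustration. MiniCoordinator and MiniConsumerCoordinator are invented names, and the generation counter is kept locally for simplicity, whereas in Kafka the generation comes from the group coordinator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the template-method relationship (not Kafka's real classes).
public class JoinCallbackSketch {
    static abstract class MiniCoordinator {
        private int generation = 0; // simplification: Kafka gets this from the group coordinator

        // Runs on the first join and again on every rejoin caused by a rebalance.
        final void joinGroup(List<String> assignmentFromLeader) {
            generation++;
            onJoinComplete(generation, assignmentFromLeader);
        }

        protected abstract void onJoinComplete(int generation, List<String> assignment);
    }

    static class MiniConsumerCoordinator extends MiniCoordinator {
        final List<Integer> seenGenerations = new ArrayList<>();

        @Override
        protected void onJoinComplete(int generation, List<String> assignment) {
            seenGenerations.add(generation);
            // In the real ConsumerCoordinator, this is where the assignment is applied
            // to the subscription state via assignFromSubscribed.
        }
    }

    public static void main(String[] args) {
        MiniConsumerCoordinator c = new MiniConsumerCoordinator();
        c.joinGroup(List.of("test-0")); // process A starts
        c.joinGroup(List.of("test-0")); // process B joins -> rebalance -> A rejoins
        System.out.println(c.seenGenerations); // [1, 2]
    }
}
```

Each simulated rejoin fires the callback again. This is why the assignment, and in the real client the partition state, is reapplied on every rebalance.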
The concrete implementation is ConsumerCoordinator. In its implementation, the call to focus on is subscriptions.assignFromSubscribed(assignment.partitions()):
@Override
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    // only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
    if (!isLeader)
        assignmentSnapshot = null;

    PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

    Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

    // set the flag to refresh last committed offsets
    subscriptions.needRefreshCommits();

    // update partition assignment (this is the key line)
    subscriptions.assignFromSubscribed(assignment.partitions());
    .......
}
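assignFromSubscribed, a method of SubscriptionState (the type of the subscriptions field above), is where the assigned partitions are ultimately maintained: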
public void assignFromSubscribed(Collection<TopicPartition> assignments) {
    if (!this.partitionsAutoAssigned())
        throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");

    Map<TopicPartition, TopicPartitionState> assignedPartitionStates = partitionToStateMap(assignments);
    fireOnAssignment(assignedPartitionStates.keySet());
    ......
}
Here a brand-new TopicPartitionState is created for every assigned partition, with paused set to false. As a result, the previously saved paused state of each partition is lost:
private static Map<TopicPartition, TopicPartitionState> partitionToStateMap(Collection<TopicPartition> assignments) {
    Map<TopicPartition, TopicPartitionState> map = new HashMap<>(assignments.size());
    for (TopicPartition tp : assignments)
        map.put(tp, new TopicPartitionState()); // fresh state: the old paused flag is not carried over
    return map;
}
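The sketch below makes the consequence of partitionToStateMap concrete. It contrasts that behaviour with a hypothetical variant that reuses the existing state for partitions the consumer keeps. Both methods and the State class are simplified stand-ins written for this note, not Kafka code. The reuse variant only illustrates the idea and makes no claim about how any particular Kafka version behaves.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical comparison: rebuilding partition state vs. reusing it. "State" only carries the paused flag.
public class RebuildVsReuse {
    static class State {
        boolean paused;
    }

    // Mirrors partitionToStateMap: a fresh State per assigned partition.
    // "current" is deliberately ignored, just as the original method never looks at old states.
    static Map<String, State> rebuild(Map<String, State> current, List<String> assignments) {
        Map<String, State> map = new HashMap<>(assignments.size());
        for (String tp : assignments) {
            map.put(tp, new State());
        }
        return map;
    }

    // Hypothetical alternative: keep the old State when the partition stays assigned.
    static Map<String, State> reuse(Map<String, State> current, List<String> assignments) {
        Map<String, State> map = new HashMap<>(assignments.size());
        for (String tp : assignments) {
            State existing = current.get(tp);
            map.put(tp, existing != null ? existing : new State());
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, State> current = new HashMap<>();
        State s = new State();
        s.paused = true; // test-0 was paused before the rebalance
        current.put("test-0", s);

        List<String> newAssignment = List.of("test-0");
        System.out.println("rebuild keeps pause: " + rebuild(current, newAssignment).get("test-0").paused);
        System.out.println("reuse keeps pause:   " + reuse(current, newAssignment).get("test-0").paused);
    }
}
```

Running main prints false for the rebuild path and true for the reuse path. Even the reuse variant only helps when a partition stays on the same consumer. If test-0 moves to process B, B receives it unpaused, because the flag never leaves the client that set it.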