* A client that consumes records from a Kafka cluster.
* This client transparently handles the failure of Kafka brokers, and transparently adapts as the topic partitions it fetches migrate within the cluster. This client also interacts with the broker to allow groups of consumers to load balance consumption using consumer groups.
* The consumer maintains TCP connections to the necessary brokers to fetch data. Failure to close the consumer after use will leak these connections. The consumer is not thread-safe. See Multi-threaded Processing for more details.
* Cross-Version Compatibility
* This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support certain features. For example, 0.10.0 brokers do not support offsetsForTimes, because this feature was added in version 0.10.1. You will receive an {@link org.apache.kafka.common.errors.UnsupportedVersionException} when invoking an API that is not available on the running broker version.
* Offsets and Consumer Position
* Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There are actually two notions of position relevant to the user of the consumer: the consumer's current position and its committed position.
* The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives messages in a call to {@link #poll(long)}.
* The {@link #commitSync() committed position} is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either commit offsets automatically at periodic intervals, or it can control this committed position manually by calling one of the commit APIs (e.g. {@link #commitSync() commitSync} and {@link #commitAsync(OffsetCommitCallback) commitAsync}).
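* The difference between the two positions can be made concrete with a small in-memory model. This is only a sketch: OffsetTracker and its methods are hypothetical names, it does not use the Kafka client, and it assumes a single partition whose log holds offsets 0 through logSize - 1.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition's offsets; NOT the Kafka client (hypothetical class).
class OffsetTracker {
    private long position = 0;   // offset of the next record to hand out
    private long committed = 0;  // last position that was stored durably

    // Hands out up to maxRecords offsets from a log of logSize records
    // and advances the position past them.
    List<Long> poll(long logSize, int maxRecords) {
        List<Long> batch = new ArrayList<>();
        while (position < logSize && batch.size() < maxRecords) {
            batch.add(position++);
        }
        return batch;
    }

    // Stores the current position as the committed position.
    void commit() { committed = position; }

    // Simulates a crash and restart: consumption resumes from the committed position.
    void restart() { position = committed; }

    long position() { return position; }
    long committed() { return committed; }

    public static void main(String[] args) {
        OffsetTracker t = new OffsetTracker();
        t.poll(10, 5);   // hands out offsets 0..4
        t.commit();      // committed position is now 5
        t.poll(10, 3);   // hands out offsets 5..7, never committed
        t.restart();     // uncommitted progress is lost
        System.out.println("position after restart: " + t.position());
    }
}
```

* In this model, records handed out after the last commit are delivered again after a restart, which is why the timing of the commit decides when a record counts as consumed.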
* This distinction gives the consumer control over when a record is considered consumed. It is discussed in further detail below.
* Consumer Groups and Topic Subscriptions
* Kafka uses the concept of consumer groups to allow a pool of processes to divide the work of consuming and processing records. These processes can either be running on the same machine or they can be distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances sharing the same group.id will be part of the same consumer group.
* Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe} APIs. Kafka will deliver each message in the subscribed topics to one process in each consumer group. This is achieved by balancing the partitions between all members in the consumer group so that each partition is assigned to exactly one consumer in the group. So if there is a topic with four partitions, and a consumer group with two processes, each process would consume from two partitions.
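* The four-partition, two-process example above can be sketched as a simple round-robin split. This is an illustration only: PartitionSplit is a hypothetical class, and the round-robin rule is an assumption made for the example, not a description of the assignment strategy Kafka actually applies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative round-robin split (hypothetical class, not Kafka's assignor).
class PartitionSplit {
    // Assigns partitions 0..numPartitions-1 to the given members so that
    // each partition has exactly one owner within the group.
    static Map<String, List<Integer>> assign(int numPartitions, List<String> members) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String member : members) {
            owned.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return owned; // nobody to assign to
        }
        for (int p = 0; p < numPartitions; p++) {
            String owner = members.get(p % members.size());
            owned.get(owner).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // Four partitions shared by two (hypothetically named) consumer processes.
        System.out.println(assign(4, Arrays.asList("consumer-1", "consumer-2")));
    }
}
```

* Each member ends up with two of the four partitions, and no partition is owned by more than one member of the group.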
* Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved from existing consumers to the new one. This is known as rebalancing the group and is discussed in more detail below. Group rebalancing is also used when new partitions are added to one of the subscribed topics or when a new topic matching a {@link #subscribe(Pattern, ConsumerRebalanceListener) subscribed regex} is created. The group will automatically detect the new partitions through periodic metadata refreshes and assign them to members of the group.
* Conceptually, you can think of a consumer group as being a single logical subscriber that happens to be made up of multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a given topic without duplicating data (additional consumers are actually quite cheap).
* This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to a queue in a traditional messaging system, all processes would be part of a single consumer group, and hence record delivery would be balanced over the group as with a queue. Unlike a traditional messaging system, though, you can have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system, each process would have its own consumer group, so each process would subscribe to all the records published to the topic.
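* The queue and pub-sub cases differ only in how group.id is chosen. The sketch below builds plain java.util.Properties objects to show this; GroupConfigs, the group names, and the localhost:9092 broker address are placeholders invented for the example, while bootstrap.servers and group.id are the consumer configuration keys.

```java
import java.util.Properties;

// Hypothetical helper that only builds configuration; it does not connect to Kafka.
class GroupConfigs {
    // Minimal consumer configuration for the given group.
    static Properties forGroup(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", groupId);
        return props;
    }

    // Queue-like semantics: every worker shares one group, so records
    // are balanced across the workers.
    static Properties queueWorker() {
        return forGroup("order-workers"); // hypothetical group name
    }

    // Pub-sub-like semantics: each process gets its own group, so each
    // process receives every record published to the topic.
    static Properties pubSubSubscriber(String processName) {
        return forGroup("subscriber-" + processName); // hypothetical naming scheme
    }

    public static void main(String[] args) {
        System.out.println(queueWorker().getProperty("group.id"));
        System.out.println(pubSubSubscriber("a").getProperty("group.id"));
        System.out.println(pubSubSubscriber("b").getProperty("group.id"));
    }
}
```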
* In addition, when group reassignment happens automatically, consumers can be notified through a {@link ConsumerRebalanceListener}, which allows them to finish necessary application-level logic such as state cleanup, manual offset commits, etc. See Storing Offsets Outside Kafka for more details.
* It is also possible for the consumer to manually assign specific partitions (similar to the older "simple" consumer) using {@link #assign(Collection)}. In this case, dynamic partition assignment and consumer group coordination will be disabled.
* Detecting Consumer Failures
* After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(long)} is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.
* It is also possible that the consumer could encounter a "livelock" situation where it is continuing to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms setting. Basically, if you don't call poll at least as frequently as the configured max interval, then the client will proactively leave the group so that another consumer can take over its partitions. When this happens, you may see an offset commit failure (as indicated by a {@link CommitFailedException} thrown from a call to {@link #commitSync()}). This is a safety mechanism which guarantees that only active members of the group are able to commit offsets. So to stay in the group, you must continue to call poll.
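* The two failure checks described above (missed heartbeats and missed polls) can be modelled with two timestamps. This is a toy model under stated assumptions: LivenessModel is a hypothetical class, time is passed in explicitly as milliseconds, and the timeout values used in main are example numbers rather than recommended settings. It does not reproduce how the client or broker implement these checks.

```java
// Toy model of the two liveness checks; NOT the Kafka client (hypothetical class).
class LivenessModel {
    private final long sessionTimeoutMs;   // stands in for session.timeout.ms
    private final long maxPollIntervalMs;  // stands in for max.poll.interval.ms
    private long lastHeartbeatMs;
    private long lastPollMs;

    LivenessModel(long sessionTimeoutMs, long maxPollIntervalMs, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastHeartbeatMs = nowMs;
        this.lastPollMs = nowMs;
    }

    void heartbeat(long nowMs) { lastHeartbeatMs = nowMs; }
    void poll(long nowMs) { lastPollMs = nowMs; }

    // No heartbeat within the session timeout: the consumer is considered dead.
    boolean consideredDead(long nowMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs;
    }

    // Heartbeats may continue, but no poll within the max poll interval:
    // the client leaves the group (the "livelock" guard).
    boolean mustLeaveGroup(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    boolean inGroup(long nowMs) {
        return !consideredDead(nowMs) && !mustLeaveGroup(nowMs);
    }

    public static void main(String[] args) {
        LivenessModel m = new LivenessModel(10_000, 300_000, 0); // example values
        m.heartbeat(299_000);  // heartbeats keep flowing...
        m.heartbeat(305_000);  // ...but poll was last called at time 0
        System.out.println("dead: " + m.consideredDead(305_000));
        System.out.println("must leave: " + m.mustLeaveGroup(305_000));
    }
}
```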
* The consumer provides two configuration settings to control the behavior of the poll loop:
* max.poll.interval.ms: By increasing the interval between expected polls, you can give the consumer more time to handle a batch of records returned from {@link #poll(long)}. The drawback is that increasing this value may delay a group rebalance since the consumer will only join the rebalance inside the call to poll. You can use this setting to bound the time to finish a rebalance, but you risk slower progress if the consumer cannot actually call {@link #poll(long) poll} often enough.
* max.poll.records: Use this setting to limit the total records returned from a single call to poll. This can make it easier to predict the maximum number of records that must be handled within each poll interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the impact of group rebalancing.
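* The interaction between the two settings can be estimated with simple arithmetic: if a full batch must be processed before the next call to poll, the average time available per record is roughly max.poll.interval.ms divided by max.poll.records. The sketch below only builds a java.util.Properties object and does that division; PollTuning and its methods are hypothetical names, and the numbers in main are example values.

```java
import java.util.Properties;

// Hypothetical helper for reasoning about poll-loop settings; it does not talk to Kafka.
class PollTuning {
    // Builds a configuration fragment with the two poll-loop settings.
    static Properties pollLoopConfig(int maxPollIntervalMs, int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    // Rough average processing budget per record, assuming a full batch
    // is returned and must be handled before the next poll.
    static long perRecordBudgetMs(Properties props) {
        long intervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        long records = Long.parseLong(props.getProperty("max.poll.records"));
        return intervalMs / records;
    }

    public static void main(String[] args) {
        Properties props = pollLoopConfig(300_000, 500); // example values
        System.out.println("budget per record (ms): " + perRecordBudgetMs(props));
    }
}
```

* Lowering max.poll.records raises the per-record budget for a fixed interval, which is one way to keep the interval short without risking missed polls.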