1. 概述
進(jìn)入 K8s 的世界,會(huì)發(fā)現(xiàn)有很多的 Controller,它們都是為了完成某類資源(如 pod 是通過(guò) DeploymentController, ReplicaSetController 進(jìn)行管理)的調(diào)諧,目標(biāo)是保持用戶期望的狀態(tài)。
K8s 中有幾十種類型的資源,如何能讓 K8s 內(nèi)部以及外部用戶方便、高效的獲取某類資源的變化,就是本文 Informer 要實(shí)現(xiàn)的。本文將從 Reflector(反射器)、DeletaFIFO(增量隊(duì)列)、Indexer(索引器)、Controller(控制器)、SharedInformer(共享資源通知器)、processorListener(事件監(jiān)聽處理器)、workqueue(事件處理工作隊(duì)列) 等方面進(jìn)行解析。
本文及后續(xù)相關(guān)文章都基于 K8s v1.22

2. 從 Reflector 說(shuō)起
Reflector 的主要職責(zé)是從 apiserver 拉取并持續(xù)監(jiān)聽(ListAndWatch) 相關(guān)資源類型的增刪改(Add/Update/Delete)事件,存儲(chǔ)在由 DeltaFIFO 實(shí)現(xiàn)的本地緩存(local Store) 中。
首先看一下 Reflector 結(jié)構(gòu)體定義:
// staging/src/k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
// 通過(guò) file:line 唯一標(biāo)識(shí)的 name
name string
// 下面三個(gè)為了確認(rèn)類型
expectedTypeName string
expectedType reflect.Type
expectedGVK *schema.GroupVersionKind
// 存儲(chǔ) interface: 具體由 DeltaFIFO 實(shí)現(xiàn)存儲(chǔ)
store Store
// 用來(lái)從 apiserver 拉取全量和增量資源
listerWatcher ListerWatcher
// 下面兩個(gè)用來(lái)做失敗重試
backoffManager wait.BackoffManager
initConnBackoffManager wait.BackoffManager
// informer 使用者重新同步的周期
resyncPeriod time.Duration
// 判斷是否滿足可以重新同步的條件
ShouldResync func() bool
clock clock.Clock
// 是否要進(jìn)行分頁(yè) List
paginatedResult bool
// 最后同步的資源版本號(hào),以此為依據(jù),watch 只會(huì)監(jiān)聽大于此值的資源
lastSyncResourceVersion string
// 最后同步的資源版本號(hào)是否可用
isLastSyncResourceVersionUnavailable bool
// 加把鎖控制版本號(hào)
lastSyncResourceVersionMutex sync.RWMutex
// 每頁(yè)大小
WatchListPageSize int64
// watch 失敗回調(diào) handler
watchErrorHandler WatchErrorHandler
}
從結(jié)構(gòu)體定義可以看到,通過(guò)指定目標(biāo)資源類型進(jìn)行 ListAndWatch,并可進(jìn)行分頁(yè)相關(guān)設(shè)置。
第一次拉取全量資源(目標(biāo)資源類型) 后通過(guò) syncWith 函數(shù)全量替換(Replace) 到 DeltaFIFO queue/items 中,之后通過(guò)持續(xù)監(jiān)聽 Watch(目標(biāo)資源類型) 增量事件,并去重更新到 DeltaFIFO queue/items 中,等待被消費(fèi)。
watch 目標(biāo)類型通過(guò) Go reflect 反射實(shí)現(xiàn)如下:
// staging/src/k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
...
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
}
...
}
- 通過(guò)反射確認(rèn)目標(biāo)資源類型,所以命名為 Reflector 還是比較貼切的;
- List/Watch 的目標(biāo)資源類型在 NewSharedIndexInformer.ListerWatcher 進(jìn)行了確定,但 Watch 還會(huì)在 watchHandler 中再次比較一下目標(biāo)類型;
3. 認(rèn)識(shí) DeltaFIFO
還是先看下 DeltaFIFO 結(jié)構(gòu)體定義:
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
// 讀寫鎖、條件變量
lock sync.RWMutex
cond sync.Cond
// kv 存儲(chǔ):objKey1->Deltas[obj1-Added, obj1-Updated...]
items map[string]Deltas
// 只存儲(chǔ)所有 objKeys
queue []string
// 是否已經(jīng)填充:通過(guò) Replace() 接口將第一批對(duì)象放入隊(duì)列,或者第一次調(diào)用增、刪、改接口時(shí)標(biāo)記為true
populated bool
// 通過(guò) Replace() 接口將第一批對(duì)象放入隊(duì)列的數(shù)量
initialPopulationCount int
// keyFunc 用來(lái)從某個(gè) obj 中獲取其對(duì)應(yīng)的 objKey
keyFunc KeyFunc
// 已知對(duì)象,其實(shí)就是 Indexer
knownObjects KeyListerGetter
// 隊(duì)列是否已經(jīng)關(guān)閉
closed bool
// 以 Replaced 類型發(fā)送(為了兼容老版本的 Sync)
emitDeltaTypeReplaced bool
}
DeltaType 可分為以下類型:
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType string
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Replaced DeltaType = "Replaced" // 第一次或重新同步
Sync DeltaType = "Sync" // 老版本重新同步叫 Sync
)
通過(guò)上面的 Reflector 分析可以知道,DeltaFIFO 的職責(zé)是通過(guò)隊(duì)列加鎖處理(queueActionLocked)、去重(dedupDeltas)、存儲(chǔ)在由 DeltaFIFO 實(shí)現(xiàn)的本地緩存(local Store) 中,包括 queue(僅存 objKeys) 和 items(存 objKeys 和對(duì)應(yīng)的 Deltas 增量變化),并通過(guò) Pop 不斷消費(fèi),通過(guò) Process(item) 處理相關(guān)邏輯。

4. 索引 Indexer
上一步 ListAndWatch 到的資源已經(jīng)存儲(chǔ)到 DeltaFIFO 中,接著調(diào)用 Pop 從隊(duì)列進(jìn)行消費(fèi)。實(shí)際使用中,Process 處理函數(shù)由 sharedIndexInformer.HandleDeltas 進(jìn)行實(shí)現(xiàn)。HandleDeltas 函數(shù)根據(jù)上面不同的 DeltaType 分別進(jìn)行 Add/Update/Delete,并同時(shí)創(chuàng)建、更新、刪除對(duì)應(yīng)的索引。
具體索引實(shí)現(xiàn)如下:
// staging/src/k8s.io/client-go/tools/cache/index.go
// map 索引類型 => 索引函數(shù)
type Indexers map[string]IndexFunc
// map 索引類型 => 索引值 map
type Indices map[string]Index
// 索引值 map: 由索引函數(shù)計(jì)算所得索引值(indexedValue) => [objKey1, objKey2...]
type Index map[string]sets.String
索引函數(shù)(IndexFunc):就是計(jì)算索引的函數(shù),這樣允許擴(kuò)展多種不同的索引計(jì)算函數(shù)。默認(rèn)也是最常用的索引函數(shù)是:MetaNamespaceIndexFunc。
索引值(indexedValue):有些地方叫 indexKey,表示由索引函數(shù)(IndexFunc) 計(jì)算出來(lái)的索引值(如 ns1)。
對(duì)象鍵(objKey):對(duì)象 obj 的 唯一 key(如 ns1/pod1),與某個(gè)資源對(duì)象一一對(duì)應(yīng)。

可以看到,Indexer 由 ThreadSafeStore 接口集成,最終由 threadSafeMap 實(shí)現(xiàn)。
- 索引函數(shù) IndexFunc(如 MetaNamespaceIndexFunc)、KeyFunc(如 MetaNamespaceKeyFunc) 區(qū)別:前者表示如何計(jì)算索引,后者表示如何獲取對(duì)象鍵(objKey);
- 索引鍵(indexKey,有些地方是 indexedValue)、對(duì)象鍵(objKey) 區(qū)別:前者表示由索引函數(shù)(IndexFunc) 計(jì)算出來(lái)的索引鍵(如 ns1),后者則是 obj 的 唯一 key(如 ns1/pod1);
5. 總管家 Controller
Controller 作為核心中樞,集成了上面的組件 Reflector、DeltaFIFO、Indexer、Store,成為連接下游消費(fèi)者的橋梁。
Controller 由 controller 結(jié)構(gòu)體進(jìn)行具體實(shí)現(xiàn):
在 K8s 中約定俗成:大寫定義的 interface 接口,由對(duì)應(yīng)小寫定義的結(jié)構(gòu)體進(jìn)行實(shí)現(xiàn)。
// staging/src/k8s.io/client-go/tools/cache/controller.go
type controller struct {
config Config
reflector *Reflector // 上面已分析的組件
reflectorMutex sync.RWMutex
clock clock.Clock
}
type Config struct {
// 實(shí)際由 DeltaFIFO 實(shí)現(xiàn)
Queue
// 構(gòu)造 Reflector 需要
ListerWatcher
// Pop 出來(lái)的 obj 處理函數(shù)
Process ProcessFunc
// 目標(biāo)對(duì)象類型
ObjectType runtime.Object
// 全量重新同步周期
FullResyncPeriod time.Duration
// 是否進(jìn)行重新同步的判斷函數(shù)
ShouldResync ShouldResyncFunc
// 如果為 true,Process() 函數(shù)返回 err,則再次入隊(duì) re-queue
RetryOnError bool
// Watch 返回 err 的回調(diào)函數(shù)
WatchErrorHandler WatchErrorHandler
// Watch 分頁(yè)大小
WatchListPageSize int64
}
Controller 中以 goroutine 協(xié)程方式啟動(dòng) Run 方法,會(huì)啟動(dòng) Reflector 的 ListAndWatch(),用于從 apiserver 拉取全量和監(jiān)聽增量資源,存儲(chǔ)到 DeltaFIFO。接著,啟動(dòng) processLoop 不斷從 DeltaFIFO Pop 進(jìn)行消費(fèi)。在 sharedIndexInformer 中 Pop 出來(lái)進(jìn)行處理的函數(shù)是 HandleDeltas,一方面維護(hù) Indexer 的 Add/Update/Delete,另一方面調(diào)用下游 sharedProcessor 進(jìn)行 handler 處理。
6. 啟動(dòng) SharedInformer
SharedInformer 接口由 SharedIndexInformer 進(jìn)行集成,由 sharedIndexInformer(這里看到了吧,又是大寫定義的 interface 接口,由對(duì)應(yīng)小寫定義的結(jié)構(gòu)體進(jìn)行實(shí)現(xiàn)) 進(jìn)行實(shí)現(xiàn)。
看一下結(jié)構(gòu)體定義:
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type SharedIndexInformer interface {
SharedInformer
// AddIndexers add indexers to the informer before it starts.
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
type sharedIndexInformer struct {
indexer Indexer
controller Controller
// 處理函數(shù),將是重點(diǎn)
processor *sharedProcessor
// 檢測(cè) cache 是否有變化,一把用作調(diào)試,默認(rèn)是關(guān)閉的
cacheMutationDetector MutationDetector
// 構(gòu)造 Reflector 需要
listerWatcher ListerWatcher
// 目標(biāo)類型,給 Reflector 判斷資源類型
objectType runtime.Object
// Reflector 進(jìn)行重新同步周期
resyncCheckPeriod time.Duration
// 如果使用者沒有添加 Resync 時(shí)間,則使用這個(gè)默認(rèn)的重新同步周期
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
// 兩個(gè) bool 表達(dá)了三個(gè)狀態(tài):controller 啟動(dòng)前、已啟動(dòng)、已停止
started, stopped bool
startedLock sync.Mutex
// 當(dāng) Pop 正在消費(fèi)隊(duì)列,此時(shí)新增的 listener 需要加鎖,防止消費(fèi)混亂
blockDeltas sync.Mutex
// Watch 返回 err 的回調(diào)函數(shù)
watchErrorHandler WatchErrorHandler
}
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener // 需要 sync 的 listeners
clock clock.Clock
wg wait.Group
}
從結(jié)構(gòu)體定義可以看到,通過(guò)集成的 controller(上面已分析) 進(jìn)行 Reflector ListAndWatch,并存儲(chǔ)到 DeltaFIFO,并啟動(dòng) Pop 消費(fèi)隊(duì)列,在 sharedIndexInformer 中 Pop 出來(lái)進(jìn)行處理的函數(shù)是 HandleDeltas。
所有的 listeners 通過(guò) sharedIndexInformer.AddEventHandler 加入到 processorListener 數(shù)組切片中,并通過(guò)判斷當(dāng)前 controller 是否已啟動(dòng)做不同處理如下:
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
...
// 如果還沒有啟動(dòng),則直接 addListener 加入即可返回
if !s.started {
s.processor.addListener(listener)
return
}
// 加鎖控制
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
s.processor.addListener(listener)
// 遍歷所有對(duì)象,發(fā)送到剛剛新加入的 listener
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
}
接著,在 HandleDeltas 中,根據(jù) obj 的 Delta 類型(Added/Updated/Deleted/Replaced/Sync) 調(diào)用 sharedProcessor.distribute 給所有監(jiān)聽 listeners 處理。
7. 注冊(cè) SharedInformerFactory
SharedInformerFactory 作為使用 SharedInformer 的工廠類,提供了高內(nèi)聚低耦合的工廠類設(shè)計(jì)模式,其結(jié)構(gòu)體定義如下:
// staging/src/k8s.io/client-go/informers/factory.go
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory // 重點(diǎn)內(nèi)部接口
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Admissionregistration() admissionregistration.Interface
Internal() apiserverinternal.Interface
Apps() apps.Interface
Autoscaling() autoscaling.Interface
Batch() batch.Interface
Certificates() certificates.Interface
Coordination() coordination.Interface
Core() core.Interface
Discovery() discovery.Interface
Events() events.Interface
Extensions() extensions.Interface
Flowcontrol() flowcontrol.Interface
Networking() networking.Interface
Node() node.Interface
Policy() policy.Interface
Rbac() rbac.Interface
Scheduling() scheduling.Interface
Storage() storage.Interface
}
// staging/src/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go
type SharedInformerFactory interface {
Start(stopCh <-chan struct{}) // 啟動(dòng) SharedIndexInformer.Run
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer // 目標(biāo)類型初始化
}
以 PodInformer 為例,說(shuō)明使用者如何構(gòu)建自己的 Informer,PodInformer 定義如下:
// staging/src/k8s.io/client-go/informers/core/v1/pod.go
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
由小寫的 podInformer 實(shí)現(xiàn)(又看到了吧,大寫接口小寫實(shí)現(xiàn)的 K8s 風(fēng)格):
type podInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
func (f *podInformer) Lister() v1.PodLister {
return v1.NewPodLister(f.Informer().GetIndexer())
}
由使用者傳入目標(biāo)類型(&corev1.Pod{})、構(gòu)造函數(shù)(defaultInformer),調(diào)用 SharedInformerFactory.InformerFor 實(shí)現(xiàn)目標(biāo) Informer 的注冊(cè),然后調(diào)用 SharedInformerFactory.Start 進(jìn)行 Run,就啟動(dòng)了上面分析的 SharedIndexedInformer -> Controller -> Reflector -> DeltaFIFO 流程。
通過(guò)使用者自己傳入目標(biāo)類型、構(gòu)造函數(shù)進(jìn)行 Informer 注冊(cè),實(shí)現(xiàn)了 SharedInformerFactory 高內(nèi)聚低耦合的設(shè)計(jì)模式。
8. 回調(diào) processorListener
所有的 listerners 由 processorListener 實(shí)現(xiàn),分為兩組:listeners, syncingListeners,分別遍歷所屬組全部 listeners,將數(shù)據(jù)投遞到 processorListener 進(jìn)行處理。
- 因?yàn)楦?listeners 設(shè)置的 resyncPeriod 可能不一致,所以將沒有設(shè)置(resyncPeriod = 0) 的歸為 listeners 組,將設(shè)置了 resyncPeriod 的歸到 syncingListeners 組;
- 如果某個(gè) listener 在多個(gè)地方(sharedIndexInformer.resyncCheckPeriod, sharedIndexInformer.AddEventHandlerWithResyncPeriod)都設(shè)置了 resyncPeriod,則取最小值 minimumResyncPeriod;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
從代碼可以看到 processorListener 巧妙地使用了兩個(gè) channel(addCh, nextCh) 和一個(gè) pendingNotifications(由 slice 實(shí)現(xiàn)的滾動(dòng) Ring) 進(jìn)行 buffer 緩沖,默認(rèn)的 initialBufferSize = 1024。既做到了高效傳遞數(shù)據(jù),又不阻塞上下游處理,值得學(xué)習(xí)。

9. workqueue 忙起來(lái)
通過(guò)上一步 processorListener 回調(diào)函數(shù),交給內(nèi)部 ResourceEventHandler 進(jìn)行真正的增刪改(CUD) 處理,分別調(diào)用 OnAdd/OnUpdate/OnDelete 注冊(cè)函數(shù)進(jìn)行處理。
為了快速處理而不阻塞 processorListener 回調(diào)函數(shù),一般使用 workqueue 進(jìn)行異步化解耦合處理,其實(shí)現(xiàn)如下:

從圖中可以看到,workqueue.RateLimitingInterface 集成了 DelayingInterface,DelayingInterface 集成了 Interface,最終由 rateLimitingType 進(jìn)行實(shí)現(xiàn),提供了 rateLimit 限速、delay 延時(shí)入隊(duì)(由優(yōu)先級(jí)隊(duì)列通過(guò)小頂堆實(shí)現(xiàn))、queue 隊(duì)列處理 三大核心能力。
另外,在代碼中可看到 K8s 實(shí)現(xiàn)了三種 RateLimiter:BucketRateLimiter, ItemExponentialFailureRateLimiter, ItemFastSlowRateLimiter,Controller 默認(rèn)采用了前兩種如下:
// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
這樣,在用戶側(cè)可以通過(guò)調(diào)用 workqueue 相關(guān)方法進(jìn)行靈活的隊(duì)列處理,比如失敗多少次就不再重試,失敗了延時(shí)入隊(duì)的時(shí)間控制,隊(duì)列的限速控制(QPS)等,實(shí)現(xiàn)非阻塞異步化邏輯處理。
10. 小結(jié)
本文通過(guò)分析 K8s 中 Reflector(反射器)、DeletaFIFO(增量隊(duì)列)、Indexer(索引器)、Controller(控制器)、SharedInformer(共享資源通知器)、processorListener(事件監(jiān)聽處理器)、workqueue(事件處理工作隊(duì)列) 等組件,對(duì) Informer 實(shí)現(xiàn)機(jī)制進(jìn)行了解析,通過(guò)源碼、圖文方式說(shuō)明了相關(guān)流程處理,以期更好的理解 K8s Informer 運(yùn)行流程。
可以看到,K8s 為了實(shí)現(xiàn)高效、非阻塞的核心流程,大量采用了 goroutine 協(xié)程、channel 通道、queue 隊(duì)列、index 索引、map 去重等方式;并通過(guò)良好的接口設(shè)計(jì)模式,給使用者開放了很多擴(kuò)展能力;采用了統(tǒng)一的接口與實(shí)現(xiàn)的命名方式等,這些都值得深入學(xué)習(xí)與借鑒。
PS: 更多內(nèi)容請(qǐng)關(guān)注 k8s-club