k8s cache tips

在程序中经常需要用到内存缓存,说简单一点就是一个map。这里以k8s中的canche为例看看好的缓存机制是如何设计的以及有哪些需要注意的地方,以后在程序中遇到类似的缓存相关的问题就可以直接拿过来使用。这部分的介绍不需要对k8s的背景知识有任何的了解,但是了解了缓存机制之后再看一些相关的组件比如kube-controller,多少会更有一些更加全局的认识对于k8s自身业务代码的深入理解也会有所帮助,甚至可以按照自己的业务逻辑和场景实现一个定制的controller。

采用的分析k8s的代码版本是v1.6.2,所分析的文件位于.vendor/k8s.io/client-go/tools/cache,其实具体功能上都是比较好理解的,关键是对于一些细节部分的把握。

threadSafetyStore

顾名思义,这个store就是实现了一个线程安全的strore,直接使用这个store中提供的Add,Update,Delete方法操纵对象,就不需要再自己考虑各种加锁的问题了。

再分析具体实现之前,先简单看下测试文件store_test.go,看下这个cache是如何使用的,threadSafetyStore没有直接暴露出来,它和一个keyFunc组合起来被封装在cache结构体中。

1
2
3
4
type cache struct {
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}

具体的cache结构包含两部分,一个是ThreadSafeStore,其中的主要内容是一个map,具体稍后介绍,另一个是KeyFunc,用于将Name转化成为可以进行index的值,出于扩展性的考虑需要将这个函数单独抽象出来。

比如最基本的KeyFunc可以是以下的形式:

1
2
3
4
5
6
7
type testStoreObject struct {
id string
val string
}
func testStoreKeyFunc(obj interface{}) (string, error) {
return obj.(testStoreObject).id, nil
}

初始化的时候可以将threadSafeMap中的index设置为空,直接通过KeyFunc从原始的object得到一个index,就是提取出用于存储的对象的id值将其作为index value。

1
2
3
4
5
6
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}

之后具体的Add,Update,Delete,List,Replace等等方法都比较直接,一看就明白了,具体使用的时候如果有不清楚的地方可以根据测试文件中的操作来熟悉相关使用。最简单的情况下就是在生成Store实例的时候把Indexers{}, Indices{}都设置为空实现即可。

threadsafetycache 具体实现

这个cache为了保证执行时候的线程安全,在具体进行添加删除操作的时候都加上了lock。struct中主要包含的内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc
// IndexFunc knows how to provide an indexed value for an object.
type IndexFunc func(obj interface{}) ([]string, error)
// Indices maps a name to an Index
type Indices map[string]Index
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String

lock与存放items的map在这里都比较好理解,关键是后面两个元素,这两个元素体现了工程性与扩展性。indexers是一个name到IndexFunc的映射,IndexFunc的功能是将一个name变换成一个可以作为index的value,这个函数可以自己定义。另外一个map存储的是name到Index的映射,具体的Index对象又是一个map,其值是一个set,因为一个可以用于index的value可能对应的是多个key,不过实际使用的时候常产跨过这个中间层,一个object通过IndexFunc对应到一个Key元素。

比如在每次Add一个元素的时候都需要添加一个updateIndices的操作,这个操作就是用来更新最后一个indices元素,所有的用于index的函数都保存在这个结构体中,之后每次做更新操作的时候都会用到这些函数。

FIFO

首先明确下FIFO cache的基本功能

  • 具备对象的CRUD操作
  • 按照FIFO的方式存储数据
  • 在cache中的内容发生变化的时候可以调用回调函数
    使用队列进行操作的场景都可以通过FIFO cache来进行处理,FIFO cache工作起来就类似于一个队列。

这里采用的基本结构是map+queue,相比于上一种simplecache,这里增加了queue用于存储index,每次进出cache的元素都是从index中进行检索而不是直接从map中进行检索。具体的回调函数的实现通过sync.condition变量进行,这里有一个实际的例子。

补充sync.condition的使用,在只用了broadcaster的方法之后,会依次向所有执行了condition.wait的goroutine发广播,(这些goroutine本质上是阻塞在condition.Wati所在的位置)将它们启动起来。sync.condition.Wait会将当前的goroutine挂起,知道执行了sync.condition.Broadcast方法之后,所有的相关的被挂起的goroutine就会重新被调度。

具体的FIFO的代码仍然参考的k8s中的实现,基本的CRUD就不再赘述,直接参考这里(https://github.com/wangzhezhe/BLG/blob/develop/Components/cache/fifo.go)的源码,这里主要分析下回调操作的实现。

注意pop方法的使用,这个也是唯一多增加的地方,其实就是把queue的头元素拿出来,唯一不容易理解的地方就是sync.condition的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, FIFOClosedError
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}

可以看到,在队列中没有元素的时候,就执行f.cond.Wait()方法,将这个goroutine一直挂起,直到新的元素加入了队列中使得条件符合,比如ADD了一个元素进入队列,这个时候就通过cond.Broadcast方法通知所有被cond.Wait挂起的goroutine,最后从队列中取出第一个元素然后执行传入进来的process函数。整个过程被放在一个大的for循环中,以上的逻辑一直中复执行,还有一点要注意的是,每次Pop的逻辑执行完成之后都要将被处理过的元素从当前的队列中删除,相当于说是当前的元素已经被处理过了,可以从队列中移出了,由于存储的key值仅仅是string类型,这里的queue只需要通过[]string来模拟即可。

LRU

在实际程序中最有效的缓存应该算是 LRU (least recently used)类型的缓存了,因为申请的map的空间的大小毕竟是有限的,应该存储哪些在最近一段使用比较频繁的缓存,这样才能提高缓存的效率。这部分代码主要参考的是这个库。LRU cache实现的时候需要用到链表的结构,如果是已经存在的元素就将旧的元素调整到链表的第一个位置,具体的实现思路可以参考FIFO的结构,存储数据还是用map来进行,检索的操作通过List来进行,List中只存储元素的id,而不是完整的信息。

UndeltaStore

原先store的变种,每次状态变之后,比如ADD,DELETE,UPDATE方法被执行之后,就调用PushFunc,对缓存中剩下的所有的元素都执行一次pushfunc,比如对于Add操作:

1
2
3
4
5
6
7
func (u *UndeltaStore) Add(obj interface{}) error {
if err := u.Store.Add(obj); err != nil {
return err
}
u.PushFunc(u.Store.List())
return nil
}

在执行完成之后得到object list然后将其输入到PushFunc中,其他的操作类似。

在k8s的cahce中还提供了delta_FIFO类型的cache,同时提供了以上介绍的delta功能和类似于Queue一样的FIFO的操作。

Listwatch

list watch到底实现了什么功能?

1
2
3
4
5
6
7
8
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}

其中metav1.ListOptions 包含了对象的一些元信息比如selector,resourceversion,timeout等标记,这些是k8s中用于进行select操作的通用的信息。
lw 的能力包括list以及watch特定的资源。list资源比较容易理解,对于一个catch来说,把其中所有的元素都列出来就是实现了list的功能,那watch具体是什么?首先看一个测试函数中使用的list watch(cache/testing/fake_controller_source.go):

list方法是模仿 http get 的操作,将fakeobject 缓存中的元素取出来组合成一个list返回,返回的list中,元素的类型是runtime.Object,这个interface所暴露出来的方法是返回元素注册进来的Kind:

1
2
3
type Object interface {
GetObjectKind() schema.ObjectKind
}

其返回值schema.ObjectKind也是一个interface类型,具体有以下能力:

1
2
3
4
5
6
7
8
9
10
11
12
13
type ObjectKind interface {
// SetGroupVersionKind sets or clears the intended serialized kind of an object. Passing kind nil
// should clear the current setting.
SetGroupVersionKind(kind GroupVersionKind)
// GroupVersionKind returns the stored group, version, and kind of an object, or nil if the object does
// not expose or provide these fields.
GroupVersionKind() GroupVersionKind
}
type GroupVersionKind struct {
Group string
Version string
Kind string
}

ObjectKind的能力包括:

  • 第一个能力就是将传入进来的Kind以及GroupVerionKind参数set到具体的实现对象中。
  • 第二个能力是得到当前的object的标识符,具体指的是某个api所在Group,Version以及Kind,从这里也能看出k8s中api对象的层级结构。
    list object 本身来说也实现了这个接口。在以前的k8s代码版本中,ObjectKind:
    每个api对象在相对应的register文件中都是按照以下的方式实现这个接口
1
(func *obj) Cluster() GetObjectKind.schema     { return &obj.TypeMeta }

其中的obj.TypeMeta就是每个api对象的元信息,具体包括 Kind 以及 APIVersion信息,他实现了上面所说的ObjectKind接口也就是说实现了SetGroupVersionKind以及GroupVersionKind这两个方法。

watch 功能主要是通过暴露出来的watch interface来体现:

1
2
3
4
5
6
type Interface interface {
//停止watch操作
Stop()
//返回一个chanel用于接受所有的event,这里的events值的就是watch到的events
ResultChan() <-chan Event
}

对于watch来说,具体的实现并不像是list那么简单,起真正的底层实现通过interface接口暴露出来。具体的实现细节是在 apimachinery/pkg/watch 中描述,具体来看下watch机制,具体源文件在/apimachinery/pkg/watch中:

默认的watch的事件类型包括"ADDED" "MODIFIED" "DELETED" "ERROR"几种。默认的用于消息传递的event结构体:

1
2
3
4
5
6
7
8
9
type Event struct {
Type EventType
// Object is:
// * If Type is Added or Modified: the new state of the object.
// * If Type is Deleted: the state of the object immediately before deletion.
// * If Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
Object runtime.Object
}

newEmptyWatch函数的功能就是创建一个channel,channel中具体的元素为Event:

1
2
3
4
5
6
7
type emptyWatch chan Event

func NewEmptyWatch() Interface {
ch := make(chan Event)
close(ch)
return emptyWatch(ch)
}

emptywatch对于stop函数的操作直接返回nil,对于ResultChan的操作直接返回chan Event。
fakewatch在empty的基础上实现了一些辅助功能,可以向channel中写入不同的Event数据,具体包括Add,Modified,Delete等等。
RaceFreeFakeWatcher在fakewatch的基础上对每种行为都添加了lock。
本质上来讲最基本的watch就是对channel进行了一个封装,因为其接口暴露出来的两个方法就只有Stop()以及ResultChan() <-chan Event

Event mux

再深入一些看下mux.go的实现,这个部分的主要功能是将watch收到的Event分发到注册进来的多个watcher中,主要是实现了notification的一对多的功能。先大致看下broadcaster结构体中所包含的信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type Broadcaster struct {
//这个lock主要控制对于watchers的map的读写操作
lock sync.Mutex
//这个watchers存储broadcasterwatcher相当于是notification的下游的各种watchers
watchers map[int64]*broadcasterWatcher
//记录新的即将注册进来的broadcasterWatcher的id
nextWatcher int64
//每次新加入或者减少一个watcher,就在waitgroup中进行相对应的操作
distributing sync.WaitGroup
//记录最上游的事件来源
incoming chan Event
// 生成一个新的broadcasterwatcher的时候 其中的缓冲channel的长度
watchQueueLength int
//控制watchers如何对notification做反应
//如果为0 如果某个watcher满了 incoming会阻塞在这里 直到下游的watcher channel变得可以接受新的元素为止
//如果为1 当某个watcher满了 incoming会跳过当前的watcher 直接将信息传递给下一个watcher 这样chanel满的那个watcher就会丢失一条Event信息
fullChannelBehavior FullChannelBehavior
}
//每一个broadcasterWatcher都用来处理一个单一的watcher行为
type broadcasterWatcher struct {
//按照之前的对于基本watch的介绍 result以及stopped chanel是基本的watcher元素
result chan Event
stopped chan struct{}
//sync.Once控制函数只被调用一次
stop sync.Once
//id用来记录这个broadcasterWatcher的身份标记
id int64
//这里是所反向引用的操作 记录这个broadcasterWatcher上游的Broadcaster
m *Broadcaster
}

在实际的系统中应该结合场景,比较细致地考虑watchQueueLength以及fullChannelBehavior,在NewBroadcaster操作的时候,这两个也是关键的输入参数。NewBroadcaster操作的时候会将waitgroup中的数字+1,这个loop标记的是有几个Broadcaster正在处于loop的状态,然开始loop循环:

1
2
3
4
5
6
7
8
9
10
11
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, incomingQueueLength),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
m.distributing.Add(1)
go m.loop()
return m
}

loop的主要内容就是for循环,从incoming的channel中检测传过来的Event再将Event distribute到注册进来的watcher中,这部分可以说是Broadcaster的核心逻辑了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (m *Broadcaster) loop() {
// Deliberately not catching crashes here. Yes, bring down the process if there's a
// bug in watch.Broadcaster.
for {
event, ok := <-m.incoming
if !ok {
break
}
if event.Type == internalRunFunctionMarker {
event.Object.(functionFakeRuntimeObject)()
continue
}
m.distribute(event)
}
m.closeAll()
m.distributing.Done()
}

有一点要注意,放在Event中的内容有可能是正常的起到通知作用的消息,也可能是functionFakeRuntimeObject封装成的Event:

1
2
3
func (obj functionFakeRuntimeObject) GetObjectKind() schema.ObjectKind {
return schema.EmptyObjectKind
}

这里要注意functionFakeRuntimeObject所执行的function具体来说是一个创建并注册Event的操作(具体在后面的Wacher部分介绍)这样操作的原因是确保注册watcher的时间和Event的发布是有序进行的,也就是说watcher在注册了之后确保可以接收到后面的Event,需要确保一个watcher创建并且注册好之后才能distribute新的Event,所以执行distribute操作之前,必须要确保创建并注册watcher的操作全部完成,这里实现的比较巧妙,需要好好体会以下。

再来看下distribute操作的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (m *Broadcaster) distribute(event Event) {
m.lock.Lock()
defer m.lock.Unlock()
if m.fullChannelBehavior == DropIfChannelFull {
for _, w := range m.watchers {
select {
case w.result <- event:
case <-w.stopped:
default: // Don't block if the event can't be queued.
}
}
} else {
for _, w := range m.watchers {
select {
case w.result <- event:
case <-w.stopped:
}
}
}
}

利用channel的通信+内存共享机制可以很容易地分析清楚核心逻辑,将event依次写入每个watchers的channel,如果是非阻塞行为的话在select语句中会多一个default:的判断,也就是说当某个watchers的result channel被缓存占满的时候,select操作会直接跳过阻塞的部分去继续执行default相关操作,直接跳过。

另外一个重要的部分就是注册新的Watch的操作,这部分的操作就是注册一个新的broadcasterwatcher到Broadcaster中,当然新注册进来的watcher是无法接收到历史的event信息的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (m *Broadcaster) Watch() Interface {
var w *broadcasterWatcher
m.blockQueue(func() {
m.lock.Lock()
defer m.lock.Unlock()
id := m.nextWatcher
m.nextWatcher++
w = &broadcasterWatcher{
result: make(chan Event, m.watchQueueLength),
stopped: make(chan struct{}),
id: id,
m: m,
}
m.watchers[id] = w
})
return w
}

主要的创建新的broadcasterWatcher以及添加到map的过程是放在一个匿名函数中然后传入m.blockQueue中被执行的。等下再看blockQueue的意义,匿名函数的主要功能是初始化broadcasterWatcher的参数,注意m.nextWatcher在最初的时候才用默认初始值为0,之后每次创建一个broadcasterWatcher就会增加1,这个相当于是broadcasterWatcher的一个index信息,具体的blockQueue的使用原因可以参考前面的loop循环操作的相关介绍。

watch部分还有一个操作是WatchWithPrefix,相比于之前的watcher,唯一的区别就是在创建了watcher之后,首先将参数中的queuedEvents []Event发送给这个watcher然后再将original的events依次发送给watcher。

以上就是watch部分的主要函数和功能,其他的简单的功能相关的函数就不赘述了,总之主要的流程就是从上游的incoming channel中取数据,然后按照固定的格式不断地分发给下游的watcher。

回到最初的FakeControllerSource实现List Watch的操作,List操作已经介绍过,就是将缓存中的元素copy出来返回。根据resourceversion与Event队列中元素的长度差别,决定才用WatchWithPrefix的操作(distribute Event操作事前,先将queuedEvents中所有的Event输入到这个watcher的channel中)或者是普通的Watch操作,直接注册Event。

reflector cache用到了watch package的内容

Reflector

reflector类型的cache的作用是watch特定类型的资源,并且将被watch对象的变化情况对应到自身所保持的store中,相当于是做了一层映射。在发现被watch对象发生变化的时候还可以执行一个函数。

具体的结构体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Reflector struct {
// reflector的名字 用于作为这个reflector的标识符
name string
// 希望存入到reflector中的对象的类型
expectedType reflect.Type
// 用于同步 watche source 的Store
store Store
// listerWatcher 封装了List Watch相关的接口,用于执行对应的操作
listerWatcher ListerWatcher
// period 控制一次watch结束和下一次watch开始的时间
period time.Duration
resyncPeriod time.Duration
ShouldResync func() bool
// 测试的时候可以通过clock手动调整时间
clock clock.Clock
// 记录上一次同步时候的版本号信息
lastSyncResourceVersion string
// lastSyncResourceVersionMutex 用于保证对lastSyncResourceVersion进行读写操作时候的线程安全
lastSyncResourceVersionMutex sync.RWMutex
}

如果expectedType不为空,reflector只存放expectedType所指定的类型的变化,如果为空,则所有watch到的元素的变化都会被放入store中。主要的函数包括:

1
2
3
4
5
6
7
8
func (r *Reflector) Run() {
glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
go wait.Until(func() {
if err := r.ListAndWatch(wait.NeverStop); err != nil {
utilruntime.HandleError(err)
}
}, r.period, wait.NeverStop)
}

wait.Until是k8s实现的一些中间操作,用于再r.period时间周期内执行传如的func,这里的func主要的操作就是ListAndWatch。主要的逻辑就是这个ListAndWatch操作,这个函数比较长,这里仅仅列下函数中的主要逻辑:

  • 采用options := metav1.ListOptions{ResourceVersion: “0”}通过r.listerWatcher.List列出当前的所有对象,具体参考ResourceVersion的注释,之后从list中提取出resourceversion并更新reflector实例中的相关参数。之后启动一个goroutine负责select各种停止的channel。
  • 在一个循环中执行watch操作,对可能得到的error进行各种处理,然后通过watchHandler函数对得到的watch channel进行处理。
    具体来看下watchHandler的逻辑,它主要做了两个事情:

  • 判断watch的返回channel中接受到的object的类型是不是期望的类型,以及相关的错误处理,之后得到最新的resourceversion。然后分别判断Event的类型,是ADD,Modified,或是Delete,之后分别对不同的操作进行响应,在相关联的底层store中ADD,Modified或者Delete相对应的Object。 (watch 操作时候用于消息通知的Event结构体中就只有两个部分,EventType以及runtime.Object)

  • 更新resourceVersion并且做一些错误处理,比如watch channel异常关闭。
    才用reflector+deltaStore可以实现一旦store中的信息变化就执行指定函数的操作。

Controller

controller类型的cache首先抽象出了一个config用于初始化必要的信息,注释中对于每个参数的解释已经非常清晰了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Config contains all the settings for a Controller.
type Config struct {
// The queue for your objects; either a FIFO or
// a DeltaFIFO. Your Process() function should accept
// the output of this Queue's Pop() method.
Queue
// Something that can list and watch your objects.
ListerWatcher
// Something that can process your objects.
Process ProcessFunc
// The type of your objects.
ObjectType runtime.Object
// Reprocess everything at least this often.
// Note that if it takes longer for you to clear the queue than this
// period, you will end up processing items in the order determined
// by FIFO.Replace(). Currently, this is random. If this is a
// problem, we can change that replacement policy to append new
// things to the end of the queue instead of replacing the entire
// queue.
FullResyncPeriod time.Duration
// ShouldResync, if specified, is invoked when the controller's reflector determines the next
// periodic sync should occur. If this returns true, it means the reflector should proceed with
// the resync.
ShouldResync ShouldResyncFunc
// If true, when Process() returns an error, re-enqueue the object.
// TODO: add interface to let you inject a delay/backoff or drop
// the object completely if desired. Pass the object in
// question to this interface as a parameter.
RetryOnError bool
}

最基本的生成controller的方法:

1
2
3
4
5
6
7
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}

按照之前的套路重点看下Run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
r.RunUntil(stopCh)
wait.Until(c.processLoop, time.Second, stopCh)
}

创建reflector之后每隔一秒钟调用processLoop方法,注意在创建reflector的时候第三个参数,也就是reflector的底层store就是这个Queue。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == FIFOClosedError {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

processLoop就是不断循环从Queue中pop出object然后传入PopProcessFunc中,具体的Pop的实现细节可以看之前的DeltaFIFO的cache。

Informer

informer在本质上来讲是将一个reflector以及一个DeltaFIFO组装起来,对上listwatch到source information的变化然后同步到reflector中,其中reflector的cache类型使用的是DeltaFIFO的类型,对下通过PushFunc将变化类型再解析出来调用提前注册好的不同类型(Add,Update,Delete,Modify)的handler函数。

理解了informer 之后再看k8s的 controller 以及 event 机制就会有豁然开朗的感觉。sharedinformer的主要功能是从list watch上游接口中获取信息,一旦发现上游数据变化就调用对应的函数进行处理。这些函数主要包括:

1
2
3
4
5
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}

这个接口中的方法可以通过以下结构体来实现:

1
2
3
4
5
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}

这个结构体实现了OnAdd,OnUpdate以及OnDelete方法,其实就是在这些方法中调用对应注册进来的AddFunc,UpdateFunc以及DeleteFunc方法。

来看下informer struct中具体包含的内容(基本的informer与controller在一个文件中):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func NewInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (Store, Controller) {
// This will hold the client state, as we know it.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return clientState, New(cfg)
}

主要就是把对应的参数填入controller的config,之后生成新的controller,之后controller Run的时候,每次从队列中Pop一个元素出来,所执行的Process函数就是这里注册进来的。这个函数的功能主要是根据对象中的操作类型(这个Delta的结构于之前的Event比较类似)进行具体的细分,分别调用OnAdd,OnUpdate以及OnDelete方法。

相关参考

k8s相关代码
https://github.com/kubernetes/kubernetes/tree/336fb2f508a62514a7d3744c46543db816277c3a/staging/src/k8s.io/apimachinery/pkg/util/cache

golang sync condition
http://www.liguosong.com/2014/05/07/golang-sync-cond/

类似的介绍controller原理的文章
http://borismattijssen.github.io/articles/kubernetes-informers-controllers-reflectors-stores

推荐文章