kubernetes 中的事件机制

我们通过 kubectl describe [资源] 命令,可以在看到 Event 输出,并且经常依赖 event 进行问题定位,从 event 中可以分析整个 POD 的运行轨迹,为服务的客观测性提供数据来源,由此可见,event 在 Kubernetes 中起着举足轻重的作用。



event展示



event 并不只是 kubelet 中都有的,关于 event 的操作被封装在client-go/tools/record包,我们完全可以在写入自定义的 event。
https://gocn.vip/topics/9965

Event 定义
其实 event 也是一个资源对象,并且通过 apiserver 将 event 存储在 etcd 中,所以我们也可以通过 kubectl get event 命令查看对应的 event 对象。



以下是一个 event 的 yaml 文件:



apiVersion: v1
count: 1
eventTime: null
firstTimestamp: “2020-03-02T13:08:22Z”
involvedObject:
apiVersion: v1
kind: Pod
name: example-foo-d75d8587c-xsf64
namespace: default
resourceVersion: “429837”
uid: ce611c62-6c1a-4bd8-9029-136a1adf7de4
kind: Event
lastTimestamp: “2020-03-02T13:08:22Z”
message: Pod sandbox changed, it will be killed and re-created.
metadata:
creationTimestamp: “2020-03-02T13:08:30Z”
name: example-foo-d75d8587c-xsf64.15f87ea1df862b64
namespace: default
resourceVersion: “479466”
selfLink: /api/v1/namespaces/default/events/example-foo-d75d8587c-xsf64.15f87ea1df862b64
uid: 9fe6f72a-341d-4c49-960b-e185982d331a
reason: SandboxChanged
reportingComponent: “”
reportingInstance: “”
source:
component: kubelet
host: minikube
type: Normal



主要字段说明:**



involvedObject: 触发 event 的资源类型
lastTimestamp:最后一次触发的时间
message:事件说明
metadata :event 的元信息,name,namespace 等
reason:event 的原因
source:上报事件的来源,比如 kubelet 中的某个节点
type:事件类型,Normal 或 Warning
event 字段定义可以看这里:types.go#L5078



接下来我们来看看,整个 event 是如何下入的。



写入事件
1、这里以 kubelet 为例,看看是如何进行事件写入的 2、文中代码以 Kubernetes 1.17.3 为例进行分析
先以一幅图来看下整个的处理流程
event处理过程



创建操作事件的客户端:
kubelet/app/server.go#L461



// makeEventRecorder sets up kubeDeps.Recorder if it’s nil. It’s a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
if kubeDeps.Recorder != nil {
return
}
//事件广播
eventBroadcaster := record.NewBroadcaster()
//创建EventRecorder
kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
//发送event至log输出
eventBroadcaster.StartLogging(klog.V(3).Infof)
if kubeDeps.EventClient != nil {
klog.V(4).Infof(“Sending events to api server.”)
//发送event至apiserver
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events(“”)})
} else {
klog.Warning(“No api server defined - no events will be sent to API server.”)
}
}
通过 makeEventRecorder 创建了 EventRecorder 实例,这是一个事件广播器,通过它提供了 StartLogging 和 StartRecordingToSink 两个事件处理函数,分别将 event 发送给 log 和 apiserver。
NewRecorder创建了 EventRecorder 的实例,它提供了 Event ,Eventf 等方法供事件记录。



EventBroadcaster
我们来看下 EventBroadcaster 接口定义:event.go#L113



// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
//
StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
StartRecordingToSink(sink EventSink) watch.Interface
StartLogging(logf func(format string, args …interface{})) watch.Interface
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder



Shutdown() } 具体实现是通过 eventBroadcasterImpl struct 来实现了各个方法。


其中 StartLogging 和 StartRecordingToSink 其实就是完成了对事件的消费,EventRecorder 实现对事件的写入,中间通过 channel 实现了生产者消费者模型。



EventRecorder
我们先来看下EventRecorder 接口定义:event.go#L88,提供了一下 4 个方法



// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
// Event constructs an event from the given information and puts it in the queue for sending.
// ‘object’ is the object this event is about. Event will make a reference– or you may also
// pass a reference to the object directly.
// ‘type’ of this event, and can be one of Normal, Warning. New types could be added in future
// ‘reason’ is the reason this event is generated. ‘reason’ should be short and unique; it
// should be in UpperCamelCase format (starting with a capital letter). “reason” will be used
// to automate handling of events, so imagine people writing switch statements to handle them.
// You want to make that easy.
// ‘message’ is intended to be human readable.
//
// The resulting event will be created in the same namespace as the reference object.
Event(object runtime.Object, eventtype, reason, message string)



// Eventf is just like Event, but with Sprintf for the message field.
Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

// AnnotatedEventf is just like eventf, but with annotations attached
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) }


主要参数说明:



object 对应 event 资源定义中的 involvedObject
eventtype 对应 event 资源定义中的 type,可选 Normal,Warning.
reason :事件原因
message :事件消息
我们来看下当我们调用 Event(object runtime.Object, eventtype, reason, message string) 的整个过程。
发现最终都调用到了 generateEvent 方法:event.go#L316



func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
…..
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
event.Source = recorder.source
go func() {
// NOTE: events should be a non-blocking operation
defer utilruntime.HandleCrash()
recorder.Action(watch.Added, event)
}()
}
最终事件在一个 goroutine 中通过调用 recorder.Action 进入处理,这里保证了每次调用 event 方法都是非阻塞的。
其中 makeEvent 的作用主要是构造了一个 event 对象,事件 name 根据 InvolvedObject 中的 name 加上时间戳生成:



注意看:对于一些非 namespace 资源产生的 event,event 的 namespace 是 default
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
t := metav1.Time{Time: recorder.clock.Now()}
namespace := ref.Namespace
if namespace == “” {
namespace = metav1.NamespaceDefault
}
return &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(“%v.%x”, ref.Name, t.UnixNano()),
Namespace: namespace,
Annotations: annotations,
},
InvolvedObject: *ref,
Reason: reason,
Message: message,
FirstTimestamp: t,
LastTimestamp: t,
Count: 1,
Type: eventtype,
}
}
进一步跟踪Action方法,apimachinery/blob/master/pkg/watch/mux.go#L188:23



// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
m.incoming <- Event{action, obj}
}
将 event 写入到了一个 channel 里面。
注意:
这个 Action 方式是apimachinery包中的方法,因为实现的 sturt recorderImpl
将 *watch.Broadcaster 作为一个匿名 struct,并且在 NewRecorder 进行 Broadcaster 赋值,这个Broadcaster其实就是 eventBroadcasterImpl 中的Broadcaster。



到此,基本清楚了 event 最终被写入到了 Broadcaster 中的 incoming channel 中,下面看下是怎么进行消费的。



消费事件
在 makeEventRecorder 调用的 StartLogging 和 StartRecordingToSink 其实就是完成了对事件的消费。



StartLogging直接将 event 输出到日志
StartRecordingToSink将事件写入到 apiserver
两个方法内部都调用了 StartEventWatcher 方法,并且传入一个 eventHandler 方法对 event 进行处理



func (e eventBroadcasterImpl) StartEventWatcher(eventHandler func(v1.Event)) watch.Interface {
watcher := e.Watch()
go func() {
defer utilruntime.HandleCrash()
for watchEvent := range watcher.ResultChan() {
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
// This is all local, so there’s no reason this should
// ever happen.
continue
}
eventHandler(event)
}
}()
return watcher
}
其中 watcher.ResultChan 方法就拿到了事件,这里是在一个 goroutine 中通过func (m *Broadcaster) loop() ==>func (m *Broadcaster) distribute(event Event) 方法调用将 event 又写入了broadcasterWatcher.result



主要看下 StartRecordingToSink 提供的的eventHandler, recordToSink 方法:



func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy := *event
event = &eventCopy
result, err := eventCorrelator.EventCorrelate(event)
if err != nil {
utilruntime.HandleError(err)
}
if result.Skip {
return
}
tries := 0
for {
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
break
}
tries++
if tries >= maxTriesPerEvent {
klog.Errorf(“Unable to write event ‘%#v’ (retry limit exceeded!)”, event)
break
}
// Randomize the first sleep so that various clients won’t all be
// synced up if the master goes down.
// 第一次重试增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
} else {
time.Sleep(sleepDuration)
}
}
}
其中 event 被经过了一个 eventCorrelator.EventCorrelate(event) 方法做预处理,主要是聚合相同的事件(避免产生的事件过多,增加 etcd 和 apiserver 的压力,也会导致查看 pod 事件很不清晰)



下面一个 for 循环就是在进行重试,最大重试次数是 12 次,调用 recordEvent 方法才真正将 event 写入到了 apiserver。



事件处理
我们来看下EventCorrelate方法:



// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c EventCorrelator) EventCorrelate(newEvent *v1.Event) (EventCorrelateResult, error) {
if newEvent == nil {
return nil, fmt.Errorf(“event is nil”)
}
aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
if c.filterFunc(observedEvent) {
return &EventCorrelateResult{Skip: true}, nil
}
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}



分别调用了 aggregator.EventAggregate ,logger.eventObserve , filterFunc 三个方法,分别作用是:



aggregator.EventAggregate:聚合 event,如果在最近 10 分钟出现过 10 个相似的事件(除了 message 和时间戳之外其他关键字段都相同的事件),aggregator 会把它们的 message 设置为 (combined from similar events)+event.Message
logger.eventObserve:它会把相同的事件以及包含 aggregator 被聚合了的相似的事件,通过增加 Count 字段来记录事件发生了多少次。
filterFunc: 这里实现了一个基于令牌桶的限流算法,如果超过设定的速率则丢弃,保证了 apiserver 的安全。
我们主要来看下aggregator.EventAggregate方法:



func (e EventAggregator) EventAggregate(newEvent *v1.Event) (v1.Event, string) {
now := metav1.NewTime(e.clock.Now())
var record aggregateRecord
// eventKey is the full cache key for this event
//eventKey 是将除了时间戳外所有字段结合在一起
eventKey := getEventKey(newEvent)
// aggregateKey is for the aggregate event, if one is needed.
//aggregateKey 是除了message和时间戳外的字段结合在一起,localKey 是message
aggregateKey, localKey := e.keyFunc(newEvent)



// Do we have a record of similar events in our cache?
e.Lock()
defer e.Unlock()
//从cache中根据aggregateKey查询是否存在,如果是相同或者相类似的事件会被放入cache中
value, found := e.cache.Get(aggregateKey)
if found {
record = value.(aggregateRecord)
}

//判断上次事件产生的时间是否超过10分钟,如何操作则重新生成一个localKeys集合(集合中存放message)
maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
interval := now.Time.Sub(record.lastTimestamp.Time)
if interval > maxInterval {
record = aggregateRecord{localKeys: sets.NewString()}
}

// Write the new event into the aggregation record and put it on the cache
//将locakKey也就是message放入集合中,如果message相同就是覆盖了
record.localKeys.Insert(localKey)
record.lastTimestamp = now
e.cache.Add(aggregateKey, record)

// If we are not yet over the threshold for unique events, don't correlate them
//判断localKeys集合中存放的类似事件是否超过10个,
if uint(record.localKeys.Len()) < e.maxEvents {
return newEvent, eventKey
}

// do not grow our local key set any larger than max
record.localKeys.PopAny()

// create a new aggregate event, and return the aggregateKey as the cache key
// (so that it can be overwritten.)
eventCopy := &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
Namespace: newEvent.Namespace,
},
Count: 1,
FirstTimestamp: now,
InvolvedObject: newEvent.InvolvedObject,
LastTimestamp: now,
//这里会对message加个前缀:(combined from similar events):
Message: e.messageFunc(newEvent),
Type: newEvent.Type,
Reason: newEvent.Reason,
Source: newEvent.Source,
}
return eventCopy, aggregateKey } aggregator.EventAggregate方法中其实就是判断了通过 cache 和 localKeys 判断事件是否相似,如果最近 10 分钟出现过 10 个相似的事件就合并并加上前缀,后续通过logger.eventObserve方法进行 count 累加,如果 message 也相同,肯定就是直接 count++。


总结
好了,event 处理的整个流程基本就是这样,我们可以概括一下,可以结合文中的图对比一起看下:



创建 EventRecorder 对象,通过其提供的 Event 等方法,创建好 event 对象
将创建出来的对象发送给 EventBroadcaster 中的 channel 中
EventBroadcaster 通过后台运行的 goroutine,从管道中取出事件,并广播给提前注册好的 handler 处理
当输出 log 的 handler 收到事件就直接打印事件
当 EventSink handler 收到处理事件就通过预处理之后将事件发送给 apiserver
其中预处理包含三个动作,1、限流 2、聚合 3、计数
apiserver 收到事件处理之后就存储在 etcd 中
回顾 event 的整个流程,可以看到 event 并不是保证 100% 事件写入(从预处理的过程来看),这样做是为了后端服务 etcd 的可用性,因为 event 事件在整个集群中产生是非常频繁的,尤其在服务不稳定的时候,而相比 Deployment,Pod 等其他资源,又没那么的重要。所以这里做了个取舍。



参考文档:



https://cizixs.com/2017/06/22/kubelet-source-code-analysis-part4-event/


Category k8s