Introduction
Stateless components are naturally highly available: you simply run a few more replicas.
Stateful components are much trickier. Their high availability is usually achieved through leader election, so that only one instance is handling business logic at any given moment.
In Kubernetes, making a component highly available means deploying multiple replicas of it, e.g. multiple apiservers, schedulers and controller-managers. The apiserver is stateless, so every replica can serve traffic; the scheduler and controller-manager are stateful, so only one instance may be active at a time, which requires leader election.
Kubernetes implements component high availability through leaderelection. Among the built-in components, kube-scheduler and kube-controller-manager both use leader election, and this election mechanism is Kubernetes' high-availability guarantee for them. That is, under normal conditions only one of the replicas of kube-scheduler or kube-controller-manager is running the business logic, while the other replicas keep trying to grab the lock, competing for leadership, until one of them wins. If the current leader exits for some reason, or loses the lock, the remaining replicas compete for a new leadership; whoever acquires it becomes the leader and takes over the business logic.
The Kubernetes client-go package exposes this as an interface for users. The code lives under client-go/tools/leaderelection.
How to use it
Because client-go wraps most of the logic for us, usage is very simple:
rl, err := resourcelock.New(resourcelock.EndpointsResourceLock,
    "namespace",
    lockName,
    ctx.KubeClient.CoreV1(),
    resourcelock.ResourceLockConfig{
        Identity:      id,
        EventRecorder: ctx.Recorder("namespace"),
    })
if err != nil {
    log.Fatalf("error creating lock: %v", err)
}

// Try and become the leader and start cloud controller manager loops
leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
    Lock:          rl,
    LeaseDuration: ctx.LeaseDuration,
    RenewDeadline: ctx.RenewDeadline,
    RetryPeriod:   ctx.RetryPeriod,
    Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: func(_ context.Context) {
            log.Infof("cmdb running in leader elect")
            run(ctx)
        },
        OnStoppedLeading: func() {
            log.Fatalf("leaderelection lost")
        },
    },
})
First, we create the resource lock object.
Then we start the election, handing leaderelection a set of callbacks: depending on how the election turns out, leaderelection calls back into the business code. There are three callbacks (sketched below):
- a callback for when we become the leader
- a callback for when we lose leadership
- a callback for when the leader changes
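As a sketch of all three together (OnStartedLeading, OnStoppedLeading and OnNewLeader are the field names of client-go's LeaderCallbacks struct; the durations are illustrative values that happen to match kube-scheduler's defaults, and rl, run and id come from the example above):

leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
    Lock:          rl,
    LeaseDuration: 15 * time.Second, // how long a lease stays valid
    RenewDeadline: 10 * time.Second, // leader must renew within this window
    RetryPeriod:   2 * time.Second,  // how often non-leaders retry
    Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: func(ctx context.Context) {
            run(ctx) // we became the leader: start the business logic
        },
        OnStoppedLeading: func() {
            log.Fatalf("leader election lost") // we lost the lease: bail out
        },
        OnNewLeader: func(identity string) {
            if identity == id {
                return // the new leader is us
            }
            log.Infof("new leader elected: %v", identity)
        },
    },
})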
Let's dig into the source to see how this is implemented.
Source code walkthrough
The basic idea behind leaderelection is to implement a distributed lock on top of a Kubernetes configmap or endpoints resource. The process that acquires the lock becomes the leader and periodically renews its lease. The other processes keep trying to take the lock; when an attempt fails, they wait for the next retry cycle. Once the leader dies and its lease expires, another instance becomes the new leader.
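The state being locked over is the LeaderElectionRecord. Roughly, from client-go's resourcelock package (the exact field set may differ slightly between versions):

type LeaderElectionRecord struct {
    HolderIdentity       string      `json:"holderIdentity"`       // who currently holds the lease
    LeaseDurationSeconds int         `json:"leaseDurationSeconds"` // how long the lease is valid
    AcquireTime          metav1.Time `json:"acquireTime"`          // when the current leader first acquired it
    RenewTime            metav1.Time `json:"renewTime"`            // last successful renewal
    LeaderTransitions    int         `json:"leaderTransitions"`    // how many times leadership has changed hands
}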
To support lock mechanisms on top of different resources, leaderelection defines an interface:
type Interface interface {
    // Get returns the LeaderElectionRecord
    Get() (*LeaderElectionRecord, error)
    // Create attempts to create a LeaderElectionRecord
    Create(ler LeaderElectionRecord) error
    // Update will update an existing LeaderElectionRecord
    Update(ler LeaderElectionRecord) error
    // RecordEvent is used to record events
    RecordEvent(string)
    // Identity will return the locks Identity
    Identity() string
    // Describe is used to convert details on current resource lock
    // into a string
    Describe() string
}
Currently there are two implementations, backed by configmap and endpoints resources.
Mapping this back to the usage above:
In the first step we create a resource lock, which corresponds to the following source:
func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) {
    switch lockType {
    case EndpointsResourceLock:
        return &EndpointsLock{
            EndpointsMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    case ConfigMapsResourceLock:
        return &ConfigMapLock{
            ConfigMapMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }
}
Note that the return type is the interface, with the configmap and endpoints locks as its two concrete implementations.
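For instance, switching the usage example above to the configmap-backed lock only means passing a different lock type; everything else stays the same:

rl, err := resourcelock.New(resourcelock.ConfigMapsResourceLock,
    "namespace",
    lockName,
    ctx.KubeClient.CoreV1(),
    resourcelock.ResourceLockConfig{
        Identity:      id,
        EventRecorder: ctx.Recorder("namespace"),
    })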
In the second step we start the election, which corresponds to the following source:
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    le, err := NewLeaderElector(lec)
    if err != nil {
        panic(err)
    }
    if lec.WatchDog != nil {
        lec.WatchDog.SetLeaderElection(le)
    }
    le.Run(ctx)
}

func (le *LeaderElector) Run(ctx context.Context) {
    defer func() {
        runtime.HandleCrash()
        le.config.Callbacks.OnStoppedLeading()
    }()
    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go le.config.Callbacks.OnStartedLeading(ctx)
    le.renew(ctx)
}
The code above tries to acquire the lock and, depending on the outcome, calls back into the business code. Note that OnStoppedLeading is registered in a defer, so it fires whenever Run exits, whether the lease was lost or the context was cancelled. Now for the actual election logic:
func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease %v...", desc)
    wait.JitterUntil(func() {
        succeeded = le.tryAcquireOrRenew()
        le.maybeReportTransition()
        if !succeeded {
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("became leader")
        klog.Infof("successfully acquired lease %v", desc)
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}
wait.JitterUntil runs a periodic task; each iteration does the following (see the sketch after this list):
- try to acquire the lock
- if acquisition fails, return immediately and wait for the next tick to try again
- if acquisition succeeds, we are now the leader and can run the business logic, so we break out of the periodic task by cancelling the context
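The periodic task here is wait.JitterUntil from k8s.io/apimachinery. A minimal, self-contained sketch of the same pattern acquire uses (the 2s period stands in for RetryPeriod, 1.2 is the jitter factor leaderelection uses, and the "lock" is simulated):

package main

import (
    "context"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    attempts := 0
    // Run the closure every ~2s (with jitter) until ctx is done; like
    // acquire(), we exit the loop by calling cancel() once the lock is won.
    wait.JitterUntil(func() {
        attempts++
        fmt.Println("trying to acquire the lock, attempt", attempts)
        if attempts == 3 { // pretend the third attempt wins the lock
            cancel()
        }
    }, 2*time.Second, 1.2, true, ctx.Done())
}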
Now the code that actually acquires the lock:
func (le *LeaderElector) tryAcquireOrRenew() bool {
    now := metav1.Now()
    leaderElectionRecord := rl.LeaderElectionRecord{
        HolderIdentity:       le.config.Lock.Identity(),
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
        RenewTime:            now,
        AcquireTime:          now,
    }

    // 1. obtain or create the ElectionRecord
    oldLeaderElectionRecord, err := le.config.Lock.Get()
    if err != nil {
        if !errors.IsNotFound(err) {
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }
        le.observedRecord = leaderElectionRecord
        le.observedTime = le.clock.Now()
        return true
    }

    // 2. Record obtained, check the Identity & Time
    if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
        le.observedRecord = *oldLeaderElectionRecord
        le.observedTime = le.clock.Now()
    }
    if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && !le.IsLeader() {
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 3. We're going to try to update. The leaderElectionRecord is set to its default
    // here. Let's correct it before updating.
    if le.IsLeader() {
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else {
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }

    // update the lock itself
    if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }
    le.observedRecord = leaderElectionRecord
    le.observedTime = le.clock.Now()
    return true
}
The logic above is:
- fetch the lock record, a LeaderElectionRecord
- if none exists, there is no leader yet, so create the record ourselves
- if one exists, the system already has a leader. Compare identities to check whether we are that leader; if we are not, check whether the lock has expired. If it has not expired, skip the rest of the logic and wait for wait's next invocation (the expiry check is sketched below); if it has expired, update the lock record and become the leader, either by renewing our own lease or by taking the lease over
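The expiry check in step 2 is plain time arithmetic. A self-contained sketch with illustrative numbers (15s happens to be kube-scheduler's default lease duration):

package main

import (
    "fmt"
    "time"
)

func main() {
    leaseDuration := 15 * time.Second
    observedTime := time.Now().Add(-10 * time.Second) // leader last renewed 10s ago

    // same condition tryAcquireOrRenew uses: the lock is still held
    // if observedTime + leaseDuration lies in the future
    stillHeld := observedTime.Add(leaseDuration).After(time.Now())
    fmt.Println(stillHeld) // true: a non-leader backs off and retries later
}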
Handling the lock record above goes through a set of interface calls which, as mentioned earlier, have two implementations: configmap and endpoints. Taking endpoints as the example, here is how the interface manipulates the lock record:
func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) {
    var record LeaderElectionRecord
    var err error
    el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name, metav1.GetOptions{})
    if err != nil {
        return nil, err
    }
    if el.e.Annotations == nil {
        el.e.Annotations = make(map[string]string)
    }
    if recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]; found {
        if err := json.Unmarshal([]byte(recordBytes), &record); err != nil {
            return nil, err
        }
    }
    return &record, nil
}

// Create attempts to create a LeaderElectionRecord annotation
func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
    recordBytes, err := json.Marshal(ler)
    if err != nil {
        return err
    }
    el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(&v1.Endpoints{
        ObjectMeta: metav1.ObjectMeta{
            Name:      el.EndpointsMeta.Name,
            Namespace: el.EndpointsMeta.Namespace,
            Annotations: map[string]string{
                LeaderElectionRecordAnnotationKey: string(recordBytes),
            },
        },
    })
    return err
}
As the code shows, what the interface really provides is a storage location for the lock record. Resources in k8s exist to serve workloads, and k8s did not want to introduce a dedicated resource type just to represent a component lock, so the lock record is stored in the annotations of whichever resource backs the implementation.
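A minimal sketch of peeking at such a lock, assuming a configured clientset and a component that stores its lock on the kube-scheduler endpoints in kube-system (as the built-in scheduler does):

// read the serialized LeaderElectionRecord straight off the resource
ep, err := clientset.CoreV1().Endpoints("kube-system").Get("kube-scheduler", metav1.GetOptions{})
if err != nil {
    log.Fatalf("error getting endpoints: %v", err)
}
// the record lives under the control-plane.alpha.kubernetes.io/leader annotation
fmt.Println(ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey])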