cloud provider Service Controller

The service controller watches create, update, and delete events for Service resources in Kubernetes and, based on the current Service state, configures load balancers on the cloud, keeping the cloud load balancers consistent with what the Service resources describe.

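For context, the controller only cares about Services whose type is LoadBalancer. Here is a minimal sketch, built with the Go API types, of the kind of Service object that triggers it (the names are placeholders):

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
    // A Service of type LoadBalancer: this is the kind of object the service
    // controller reacts to by provisioning a cloud load balancer for it.
    svc := &v1.Service{
        ObjectMeta: metav1.ObjectMeta{Name: "my-app", Namespace: "default"}, // placeholder names
        Spec: v1.ServiceSpec{
            Type:     v1.ServiceTypeLoadBalancer,
            Selector: map[string]string{"app": "my-app"},
            Ports: []v1.ServicePort{{
                Port:       80,
                TargetPort: intstr.FromInt(8080),
            }},
        },
    }
    fmt.Printf("%s/%s wants a load balancer: %v\n",
        svc.Namespace, svc.Name, svc.Spec.Type == v1.ServiceTypeLoadBalancer)
}
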
The service controller is a module of the cloud controller manager and starts together with it; you can see this in the startup code below, where servicecontroller.New is called. The service controller does its work by watching Service and Node resources.

// Start the service controller
serviceController, err := servicecontroller.New(
    cloud,
    client("service-controller"),
    c.SharedInformers.Core().V1().Services(),
    c.SharedInformers.Core().V1().Nodes(),
    c.ComponentConfig.KubeCloudShared.ClusterName,
)
if err != nil {
    glog.Errorf("Failed to start service controller: %v", err)
} else {
    go serviceController.Run(stop, int(c.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
    time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
}

The New function is implemented as follows. The core flow: Service events are observed through the list/watch mechanism and trigger the enqueue function, then sync workers take items off the work queue and handle the binding logic for the cloud load balancer.

// New returns a new service controller to keep cloud provider service resources
// (like load balancers) in sync with the registry.
func New(
    cloud cloudprovider.Interface,
    kubeClient clientset.Interface,
    serviceInformer coreinformers.ServiceInformer,
    nodeInformer coreinformers.NodeInformer,
    clusterName string,
) (*ServiceController, error) {
    broadcaster := record.NewBroadcaster()
    broadcaster.StartLogging(glog.Infof)
    broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
    recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})

    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        if err := metrics.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
            return nil, err
        }
    }

    s := &ServiceController{
        cloud:            cloud,
        knownHosts:       []*v1.Node{},
        kubeClient:       kubeClient,
        clusterName:      clusterName,
        cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
        eventBroadcaster: broadcaster,
        eventRecorder:    recorder,
        nodeLister:       nodeInformer.Lister(),
        nodeListerSynced: nodeInformer.Informer().HasSynced,
        queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
    }

    serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc: s.enqueueService,
            UpdateFunc: func(old, cur interface{}) {
                oldSvc, ok1 := old.(*v1.Service)
                curSvc, ok2 := cur.(*v1.Service)
                if ok1 && ok2 && s.needsUpdate(oldSvc, curSvc) {
                    s.enqueueService(cur)
                }
            },
            DeleteFunc: s.enqueueService,
        },
        serviceSyncPeriod,
    )
    s.serviceLister = serviceInformer.Lister()
    s.serviceListerSynced = serviceInformer.Informer().HasSynced

    if err := s.init(); err != nil {
        return nil, err
    }
    return s, nil
}

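The Run method launched in the goroutine earlier is not reproduced here; roughly, it waits for the Service and Node informer caches to sync and then starts the requested number of workers to drain the work queue. A simplified sketch (the upstream code differs in details and also starts a periodic node sync loop):

// Simplified sketch of ServiceController.Run, not the verbatim upstream code.
func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
    defer runtime.HandleCrash()
    defer s.queue.ShutDown()

    // Do nothing until the Service and Node caches have been populated.
    if !cache.WaitForCacheSync(stopCh, s.serviceListerSynced, s.nodeListerSynced) {
        return
    }

    // Each worker repeatedly pulls a key from the work queue and syncs it.
    for i := 0; i < workers; i++ {
        go wait.Until(s.worker, time.Second, stopCh)
    }

    <-stopCh
}

// worker just loops over processNextWorkItem until the queue shuts down.
func (s *ServiceController) worker() {
    for s.processNextWorkItem() {
    }
}
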
Services are placed on the work queue using a key of the form namespace/name. The syncService function fetches the Service by that key and does the actual processing; once the item has been handled, queue.Done(key) removes it from the work queue.

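The enqueue side registered in New above is not shown in this post; it is the usual pattern of turning the object into its namespace/name key and pushing that key onto the rate-limited queue. A rough sketch:

// Rough sketch of the enqueue handler wired up in New. The queue only ever
// holds "namespace/name" strings, never the objects themselves; the
// deletion-handling key function also copes with tombstones from DeleteFunc.
func (s *ServiceController) enqueueService(obj interface{}) {
    key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
    if err != nil {
        glog.Errorf("couldn't get key for object %#v: %v", obj, err)
        return
    }
    s.queue.Add(key)
}
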
func (s *ServiceController) processNextWorkItem() bool {
    key, quit := s.queue.Get()
    if quit {
        return false
    }
    defer s.queue.Done(key)

    err := s.syncService(key.(string))
    if err == nil {
        s.queue.Forget(key)
        return true
    }

    runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
    s.queue.AddRateLimited(key)
    return true
}

There are good reasons why Services get their own work queue. First, binding and unbinding a cloud load balancer is far slower than merely declaring a Service. Second, when a Service is deleted from Kubernetes it is really removed from etcd, and at that point the local cache is the only place left to find the parameters needed to delete the corresponding public load balancer.

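That cached copy lives in the serviceCache the controller is constructed with. Stripped down, it is a mutex-protected map from the Service key to the last successfully processed Service object; an abridged sketch (the real type also has get/set/delete helpers):

// cachedService remembers the last Service state that was successfully
// applied to the cloud; it is what the deletion path relies on once the
// object is gone from the apiserver.
type cachedService struct {
    state *v1.Service
}

// serviceCache maps "namespace/name" keys to cached entries.
type serviceCache struct {
    mu         sync.Mutex // protects serviceMap
    serviceMap map[string]*cachedService
}

func (c *serviceCache) getOrCreate(key string) *cachedService {
    c.mu.Lock()
    defer c.mu.Unlock()
    entry, ok := c.serviceMap[key]
    if !ok {
        entry = &cachedService{}
        c.serviceMap[key] = entry
    }
    return entry
}
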
syncService below is the real core logic of the service controller; it determines whether the Service has been deleted or updated.

// syncService will sync the Service with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (s *ServiceController) syncService(key string) error {
    startTime := time.Now()
    var cachedService *cachedService
    defer func() {
        glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    // service holds the latest service info from apiserver
    service, err := s.serviceLister.Services(namespace).Get(name)
    switch {
    case errors.IsNotFound(err):
        // service absence in store means watcher caught the deletion, ensure LB info is cleaned
        glog.Infof("Service has been deleted %v. Attempting to cleanup load balancer resources", key)
        err = s.processServiceDeletion(key)
    case err != nil:
        glog.Infof("Unable to retrieve service %v from store: %v", key, err)
    default:
        cachedService = s.cache.getOrCreate(key)
        err = s.processServiceUpdate(cachedService, service, key)
    }

    return err
}

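processServiceDeletion, called in the IsNotFound branch above, is not included in this post. In essence it pulls the cached copy of the Service, uses it to delete the cloud load balancer, and then drops the cache entry. A condensed sketch with events and logging trimmed:

// Condensed sketch of the deletion path: the apiserver no longer has the
// Service, so the cached state is the only source of the LB parameters.
func (s *ServiceController) processServiceDeletion(key string) error {
    cachedService, ok := s.cache.get(key)
    if !ok {
        // Never successfully processed, so there is no cloud LB to clean up.
        return nil
    }
    service := cachedService.state
    if wantsLoadBalancer(service) {
        if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
            return err // requeued with rate limiting by processNextWorkItem
        }
    }
    s.cache.delete(key)
    return nil
}
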
Now let's look at the core logic for handling a Service update:

// processServiceUpdate operates loadbalancers for the incoming service accordingly.
// Returns an error if processing the service update failed.
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) error {
    if cachedService.state != nil {
        // If the name is the same but the UID differs, this is treated as a
        // different Service, so the old load balancer is deleted first.
        if cachedService.state.UID != service.UID {
            err := s.processLoadBalancerDelete(cachedService, key)
            if err != nil {
                return err
            }
        }
    }
    // cache the service, we need the info for service deletion
    cachedService.state = service
    err := s.createLoadBalancerIfNeeded(key, service)
    if err != nil {
        eventType := "CreatingLoadBalancerFailed"
        message := "Error creating load balancer (will retry): "
        if !wantsLoadBalancer(service) {
            eventType = "CleanupLoadBalancerFailed"
            message = "Error cleaning up load balancer (will retry): "
        }
        message += err.Error()
        s.eventRecorder.Event(service, v1.EventTypeWarning, eventType, message)
        return err
    }
    // Always update the cache upon success.
    // NOTE: Since we update the cached service if and only if we successfully
    // processed it, a cached service being nil implies that it hasn't yet
    // been successfully processed.
    s.cache.set(key, cachedService)

    return nil
}

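The wantsLoadBalancer helper used above (and again below) is essentially just a type check:

// wantsLoadBalancer reports whether the Service asks for a cloud load
// balancer, i.e. whether its type is LoadBalancer.
func wantsLoadBalancer(service *v1.Service) bool {
    return service.Spec.Type == v1.ServiceTypeLoadBalancer
}
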
Based on the actual state, the controller then decides whether to create or update the cloud load balancer resource, or clean it up if the Service no longer wants one:

// createLoadBalancerIfNeeded ensures that service's status is synced up with loadbalancer
// i.e. creates loadbalancer for service if requested and deletes loadbalancer if the service
// doesn't want a loadbalancer no more. Returns whatever error occurred.
func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) error {
    // Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
    // which may involve service interruption. Also, we would like user-friendly events.

    // Save the state so we can avoid a write if it doesn't change
    previousState := v1helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
    var newState *v1.LoadBalancerStatus
    var err error
    // If the Service type changed so that it no longer wants a load balancer,
    // clean up the cloud LB.
    if !wantsLoadBalancer(service) {
        _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service)
        if err != nil {
            return fmt.Errorf("error getting LB for service %s: %v", key, err)
        }
        if exists {
            glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
            s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
            if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
                return err
            }
            s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
        }

        newState = &v1.LoadBalancerStatus{}
    } else {
        glog.V(2).Infof("Ensuring LB for service %s", key)

        // TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart

        s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer")
        // This is the core of the update path for the Service.
        newState, err = s.ensureLoadBalancer(service)
        if err != nil {
            return fmt.Errorf("failed to ensure load balancer for service %s: %v", key, err)
        }
        s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
    }

    // Write the state if changed
    // TODO: Be careful here ... what if there were other changes to the service?
    if !v1helper.LoadBalancerStatusEqual(previousState, newState) {
        // Make a copy so we don't mutate the shared informer cache
        service = service.DeepCopy()

        // Update the status on the copy
        service.Status.LoadBalancer = *newState

        if err := s.persistUpdate(service); err != nil {
            // TODO: This logic needs to be revisited. We might want to retry on all the errors, not just conflicts.
            if errors.IsConflict(err) {
                return fmt.Errorf("not persisting update to service '%s/%s' that has been changed since we received it: %v", service.Namespace, service.Name, err)
            }
            runtime.HandleError(fmt.Errorf("failed to persist service %q updated status to apiserver, even after retries. Giving up: %v", key, err))
            return nil
        }
    } else {
        glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key)
    }

    return nil
}
func (s *ServiceController) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
    ...
    // essentially this just delegates to the cloud provider
    return s.balancer.EnsureLoadBalancer(context.TODO(), s.clusterName, service, nodes)
}

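The elided part of ensureLoadBalancer is mostly about building the node list handed to the provider: the controller lists nodes from its node lister and filters out nodes that should not receive load balancer traffic, such as unschedulable or not-Ready nodes. A hypothetical filter in that spirit (the upstream predicate covers more cases):

// Hypothetical node filter in the spirit of what ensureLoadBalancer does
// before calling the cloud provider: keep only schedulable, Ready nodes.
func lbCandidateNodes(nodes []*v1.Node) []*v1.Node {
    var out []*v1.Node
    for _, node := range nodes {
        if node.Spec.Unschedulable {
            continue
        }
        ready := false
        for _, cond := range node.Status.Conditions {
            if cond.Type == v1.NodeReady && cond.Status == v1.ConditionTrue {
                ready = true
                break
            }
        }
        if ready {
            out = append(out, node)
        }
    }
    return out
}
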
Below is the interface abstraction that cloud providers implement and that the service controller calls uniformly. EnsureLoadBalancer is the most important function; cloud vendors typically implement it by wiring their public load balancer product to Kubernetes Services of type LoadBalancer.

// LoadBalancer is an abstract, pluggable interface for load balancers.
type LoadBalancer interface {
    // TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
    // GetLoadBalancer returns whether the specified load balancer exists, and
    // if so, what its status is.
    // Implementations must treat the *v1.Service parameter as read-only and not modify it.
    // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
    GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error)
    // GetLoadBalancerName returns the name of the load balancer. Implementations must treat the
    // *v1.Service parameter as read-only and not modify it.
    GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string
    // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
    // Implementations must treat the *v1.Service and *v1.Node
    // parameters as read-only and not modify them.
    // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
    EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error)
    // UpdateLoadBalancer updates hosts under the specified load balancer.
    // Implementations must treat the *v1.Service and *v1.Node
    // parameters as read-only and not modify them.
    // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
    UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error
    // EnsureLoadBalancerDeleted deletes the specified load balancer if it
    // exists, returning nil if the load balancer specified either didn't exist or
    // was successfully deleted.
    // This construction is useful because many cloud providers' load balancers
    // have multiple underlying components, meaning a Get could say that the LB
    // doesn't exist even if some part of it is still laying around.
    // Implementations must treat the *v1.Service parameter as read-only and not modify it.
    // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
    EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error
}

When a user creates a Service of type LoadBalancer in Kubernetes, a typical implementation of the cloud provider's EnsureLoadBalancer calls the cloud's load balancer APIs to create the LB and whatever dependent resources it needs. The LB backends are the Kubernetes nodes of the cluster the Service belongs to; it can be all of the nodes or just a subset, because whichever nodes receive the traffic will forward it again to the right Pods through the rules that kube-proxy maintains (more on this below).

Once the LB has been created and its backends are bound, users can reach the workload in the Pods through the VIP of the public load balancer. The traffic first hits the cloud vendor's public gateway, then passes through the LB to the Kubernetes nodes the vendor provides, and finally kube-proxy delivers it to the Pods using the forwarding rules it generates from watching Endpoints.
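
To make the contract concrete, here is a bare skeleton of what a provider-side implementation might look like. Everything below is hypothetical: myCloudLB and the vendorLBClient interface stand in for a vendor's SDK. A real provider would translate the Service ports and the node list into its own LB, listener, and backend-group resources.

package mycloud // hypothetical provider package

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
)

// vendorLBClient is a stand-in for a cloud vendor's SDK; none of these
// methods exist in any real library.
type vendorLBClient interface {
    Describe(ctx context.Context, name string) (vendorLB, bool, error)
    CreateOrUpdate(ctx context.Context, name string, ports []v1.ServicePort, nodes []*v1.Node) (vip string, err error)
    UpdateBackends(ctx context.Context, name string, nodes []*v1.Node) error
    Delete(ctx context.Context, name string) error
}

type vendorLB struct{ VIP string }

// myCloudLB is a hypothetical implementation of the LoadBalancer interface above.
type myCloudLB struct {
    lbAPI vendorLBClient
}

// GetLoadBalancerName derives a deterministic cloud-side name from the Service.
func (m *myCloudLB) GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string {
    return fmt.Sprintf("k8s-%s-%s-%s", clusterName, service.Namespace, service.Name)
}

// GetLoadBalancer asks the cloud whether the LB already exists.
func (m *myCloudLB) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
    lb, found, err := m.lbAPI.Describe(ctx, m.GetLoadBalancerName(ctx, clusterName, service))
    if err != nil || !found {
        return nil, false, err
    }
    return &v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{{IP: lb.VIP}}}, true, nil
}

// EnsureLoadBalancer creates or updates the LB and binds the nodes as backends.
// The returned status (the VIP) is what the service controller writes back
// into Service.Status.LoadBalancer.
func (m *myCloudLB) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
    name := m.GetLoadBalancerName(ctx, clusterName, service)
    vip, err := m.lbAPI.CreateOrUpdate(ctx, name, service.Spec.Ports, nodes)
    if err != nil {
        return nil, err
    }
    return &v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{{IP: vip}}}, nil
}

// UpdateLoadBalancer only refreshes the backend node set.
func (m *myCloudLB) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
    return m.lbAPI.UpdateBackends(ctx, m.GetLoadBalancerName(ctx, clusterName, service), nodes)
}

// EnsureLoadBalancerDeleted must be idempotent: deleting an LB that does not
// exist is not an error.
func (m *myCloudLB) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
    return m.lbAPI.Delete(ctx, m.GetLoadBalancerName(ctx, clusterName, service))
}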