前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >名字服务Polaris中服务发现详解

名字服务Polaris中服务发现详解

作者头像
tunsuy
发布2023-08-19 09:46:36
2720
发布2023-08-19 09:46:36
举报

源码地址:https://github.com/polarismesh/polaris-controller/blob/main/README-zh.md

通过mesh配置文件设置controller的配置管理对象 https://fankangbest.github.io/2017/10/12/kubernetes-client%E5%88%86%E6%9E%90(%E4%B8%80)-kubeconfig-v1-5-2/

下面就从源码开始分析polaris是怎么通过进行服务发现的

polaris通过k8s的扩展api机制自定义了controller实现,下面选取一些关键代码进行分析

初始化controller

对每个资源增加创建、更新、删除操作的监控回调方法:

代码语言:javascript
复制
p := PolarisController{
   client: client,
   queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
      polarisControllerName),
   workerLoopPeriod: time.Second,
}

serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   AddFunc: p.onServiceAdd,
   UpdateFunc: p.onServiceUpdate,
   DeleteFunc: p.onServiceDelete,
})

endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   AddFunc: p.onEndpointAdd,
   UpdateFunc: p.onEndpointUpdate,
   DeleteFunc: p.onEndpointDelete,
})

namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   AddFunc: p.onNamespaceAdd,
   UpdateFunc: p.onNamespaceUpdate,
})

上面是典型的k8s资源监听代码

以service创建为例:

当k8s中一个service被创建成功之后,就会调用polaris的这个onServiceAdd方法:

代码语言:javascript
复制
func (p *PolarisController) onServiceAdd(obj interface{}) {

   service := obj.(*v1.Service)

   if !util.IsPolarisService(service, p.config.PolarisController.SyncMode) {
      return
   }

   key, err := util.GenServiceQueueKey(service)
   if err != nil {
      log.Errorf("generate queue key for %s/%s error, %v", service.Namespace, service.Name, err)
      return
   }

   p.enqueueService(key, service, "Add")
}

逻辑如下:

  • 判断是否可以注册为北极星服务
  • 根据该service生成相应的key
  • 将该key放入workqueue

启动controller

代码语言:javascript
复制
func (p *PolarisController) Run(workers int, stopCh <-chan struct{}) {
   defer runtime.HandleCrash()
   defer p.queue.ShutDown()
   defer p.consumer.Destroy()
   defer p.provider.Destroy()

   defer log.Infof("Shutting down polaris controller")

   if !cache.WaitForCacheSync(stopCh, p.podsSynced, p.servicesSynced, p.endpointsSynced, p.namespaceSynced) {
      return
   }

   p.CounterPolarisService()

   for i := 0; i < workers; i++ {
      go wait.Until(p.worker, p.workerLoopPeriod, stopCh)
   }

   //定时任务
   go p.MetricTracker(stopCh)

   <-stopCh
}

逻辑如下:

  • 等待k8s资源cache同步完成
  • 统计k8s服务资源能够注册为北极星服务的数量:通过k8s接口获取所有k8s服务,对每个service判断是否可以转换为北极星service
  • 启动多个work协程,每个协程处理流程如下:
    • 从workqueue中获取元素key
    • 从key中解析出namespace、service名等信息
    • 根据namespace和service名从informer的cache中获取service
    • 根据service是否存在做不同的处理
    • 如果不存在,则调用北极星接口创建相应的namespace、service等
  • work工作协程会一直轮询中,直到收到stop信号,也就是说会一直轮询取消workqueue中的元素进行上述的处理工作

informer回调

从上面我们知道,在controller的初始化和启动方法中,分别对workqueue进行push和pop元素,那么workqueue的push操作所在的回调方法是什么时候触发的呢?

我们看到在polaris-controller-manager的run方法中,还有这样的逻辑:

代码语言:javascript
复制
controllerContext.InformerFactory.Start(controllerContext.Stop)
controllerContext.GenericInformerFactory.Start(controllerContext.Stop)

答案就是在这里,具体我们来分析下: 经过层层调用,最终在这里,我们看到,

代码语言:javascript
复制
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()

   fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

   cfg := &amp;Config{
      Queue: fifo,
      ListerWatcher: s.listerWatcher,
      ObjectType: s.objectType,
      FullResyncPeriod: s.resyncCheckPeriod,
      RetryOnError: false,
      ShouldResync: s.processor.shouldResync,

      Process: s.HandleDeltas,
   }

   func() {
      s.startedLock.Lock()
      defer s.startedLock.Unlock()

      s.controller = New(cfg)
      s.controller.(*controller).clock = s.clock
      s.started = true
   }()

   // Separate stop channel because Processor should be stopped strictly after controller
   processorStopCh := make(chan struct{})
   var wg wait.Group
   defer wg.Wait() // Wait for Processor to stop
   defer close(processorStopCh) // Tell Processor to stop
   wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
   wg.StartWithChannel(processorStopCh, s.processor.run)

   defer func() {
      s.startedLock.Lock()
      defer s.startedLock.Unlock()
      s.stopped = true // Don't want any new listeners
   }()
   s.controller.Run(stopCh)
}

上面的主要逻辑如下:

(1)调用NewDeltaFIFO,初始化DeltaFIFO; (2)构建Config结构体,这里留意下Process属性,赋值了s.HandleDeltas,后面会分析到该方法; (3)调用New,利用Config结构体来初始化controller; (4)调用s.processor.run,启动processor; (5)调用s.controller.Run,启动controller;

启动procesor

代码语言:javascript
复制
func (p *processorListener) run() {
   // this call blocks until the channel is closed. When a panic happens during the notification
   // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
   // the next notification will be attempted. This is usually better than the alternative of never
   // delivering again.
   stopCh := make(chan struct{})
   wait.Until(func() {
      // this gives us a few quick retries before a long pause and then a few more quick retries
      err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
         for next := range p.nextCh {
            switch notification := next.(type) {
            case updateNotification:
               p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
               p.handler.OnAdd(notification.newObj)
            case deleteNotification:
               p.handler.OnDelete(notification.oldObj)
            default:
               utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
            }
         }
         // the only way to get here is if the p.nextCh is empty and closed
         return true, nil
      })

      // the only way to get here is if the p.nextCh is empty and closed
      if err == nil {
         close(stopCh)
      }
   }, 1*time.Minute, stopCh)
}

启动informer的controller

代码语言:javascript
复制
func (c *controller) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   go func() {
      <-stopCh
      c.config.Queue.Close()
   }()
   r := NewReflector(
      c.config.ListerWatcher,
      c.config.ObjectType,
      c.config.Queue,
      c.config.FullResyncPeriod,
   )
   r.ShouldResync = c.config.ShouldResync
   r.clock = c.clock

   c.reflectorMutex.Lock()
   c.reflector = r
   c.reflectorMutex.Unlock()

   var wg wait.Group
   defer wg.Wait()

   wg.StartWithChannel(stopCh, r.Run)

   wait.Until(c.processLoop, time.Second, stopCh)
}

上面的主要逻辑是:

(1)调用NewReflector,初始化Reflector; (2)调用r.Run,实际上是调用了Reflector的启动方法来启动Reflector; (3)调用c.processLoop,开始controller的核心处理;

启动reflector
代码语言:javascript
复制
func (r *Reflector) Run(stopCh <-chan struct{}) {
   klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
   wait.Until(func() {
      if err := r.ListAndWatch(stopCh); err != nil {
         utilruntime.HandleError(err)
      }
   }, r.period, stopCh)
}

主要就是从kube-apiserver处做list&watch操作,然后将得到的对象封装存储进DeltaFIFO中。

调用自定义eventhandler
代码语言:javascript
复制
func (c *controller) processLoop() {
   for {
      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
      if err != nil {
         if err == ErrFIFOClosed {
            return
         }
         if c.config.RetryOnError {
            // This is the safe way to re-enqueue.
            c.config.Queue.AddIfNotPresent(obj)
         }
      }
   }
}

controller的核心处理方法processLoop中,最重要的逻辑是循环调用c.config.Queue.Pop将DeltaFIFO中的队头元素给pop出来,然后调用c.config.Process方法来做处理,当处理出错时,再调用c.config.Queue.AddIfNotPresent将对象重新加入到DeltaFIFO中去。

c.config.Process其实就是sharedIndexInformer.HandleDeltas。 HandleDeltas方法中,将从DeltaFIFO中pop出来的对象以及类型,相应的在indexer中做添加、更新、删除操作,并调用s.processor.distribute通知自定义的ResourceEventHandler。

https://cloudsre.me/2020/03/client-go-0-informer/ https://qiankunli.github.io/2020/07/20/client_go.html https://jimmysong.io/kubernetes-handbook/develop/client-go-informer-sourcecode-analyse.html http://dockerone.com/article/2434596 https://www.cnblogs.com/lianngkyle/p/16244872.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-07-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 有文化的技术人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 下面就从源码开始分析polaris是怎么通过进行服务发现的
  • polaris通过k8s的扩展api机制自定义了controller实现,下面选取一些关键代码进行分析
  • 初始化controller
  • 启动controller
  • informer回调
    • 启动procesor
      • 启动reflector
      • 调用自定义eventhandler
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档