kubernetes scheduler源码剖析

2016/02/19 kubernetes

scheduler在整个集群中负责pod的调度,对机器(node)进行筛选和过滤,选择最合适的机器运行pod。这部分组件是以插件的形式存在的,如果需要定制调度算法也是比较方便

scheduler是作为plugin放在k8s里面,代码在/plugin下面。代码结构如下:

.
├── cmd
│   └── kube-scheduler
│       ├── app
│       │   └── server.go
│       └── scheduler.go
└── pkg  
    └── scheduler
        ├── algorithm
...

还是按照一贯的风格, main函数在plugin/cmd目录下面, 新建app.NewSchedulerServer(),解析命令行参数,执行Run函数。

在Run这个接口,首先是生成masterClient的配置, 因为配置指定会有多种方式,因此需要进行合并,规则如下:

  • 启动的时候通过kubeconfig显示(ExplicitPath)指定, 优先级最高,其次是Precedence字段指定的其他文件,优先级以此降低,也就是先解析的配置覆盖后面解析的配置项,但是如果遇到新的配置项,就会被加入。
  • ClusterInfo: clientcmdapi.Cluster{Server: s.Master} 指定 api-server(master)的信息,这里会覆盖前面文件指定的集群信息
  • .kubeconfig 文件,如果解析到相对路径,就会以.kubeconfig的父文件夹为父路径,合并成绝对路径

合并完配置, 新建一个RESTClient,

kubeClient, err := client.New(kubeconfig)
...
configFactory := factory.NewConfigFactory(kubeClient, util.NewTokenBucketRateLimiter(s.BindPodsQPS, s.BindPodsBurst))
config, err := s.createConfig(configFactory)

createConfig会依次,直到调用

  • CreateFromConfig
  • CreateFromProvider

来决策对应的调度算法,默认情况下使用CreateFromProvider, providerName是AlgorithmProvider: factory.DefaultProvider, 也就是DefaultProvider调度算法,然后调用CreateFromKeys,通过getFitPredicateFunctions获得对应的调度算法。 那这个算法provider是什么时候注册进去的呢?

在每个./plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go的init可以看到

factory.RegisterAlgorithmProvider(factory.DefaultProvider, defaultPredicates(), defaultPriorities())

在这里进行了实际的注册,其中

  • defaultPredicates 主要进行过滤
  • PodFitsHostPorts 过滤端口冲突的机器
  • PodFitsResources 判断是否有足够的资源
  • NoDiskConflict 没有挂载点冲突
  • MatchNodeSelector 指定相同标签的node调度
  • HostName 指定机器调度
  • defaultPriorities 主要进行筛选
  • LeastRequestedPriority : 使用公式cpu((capacity - sum(requested)) * 10 / capacity) + memory((capacity - sum(requested)) * 10 / capacity) / 2 来计算node的score
  • BalancedResourceAllocation : score = 10 - abs(cpuFraction-memoryFraction)*10,cpuFraction是已经分配的除以整机的CPU比例,也就是说资源碎片越小得分越低,表示分配更“均衡”
  • SelectorSpreadPriority 降低聚集度,尽量的降低同一个service或者rc上的pods的数目,也就是在predicate的时候尽量的降低冲突的概率

每个步骤的得分进行相加,最后选出最高得分的机器,如果有多个相同得分的机器, 就从中随机选择一个。 作为pod的调度目标机器。

algo :=scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, f.PodLister, r)

创建GenericScheduler,使用筛选和过滤的函数

func NewGenericScheduler(predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, pods algorithm.PodLister, random *rand.Rand) algorithm.ScheduleAlgorithm {
	return &genericScheduler{
		predicates:   predicates,
		prioritizers: prioritizers,
		pods:         pods,
		random:       random,
	}
}

GenericScheduler类实现了ScheduleAlgorithm接口的Schedule方法,Scheduler方式直接选出分配到的机器

type ScheduleAlgorithm interface {
    //选出machine
	Schedule(*api.Pod, NodeLister) (selectedMachine string, err error)
}

回到创建配置文件的方法中:初始化scheduler.Config的NextPod接口,这里就是每次调度的数据源。最后返回scheduler.config,配置创建完成

// Creates a scheduler from a set of registered fit predicate keys and priority keys.

func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String) (*scheduler.Config, error) {
  ...
  	NextPod: func() *api.Pod {
            pod := f.PodQueue.Pop().(*api.Pod)
            glog.V(2).Infof("About to try and schedule pod %v", pod.Name)
            return pod
        },      
  ...
}

然后新建新建一个事件广播器,同时监听对应的事件,并且通过EventSink将其存储,代码如下:

eventBroadcaster := record.NewBroadcaster()
 config.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"})

最后启动scheduler,此次的config就是上面创建的config

sched := scheduler.New(config)
sched.Run()

这里就会启动一个goroutine调用pkg/scheduler/scheduler.go文件的Run,也就是scheduleOne函数。

// Run begins watching and scheduling. It starts a goroutine and returns immediately.
func (s *Scheduler) Run() {
	go util.Until(s.scheduleOne, 0, s.config.StopEverything)
}

// Until loops until stop channel is closed, running f every period.
// Catches any panics, and keeps going. f may not be invoked if
// stop channel is already closed. Pass NeverStop to Until if you
// don't want it stop.
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		default:
		}
		func() {
			defer HandleCrash()
			f()
		}()
		time.Sleep(period)
	}
}
func (s *Scheduler) scheduleOne() {
    //首先获取调度的pod
    pod := s.config.NextPod()
    if s.config.BindPodsRateLimiter != nil {
        //等待token变为可用状态
        s.config.BindPodsRateLimiter.Accept()
    }   
    
    glog.V(3).Infof("Attempting to schedule: %+v", pod)
    start := time.Now()
    defer func() {
        metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
    }() 
    //进行实际调度,默认是调度算法是上面提到的DefaultProvider,也就是执行具体的调度算法
    dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister)

    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
    if err != nil {
        glog.V(1).Infof("Failed to schedule: %+v", pod)
        s.config.Recorder.Eventf(pod, "FailedScheduling", "%v", err)
        s.config.Error(pod, err)
        return
    }   

    //将pod绑定到node
    b := &api.Binding{
        ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},
        Target: api.ObjectReference{
            Kind: "Node",
            Name: dest,
        },  
    }   
// We want to add the pod to the model if and only if the bind succeeds,
    // but we don't want to race with any deletions, which happen asynchronously.
    s.config.Modeler.LockedAction(func() {
        bindingStart := time.Now()
       // 发送调度结果给master
        err := s.config.Binder.Bind(b)
        metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
        if err != nil {
            glog.V(1).Infof("Failed to bind pod: %+v", err)
            s.config.Recorder.Eventf(pod, "FailedScheduling", "Binding rejected: %v", err)
            s.config.Error(pod, err)
            return
        }
       //记录一条调度信息
        s.config.Recorder.Eventf(pod, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest)
        // tell the model to assume that this binding took effect.
        assumed := *pod
        assumed.Spec.NodeName = dest
        //激活被调度的pod
        s.config.Modeler.AssumePod(&assumed)
    })
}

Search

    Table of Contents