Writing a Custom Kubernetes Scheduler

This post explains what happens under the hood when a pod is scheduled onto a node in a Kubernetes cluster, and then shows how to write a custom scheduler in Python that does the same job.

If you want to experiment with the ideas in this post as you read, it helps to have the following set up first:

  1. A single-node or multi-node Kubernetes cluster (a simple VM-based setup or a minikube-based setup works perfectly)
  2. An active internet connection to pull Docker images and create pods in your cluster to test the changes
  3. A basic understanding of Kubernetes components such as the API server, etcd and the kubelet, along with a basic understanding of at least one container runtime, such as Docker

What Is a Kubernetes Scheduler?

In a typical multi-node Kubernetes deployment, you will see a single master node (referred to as the control plane) and multiple worker nodes (referred to as minions). When you ask for a pod to be created, whether through a Deployment, a DaemonSet or a standalone Pod, Kubernetes has to intelligently figure out the right minion node on which to place the pod and its workload. This process of choosing a node is called scheduling a pod.

How Does the Kubernetes Scheduler Work?

The Kubernetes scheduling lifecycle sequence diagram above gives a brief idea of the workflow that runs under the hood, from the moment an end user runs kubectl apply -f pod.yaml to the point where the pod is running on one of the minion nodes in your cluster.

  1. A pod is created and its desired state is stored in etcd with the spec.nodeName field empty, indicating that the pod still needs to be scheduled onto a node
  2. The scheduler magically (details of this, for another time) notices that there is a new pod with spec.nodeName empty
  3. The scheduler finds the node in the cluster that best fits the pod's scheduling requirements, based on its desired state
  4. Once this decision is made, the scheduler tells the API server that the pod has to be bound to the node identified in step #3
  5. The API server persists this binding into etcd by setting spec.nodeName in the pod's desired state
  6. The kubelet running on the minion node that the pod was bound to notices that it has to create a new pod and gets to work. With the help of a container runtime such as Docker or rkt, it creates the containers required to fulfil the pod's desired state, and the pod's state transition completes
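The six steps above can be modelled in a few lines of plain Python. This is a toy sketch, not how the real components are implemented: a dictionary stands in for etcd, every helper name is hypothetical, and in a real cluster the scheduler and kubelet talk to the API server rather than to etcd directly.

```python
# Toy, in-memory model of the scheduling lifecycle (hypothetical helpers).
# "store" stands in for etcd; real components all go through the API server.
import random
from typing import Dict, List, Optional


def create_pod(store: Dict[str, dict], name: str) -> None:
    # Step 1: the desired state is persisted with an empty spec.nodeName.
    store[name] = {"spec": {"nodeName": None}}


def unscheduled_pods(store: Dict[str, dict]) -> List[str]:
    # Step 2: the scheduler notices pods whose spec.nodeName is empty.
    return [name for name, pod in store.items() if not pod["spec"]["nodeName"]]


def bind(store: Dict[str, dict], pod_name: str, node: str) -> None:
    # Steps 4 and 5: the binding ends up as spec.nodeName in the stored spec.
    store[pod_name]["spec"]["nodeName"] = node


def pods_for_kubelet(store: Dict[str, dict], node: str) -> List[str]:
    # Step 6: a kubelet only acts on pods bound to its own node.
    return [name for name, pod in store.items() if pod["spec"]["nodeName"] == node]


def run_scheduler(store: Dict[str, dict], nodes: List[str],
                  rng: Optional[random.Random] = None) -> None:
    if not nodes:
        return
    rng = rng or random.Random()
    for pod_name in unscheduled_pods(store):
        # Step 3: pick a node (randomly here; a real scheduler filters and scores).
        bind(store, pod_name, rng.choice(nodes))
```

For example, after `create_pod(store, "web")` and `run_scheduler(store, ["node-a"])`, `pods_for_kubelet(store, "node-a")` returns `["web"]`.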

Now, that doesn't look so complicated, does it?

Why write a custom Kubernetes Scheduler?

You might wonder why you should write a custom Kubernetes scheduler at all when the default scheduler works like a charm. Well, this is not something you do every day. Unless you have a specific reason to implement a new scheduling mechanism, you are probably better off not implementing one on your own.

But with the recent announcement that the Kubernetes community has introduced a pluggable scheduling framework that can be used to extend the scheduler's behaviour, it is worth knowing both how to use the framework provided by the community and how to write a new scheduler from scratch, in case you ever need one of your own.

Writing a Custom Scheduler

How to Tell Kubernetes Not to Use the Default Scheduler?

Unless you explicitly ask for something else, Kubernetes assumes that it has to use the default-scheduler to assign a pod to a node. The API server fills in this default in the pod spec:

spec:
    schedulerName: default-scheduler

Pod spec indicating the use of default-scheduler

So, asking Kubernetes to use a custom scheduler is as simple as setting the spec.schedulerName field to a custom value, and that is it. kube-scheduler will happily ignore those pods, and your custom scheduler can decide where they go. Note that if no scheduler with that name is running, these pods simply stay in the Pending state.
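A custom scheduler only needs to care about pods that name it and are not yet bound. The sketch below shows the defaulting rule in plain Python and builds a field selector that should match such pods; the helper names are hypothetical, and it relies on pod field selectors supporting spec.schedulerName and spec.nodeName.

```python
DEFAULT_SCHEDULER = "default-scheduler"


def effective_scheduler_name(pod_manifest: dict) -> str:
    # The API server fills in "default-scheduler" when spec.schedulerName is omitted.
    return pod_manifest.get("spec", {}).get("schedulerName") or DEFAULT_SCHEDULER


def unscheduled_pods_selector(scheduler_name: str) -> str:
    # An empty value after "spec.nodeName=" matches pods not yet bound to a node.
    return f"spec.schedulerName={scheduler_name},spec.nodeName="
```

The selector string could be passed as the `field_selector` argument of `list_pod_for_all_namespaces` in the Python client, so the API server does the filtering instead of the scheduler.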

Code your Scheduler

In this example, we write a custom scheduler in Python, run it as a pod on your cluster, and have it act as the scheduler for any pod that has the spec.schedulerName field set to my-custom-scheduler. The scheduler works as follows:

  1. The scheduler pod runs an infinite loop with a Watch that looks for new pod creation events
  2. It filters for new pods that have spec.schedulerName set to my-custom-scheduler
  3. It performs a random node allocation by listing all the nodes and picking one of them at random
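The three steps above can be sketched as a single, client-free function. This is an illustration under assumptions: events are plain dicts shaped like watch events (`{"type": ..., "object": ...}`), `bind` is a hypothetical callback standing in for the real binding call, and `schedule_batch` is not part of any library.

```python
# Hypothetical, client-free sketch of one scheduling pass: filter, pick, bind.
import random
from typing import Callable, Iterable, List, Optional, Tuple

SCHEDULER_NAME = "my-custom-scheduler"


def schedule_batch(
    events: Iterable[dict],
    nodes: List[str],
    bind: Callable[[str, str, str], None],
    rng: Optional[random.Random] = None,
) -> List[Tuple[str, str]]:
    """Bind every newly added pod that asks for SCHEDULER_NAME to a random node."""
    placed: List[Tuple[str, str]] = []
    if not nodes:
        return placed
    rng = rng or random.Random()
    for event in events:
        pod = event["object"]
        spec = pod.get("spec", {})
        # Step 1: only react to pod creation events.
        if event["type"] != "ADDED":
            continue
        # Step 2: keep pods that name this scheduler and have no node yet.
        if spec.get("schedulerName") != SCHEDULER_NAME or spec.get("nodeName"):
            continue
        # Step 3: random placement, as in this article's example.
        node = rng.choice(nodes)
        bind(pod["metadata"]["namespace"], pod["metadata"]["name"], node)
        placed.append((pod["metadata"]["name"], node))
    return placed
```

Injecting `bind` keeps the decision logic testable without a cluster; in the real scheduler it would wrap the Kubernetes binding call.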

Extract a List of Nodes in the Cluster

import logging
from json import loads as json_loads

from kubernetes.client.rest import ApiException

logger = logging.getLogger(__name__)
_NOSCHEDULE_TAINT = "NoSchedule"


def _get_ready_nodes(v1_client, filtered=True):
    ready_nodes = []
    try:
        for n in v1_client.list_node().items:
            # Skip any node labelled noCustomScheduler=yes: the custom scheduler must not assign pods to it.
            if (n.metadata.labels or {}).get("noCustomScheduler") == "yes":
                logger.info("Skipping Node %s since it has noCustomScheduler label", n.metadata.name)
                continue

            if not n.spec.unschedulable:
                no_schedule_taint = False
                if n.spec.taints:
                    # A NoSchedule taint means new pods should not be placed on this node.
                    for taint in n.spec.taints:
                        if taint.effect == _NOSCHEDULE_TAINT:
                            no_schedule_taint = True
                            break
                if not no_schedule_taint:
                    # Only nodes whose Ready condition reports "True" are candidates.
                    for status in n.status.conditions or []:
                        if status.status == "True" and status.type == "Ready" and n.metadata.name:
                            ready_nodes.append(n.metadata.name)
                else:
                    logger.error("NoSchedule taint effect on node %s", n.metadata.name)
            else:
                logger.error("Scheduling disabled on %s", n.metadata.name)
        logger.info("Nodes : %s, Filtered: %s", ready_nodes, filtered)
    except ApiException as e:
        logger.error(json_loads(e.body)["message"])
        ready_nodes = []
    return ready_nodes

Extract list of available nodes for scheduling
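Because these readiness rules decide where pods may land, it can help to pull them out into a pure function that can be unit-tested without a cluster. The sketch below restates the same checks; `is_schedulable`, `ready_node_names` and `make_node` are hypothetical names, and `SimpleNamespace` objects stand in for `V1Node`, exposing only the attributes the checks read.

```python
# Client-free restatement of the node checks above, so they can be unit-tested.
from types import SimpleNamespace
from typing import Iterable, List

NO_SCHEDULE = "NoSchedule"


def is_schedulable(node) -> bool:
    labels = node.metadata.labels or {}
    if labels.get("noCustomScheduler") == "yes":
        return False  # the node opted out of the custom scheduler
    if node.spec.unschedulable:
        return False  # cordoned, e.g. with `kubectl cordon`
    if any(t.effect == NO_SCHEDULE for t in (node.spec.taints or [])):
        return False
    # Ready only if the "Ready" condition reports status "True".
    return any(c.type == "Ready" and c.status == "True"
               for c in (node.status.conditions or []))


def ready_node_names(nodes: Iterable) -> List[str]:
    return [n.metadata.name for n in nodes if is_schedulable(n)]


def make_node(name, ready=True, labels=None, unschedulable=False, taints=()):
    # Hypothetical test helper mimicking the shape of kubernetes.client.V1Node.
    return SimpleNamespace(
        metadata=SimpleNamespace(name=name, labels=labels),
        spec=SimpleNamespace(unschedulable=unschedulable,
                             taints=[SimpleNamespace(effect=e) for e in taints]),
        status=SimpleNamespace(conditions=[
            SimpleNamespace(type="Ready", status="True" if ready else "False")]),
    )
```

Only NoSchedule taints are excluded, matching the original function; a PreferNoSchedule taint still leaves the node as a candidate.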

Select a Random Node

Once you have the list of nodes available for scheduling, let's just pick one of them at random and schedule the pod on it. In a production environment, however, picking a random node in a custom scheduler is a strict no. Make sure every decision to place a pod onto a node is backed by the right business logic and decision criteria, or it can have serious consequences.

import random


def _get_schedulable_node(v1_client):
    node_list = _get_ready_nodes(v1_client)
    if not node_list:
        return None
    # Deduplicate the node names, then pick one uniformly at random.
    available_nodes = list(set(node_list))
    return random.choice(available_nodes)

Select a Random Node for Scheduling
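Random choices are hard to test. One option, sketched below under the assumption that you are free to pass in a random number generator, is to accept an optional `random.Random` instance and sort the deduplicated names so that a seeded generator always gives the same answer. `pick_random_node` is a hypothetical name.

```python
import random
from typing import List, Optional


def pick_random_node(node_names: List[str],
                     rng: Optional[random.Random] = None) -> Optional[str]:
    """Return one node name chosen uniformly at random, or None if there are none."""
    if not node_names:
        return None
    rng = rng or random.Random()
    # Sorting after deduplication makes a seeded rng independent of input order.
    candidates = sorted(set(node_names))
    return rng.choice(candidates)
```

In production you would call it without `rng`; in tests, `pick_random_node(names, random.Random(42))` is deterministic.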

Assign Node to Pod

Once you have picked the node you want the pod to run on, you can use the V1Binding model and the create_namespaced_pod_binding method of the Python Kubernetes client to tell the API server that the pod has to be bound to that node.
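Under the hood, a binding is just a small Binding object posted to the pod's binding subresource (`/api/v1/namespaces/{namespace}/pods/{name}/binding`). The sketch below builds the same shape as a plain dict so you can see exactly what gets sent; `binding_body` is a hypothetical helper, not part of the client library.

```python
def binding_body(pod_name: str, namespace: str, node_name: str) -> dict:
    """Build a Binding object that binds pod_name in namespace to node_name."""
    return {
        "apiVersion": "v1",
        "kind": "Binding",
        "metadata": {"name": pod_name, "namespace": namespace},
        # The target is a reference to the Node the pod should be bound to.
        "target": {"apiVersion": "v1", "kind": "Node", "name": node_name},
    }
```

The V1Binding model used in the code below serializes to this same structure.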

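import logging

from kubernetes.client import V1Binding, V1ObjectMeta, V1ObjectReference

logger = logging.getLogger(__name__)


def schedule_pod(v1_client, name, node, namespace="default"):
    # The binding's target references the Node the pod should run on.
    target = V1ObjectReference(api_version="v1", kind="Node", name=node)
    meta = V1ObjectMeta(name=name)
    body = V1Binding(api_version="v1", kind="Binding", metadata=meta, target=target)
    logger.info("Binding Pod: %s to Node: %s", name, node)
    # _preload_content=False skips deserializing the response, which some client
    # versions fail on with "Invalid value for `target`, must not be `None`".
    return v1_client.create_namespaced_pod_binding(name, namespace, body, _preload_content=False)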

Bind Pod to a Node

And that is it. Now you can write a watch loop that filters the events coming from the API server, calls the schedule_pod method with the data extracted from those events, and then wait for your pod to be scheduled by your fancy new custom scheduler.

import logging
from json import loads as json_loads

from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException

logger = logging.getLogger(__name__)
SCHEDULER_NAME = "my-custom-scheduler"
# Assumed value: narrows the watch to pods carrying the test pod's schedulingStrategy label.
SCHEDULE_STRATEGY = "schedulingStrategy=meetup"


def watch_pod_events():
    v1_client = client.CoreV1Api()
    while True:
        logger.info("Checking for pod events....")
        watcher = watch.Watch()
        try:
            # The stream ends after timeout_seconds; the outer loop then restarts the watch.
            for event in watcher.stream(v1_client.list_pod_for_all_namespaces,
                                        label_selector=SCHEDULE_STRATEGY, timeout_seconds=20):
                pod = event["object"]
                logger.info("Event: %s %s, %s, %s, %s", event["type"], pod.kind,
                            pod.metadata.namespace, pod.metadata.name, pod.status.phase)
                # Only handle pending pods that ask for this scheduler and are not bound yet.
                if (pod.status.phase != "Pending" or pod.spec.scheduler_name != SCHEDULER_NAME
                        or pod.spec.node_name):
                    continue
                pod_namespace = pod.metadata.namespace
                pod_name = pod.metadata.name
                service_name = (pod.metadata.labels or {}).get("serviceName")
                logger.info("Processing for Pod: %s/%s", pod_namespace, pod_name)
                try:
                    node_name = _get_schedulable_node(v1_client)
                    if node_name:
                        logger.info("Namespace %s, PodName %s, Node Name: %s, Service Name: %s",
                                    pod_namespace, pod_name, node_name, service_name)
                        res = schedule_pod(v1_client, pod_name, node_name, pod_namespace)
                        logger.info("Response %s", res)
                    else:
                        logger.error("Found no valid node to schedule %s in %s", pod_name, pod_namespace)
                except ApiException as e:
                    logger.error(json_loads(e.body)["message"])
                except ValueError as e:
                    logger.error("Value Error %s", e)
            logger.info("Resetting k8s watcher...")
        except Exception:
            logger.exception("Ignoring Exception & listening for pod events")
        finally:
            watcher.stop()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # The scheduler runs inside the cluster, so authenticate with the pod's service account.
    config.load_incluster_config()
    watch_pod_events()

Wait and Watch for the Events on the API Server to trigger the scheduling logic
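A watch can deliver more than one event for the same pending pod: a MODIFIED event may follow the ADDED one, and each time the stream restarts after timeout_seconds, the API server replays existing pods as ADDED events. The hypothetical tracker below is one way to avoid sending a second binding request for a pod you already handled; keying on the pod UID means a pod that is deleted and recreated under the same name is scheduled again.

```python
# Hypothetical helper: remember which pods were already sent for binding.
from typing import Set, Tuple


class BindingTracker:
    def __init__(self) -> None:
        self._seen: Set[Tuple[str, str, str]] = set()

    def should_bind(self, namespace: str, name: str, uid: str) -> bool:
        # Returns True only the first time a given pod (namespace, name, UID) is seen.
        key = (namespace, name, uid)
        if key in self._seen:
            return False
        self._seen.add(key)
        return True

    def forget(self, namespace: str, name: str, uid: str) -> None:
        # Call on DELETED events or after a failed binding so the set does not grow forever.
        self._seen.discard((namespace, name, uid))
```

In the watch loop, the check would sit just before the call to schedule_pod, using `pod.metadata.uid`.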

Show Me the Demo, or It Did Not Work

Create Custom Scheduler Pod

Let's wrap our code in a Docker image for the scheduler and create a Deployment for it in the kube-system namespace.

FROM python:3.7
RUN pip install kubernetes
COPY scheduler.py /scheduler.py
CMD ["python", "/scheduler.py"]

Create Kubernetes RBAC Setup for Scheduler Pod

apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-custom-scheduler
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: my-custom-scheduler
subjects:
- kind: ServiceAccount
  name: my-custom-scheduler
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: system:kube-scheduler
  apiGroup: rbac.authorization.k8s.io

Create a Deployment for Scheduler Pod

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: scheduler
    tier: control-plane
  name: my-custom-scheduler
  namespace: kube-system
spec:
  selector:
    matchLabels:
      component: scheduler
      tier: control-plane
  replicas: 1
  template:
    metadata:
      labels:
        component: scheduler
        tier: control-plane
        version: second
    spec:
      serviceAccountName: my-custom-scheduler
      containers:
      - image: scheduler:v0.0.1
        name: my-custom-scheduler

Create a Test Pod

apiVersion: v1
kind: Pod
metadata:
  name: annotation-second-scheduler
  labels:
    name: multischeduler-example
    schedulingStrategy: meetup
    serviceName: annotation-second-scheduler
spec:
  schedulerName: my-custom-scheduler
  containers:
  - name: pod-with-second-annotation-container
    image: k8s.gcr.io/pause:2.0

You can find the GitHub repo for the example used in this article below.