Running Airflow Using Kubernetes Executor and Kubernetes Pod Operator with Istio | by Joshua Yeung | Apr, 2022

Airflow Series

Overcoming obstacles


The Kubernetes executor runs each task instance in its own pod on a Kubernetes cluster. That means that for each task in a DAG, a separate pod is created in the cluster. The pod uses an Airflow base image and runs your task (PythonOperator, BashOperator, etc.). In essence, it runs the Airflow command “airflow tasks run …” inside the pod.

The Kubernetes Pod Operator also creates a new pod on a Kubernetes cluster, but the image is defined by you through the image argument of the operator. In essence, it calls the Kubernetes API to launch a pod that runs your image, and then records the result.

In the simplest terms, Istio is a service networking layer. It controls how traffic is routed between services. Because a modern microservice architecture often consists of hundreds or thousands of services, it needs a dedicated layer to handle all the network traffic. This layer is called a service mesh.

Istio injects a sidecar container called istio-proxy into your pod and handles the traffic for you. But this breaks the normal workflow of the Kubernetes executor and the Kubernetes Pod Operator, because both of them use the Pod Phase to determine the status of the DAG task. Only when the Pod Phase is Succeeded or Failed does Airflow consider the task finished. Because the istio-proxy sidecar keeps running after the task container has completed, the pod never reaches either phase. Instead, the pod status shows NotReady for a successful task and Error for a failed one, and in the Airflow UI the task appears to run forever.

False Hopes

An initial search on Google turns up an elegant solution that uses the Kubernetes container lifecycle type Sidecar. As in the following configuration, you just set the lifecycle type of the sidecar container to Sidecar, and Kubernetes handles the rest.

apiVersion: v1
kind: Pod
metadata:
  name: bookings-v1-b54bc7c9c-v42f6
  labels:
    app: demoapp
spec:
  containers:
  - name: bookings
    image: banzaicloud/allspark:0.1.1
  - name: istio-proxy
    lifecycle:
      type: Sidecar

However, the solution described above was never actually implemented in Kubernetes 1.18 as expected.

Another hack is to wrap your container command in the following:

- command:
  - /bin/sh
  - -c
  - |
    until curl -fsI http://localhost:15021/healthz/ready; do echo "Waiting for Sidecar..."; sleep 3; done;
    echo "Sidecar available. Running the command...";
    # Replace this line with your usual command
    x=$?; curl -fsI -X POST http://localhost:15020/quitquitquit && exit $x

First, it polls the Istio readiness endpoint until the sidecar has started. Once the sidecar is up, it runs your usual command. After your command completes, it calls the Istio quitquitquit endpoint to shut down istio-proxy and then exits with your command's exit code. This feature is available in Istio 1.7 and later.

At first glance, this may look like the solution we are longing for. But it fails because we cannot override the command of the generated pod. As mentioned before, the Kubernetes executor runs the Airflow command “airflow tasks run …” using the Airflow base image inside the pod. We would need to hack the source code of the Kubernetes executor to add the wrapper, and we definitely don't want to do that.

This script is still useful for Airflow-related Jobs or CronJobs, for example the database migration job. If you are using the Helm chart to deploy Airflow, below is what you should enter in the values YAML.

- "bash"
- "-c"
- |-
  until curl -fsI http://localhost:15021/healthz/ready; do echo "Waiting for Sidecar..."; sleep 3; done;
  echo "Sidecar available. Running the command...";
  airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "db upgrade" "upgradedb" }};
  x=$?; curl -fsI -X POST http://localhost:15020/quitquitquit && exit $x


The Astronomer image has dealt with this issue: they wrote a whole new Istio class to handle the problem. You may view their code here:

They use the Istio API endpoint quitquitquit to shut down istio-proxy when the job is done.

There is a major drawback to Astronomer's solution: it only deals with the Istio proxy. There are many other sidecars, for example the Vault agent injector. Vault is a secret store, and the Vault agent injector uses pod annotations to control which secrets are injected into the pod through a sidecar. In that case, a pod using Astronomer's image still faces the same problem.

A CronJob to Kill Sidecar

Thanks to Yogesh Sharma, who provided a useful script for shutting down all the sidecars. We can deploy a CronJob that uses the Kubernetes API to check the pod status and kill the sidecars once the task is finished. We need to add a small check on the container status: if the container has finished, the entry returned in container_statuses has a field called terminated. Also, by default, Airflow names the task container base.

We check the status of the base container. If it is terminated, we execute the exit command in the other containers to kill them.
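The check described above could be sketched roughly as follows. This is a minimal illustration, not Yogesh Sharma's actual script: the helper names, the `kill -s TERM 1` exit command, and the use of in-cluster configuration are all assumptions, and the sidecar image is assumed to ship a shell. The decision logic is kept in plain functions so it can be tried without a cluster.

```python
from types import SimpleNamespace

BASE_CONTAINER = "base"  # Airflow's default name for the task container


def base_terminated(pod, base_name=BASE_CONTAINER):
    """Return True if the task container of this pod has terminated."""
    statuses = pod.status.container_statuses or []
    return any(
        s.name == base_name and s.state is not None and s.state.terminated is not None
        for s in statuses
    )


def running_sidecars(pod, base_name=BASE_CONTAINER):
    """Return the names of the non-task containers that are still running."""
    statuses = pod.status.container_statuses or []
    return [
        s.name
        for s in statuses
        if s.name != base_name and s.state is not None and s.state.running is not None
    ]


def kill_finished_pod_sidecars(namespace):
    """Exec an exit command in every sidecar of pods whose task is done."""
    # Imported lazily so the pure helpers above work without the client installed.
    from kubernetes import client, config
    from kubernetes.stream import stream

    config.load_incluster_config()  # assumption: the CronJob runs inside the cluster
    v1 = client.CoreV1Api()
    for pod in v1.list_namespaced_pod(namespace).items:
        if pod.status.phase != "Running" or not base_terminated(pod):
            continue
        for name in running_sidecars(pod):
            stream(
                v1.connect_get_namespaced_pod_exec,
                pod.metadata.name,
                namespace,
                container=name,
                command=["/bin/sh", "-c", "kill -s TERM 1"],  # assumed exit command
                stderr=True, stdin=False, stdout=True, tty=False,
            )


# A stand-in pod for illustration: the task finished, istio-proxy is still running.
demo_pod = SimpleNamespace(
    status=SimpleNamespace(
        phase="Running",
        container_statuses=[
            SimpleNamespace(
                name="base",
                state=SimpleNamespace(running=None, terminated=SimpleNamespace(exit_code=0)),
            ),
            SimpleNamespace(
                name="istio-proxy",
                state=SimpleNamespace(running=SimpleNamespace(), terminated=None),
            ),
        ],
    )
)
```

The service account used by the CronJob would need permission to list pods and to create pods/exec in the target namespace.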

This handles the sidecar problem and avoids modifying the Airflow Kubernetes executor codebase. However, the Kubernetes Pod Operator can deploy pods to an arbitrary namespace, and it is not good practice to have a CronJob killing sidecars in every namespace. It may accidentally kill containers in unrelated pods (perhaps someone used the same container name ‘base’ in their pod).

Custom Kubernetes Pod Operator

The best way is to implement a custom Kubernetes Pod Operator that shuts down the pod when the base container (the container that runs your task) is completed. It involves creating a custom pod launcher.

Let me show you how to construct it step by step. If you follow the source code of the Kubernetes Pod Operator, you will find that the key part is in the execute method. It first creates a pod (get_or_create_pod) and waits for the pod to start (await_pod_start). It then waits for the pod to complete using the Pod Manager's await_pod_completion. This is the method we override in our custom pod manager. What we need is a Pod Manager that considers the pod completed when the base container is completed. In our Pod Manager, we add a check to the completion test: if the pod phase is still Running (because the istio-proxy sidecar is still running) but the base container has completed, then we consider the pod completed as well.

The container_is_completed method mirrors the native container_is_running method. If the container has completed, container_status.state.terminated in the response payload of the Kubernetes API holds the completion info. If the container succeeded, the exit_code field is 0; otherwise it holds a non-zero exit code.
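As a rough sketch of the idea, the completion check and the polling loop might look like the code below. It is written as standalone functions so it runs without Airflow installed. In a real subclass of the provider's PodManager, the loop would live in an overridden await_pod_completion and read_pod would be self.read_pod. The names container_is_succeeded, fake_pod, and the read_pod parameter are assumptions made for this illustration.

```python
import time
from types import SimpleNamespace

TERMINAL_PHASES = {"Succeeded", "Failed"}


def get_container_status(pod, container_name):
    """Return the status entry for the named container, or None if absent."""
    statuses = (pod.status.container_statuses or []) if pod and pod.status else []
    return next((s for s in statuses if s.name == container_name), None)


def container_is_completed(pod, container_name="base"):
    """True once the container reports a terminated state."""
    status = get_container_status(pod, container_name)
    return (
        status is not None
        and status.state is not None
        and status.state.terminated is not None
    )


def container_is_succeeded(pod, container_name="base"):
    """True if the container terminated with exit code 0."""
    if not container_is_completed(pod, container_name):
        return False
    return get_container_status(pod, container_name).state.terminated.exit_code == 0


def await_pod_completion(read_pod, pod, container_name="base", poll_interval=2.0):
    """Poll until the pod reaches a terminal phase or its task container exits."""
    while True:
        remote_pod = read_pod(pod)
        phase = remote_pod.status.phase
        if phase in TERMINAL_PHASES:
            return remote_pod
        # The sidecar keeps the pod Running, so also stop when the task container is done.
        if phase == "Running" and container_is_completed(remote_pod, container_name):
            return remote_pod
        time.sleep(poll_interval)


def fake_pod(phase, exit_code=None):
    """Build a stand-in pod object for trying the helpers without a cluster."""
    terminated = None if exit_code is None else SimpleNamespace(exit_code=exit_code)
    base = SimpleNamespace(name="base", state=SimpleNamespace(terminated=terminated))
    return SimpleNamespace(status=SimpleNamespace(phase=phase, container_statuses=[base]))
```

For example, feeding the loop a pod that is first Running with an unfinished base container and then Running with a base container that exited with code 0 makes it return on the second read, even though the pod phase never becomes Succeeded.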

The rest is easy: we just use our pod manager in the Kubernetes Pod Operator. Our custom operator initializes the custom pod manager. We then override the original cleanup method and change only the line that checks whether the task has failed: we consider the pod failed only if the base container did not succeed.

In this way, we can finally run Airflow using Kubernetes executor and Kubernetes Pod Operator with Istio enabled!


You may encounter pod creation failures when your task name is long enough. Airflow adds a random suffix to your task name to form the pod name. Although Airflow handles long task names and constructs a safe pod name, Istio adds an additional label that uses your pod name. The value of this label can exceed 63 characters, the limit Kubernetes imposes on labels, and thus the pod cannot be created.
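To see why such a pod is rejected, here is a small sketch of the Kubernetes rule for label values: at most 63 characters, empty or starting and ending with an alphanumeric character, with dashes, underscores, and dots allowed in between. The function name and the sample pod name are made up for illustration.

```python
import re

MAX_LABEL_VALUE_LENGTH = 63
# Empty, or alphanumeric at both ends with '-', '_', '.' allowed in between.
_LABEL_VALUE_RE = re.compile(r"(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?")


def is_valid_label_value(value):
    """Check a string against the Kubernetes label value syntax and length limit."""
    return (
        len(value) <= MAX_LABEL_VALUE_LENGTH
        and _LABEL_VALUE_RE.fullmatch(value) is not None
    )


# Hypothetical pod name: a long task name plus a random suffix.
long_pod_name = "my-very-long-dag-name-my-very-long-task-name-with-many-words-0a1b2c3d4e5f"

print(is_valid_label_value(long_pod_name))  # too long to be used as a label value
print(is_valid_label_value("airflow_pod"))  # a short fixed value is fine
```

A pod name may be much longer than 63 characters, which is why a label derived from it can be invalid even though the name itself is accepted.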

Thanks to Kevin Pullin for providing a workaround. We can assign a specific label to the workers, or in the pod template, to solve this problem. If you are using the official Helm chart to deploy Airflow, we can set this label to a fixed value such as some_name in the labels field. This fixes the issue for the worker pods generated by the Kubernetes executor.

For the Kubernetes Pod Operator, we again need to modify our custom Kubernetes Pod Operator. The pod needs the same label, so we add the label to the kwargs in our custom operator's __init__ method and pass them on to the original constructor.

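def __init__(self, *args, **kwargs):
    # Merge the extra label into any user-supplied labels (the dict "|" operator needs Python 3.9+)
    kwargs["labels"] = (kwargs.get("labels") or {}) | {"": "airflow_pod"}
    super().__init__(*args, **kwargs)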

There are many obstacles when deploying Airflow with the Kubernetes executor and the Kubernetes Pod Operator to an Istio-enabled Kubernetes cluster. The istio-proxy sidecar breaks the normal workflow of both: it makes the worker pod hang and run forever.

Airflow never learns that the task is finished. By implementing a custom operator that inherits from the Kubernetes Pod Operator, and a CronJob that terminates the leftover sidecars, we can successfully deploy Airflow to an Istio-enabled Kubernetes cluster. I hope that our experience helps make your deployment smoother.
