Machine Learning Models on Cloud Composer with Kubernetes Pod Operator

Stav Hacohen
5 min read · Feb 8, 2021

Airflow is one of the most popular ETL tools out there. Cloud Composer, the managed Airflow service on GCP (Google Cloud Platform), does a lot of the heavy lifting for us, so we can focus on modeling and building awesome data tools. As I will elaborate in this post, there are still a few things to keep in mind to support a stable and scalable environment, such as managing per-task dependencies and workloads. This is where the Kubernetes Pod Operator comes in handy.

Although I was able to find some information about implementing such an architecture, I couldn't find a sufficient end-to-end resource for such a design. In this post, I will assemble all the needed pieces, including the Composer settings, the Docker build and deployment, and the operator itself. The post can help you build any kind of environment; furthermore, the code examples can be used specifically for building a Facebook fbprophet modeling environment (check this Dockerfile), which, judging by the web, presents a struggle for many.

Why do we need it?

Managing dependencies

Suppose you have a new DAG to deploy with some dependencies (in my case the model used the Facebook Prophet package, which has quite a few dependencies of its own). What you might naturally try is installing the package on the Composer instance from the PYPI PACKAGES tab. However, this can go very wrong, since the Airflow scheduler, webserver and worker all share the same environment. In my case, for example, Facebook Prophet needed a Pandas version above 1.0, while the Airflow worker itself needed Pandas 0.25. The conflict can get even more complex if a different DAG requires yet another Pandas version. And imagine what happens when you want to migrate to a newer Airflow version with different dependencies. The result will be an Airflow crash.

Managing workloads

A different scenario is when the load on your Airflow instance grows, whether because more data is coming in or because new tasks are running on the instance. That makes the load on the worker heavier. Upgrading the resources of the entire machine is wasteful, since it must be up 24/7. Another problem is that your tasks will compete for resources among themselves and with the scheduler, which might crash not only a task but the entire Airflow instance.

The common challenges:

  1. Your Airflow instance (scheduler, webserver, worker) has a specific dependency that conflicts with one of your DAGs.
  2. Your DAG has a specific dependency that conflicts with one of your other DAGs.
  3. Your tasks and the Airflow instance are competing for the same resources.

Possible solutions:

  1. Spin up an additional Airflow instance when dependency conflicts arise or the load gets higher. This can help, but it would be costly and hard to maintain.
  2. Work with virtual environments (PythonVirtualenvOperator, for example). This can help you manage dependencies, but you would have to rebuild the environment on each DAG run, which might significantly affect the running time; the resources of the Airflow instance would also still be shared (see the sketch after this list).
  3. Decouple your infrastructure and execute your tasks with Docker running on Kubernetes Engine, with specific dependencies, resources and auto-scaling. This one definitely addresses all of our challenges.
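
To make the trade-off of option 2 concrete, here is a minimal sketch of a PythonVirtualenvOperator task, assuming Airflow 1.10 as shipped with Composer at the time of writing. The DAG and package names are illustrative:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator


def forecast():
    # Runs inside the virtualenv, so imports must live in the callable.
    import pandas as pd
    print(pd.__version__)  # >= 1.0 here, regardless of the worker's Pandas


with DAG("virtualenv_example",
         start_date=datetime(2021, 2, 1),
         schedule_interval="@daily") as dag:
    forecast_task = PythonVirtualenvOperator(
        task_id="forecast_in_venv",
        python_callable=forecast,
        requirements=["pandas>=1.0"],  # installed from scratch on every run
        system_site_packages=False,
    )
```

This isolates the Python dependencies, but the virtualenv is rebuilt on every run and the task still executes on the shared Airflow worker.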

Solution: Decoupled infrastructure

The Kubernetes Pod Operator will allow us to run our code in a designated environment and change our architecture from one coupled to the Airflow instance into a decoupled architecture.

The task triggers a Docker image on a different machine to run a model with specific dependencies

The components of the solution:

  1. New node pool on Composer (“pool-1” in the diagram).
  2. KubernetesPodOperator to trigger the container (“DAGS” in the diagram).
  3. Docker image with specific dependencies stored in Container Registry (“Conda fbprophet” in the diagram).
  4. Model stored in Cloud Storage (“Modules” in the diagram).

Add a new node pool -

By default, your scheduler, webserver, and worker all run on the “default-pool”. Create a new node pool where your Docker container can handle the core logic of your task. You can choose the machine type according to your needs and also configure auto-scaling, for example with something like gcloud container node-pools create pool-1 --cluster=<your-composer-cluster> --machine-type=n1-standard-2 --enable-autoscaling --min-nodes=0 --max-nodes=3. For more info check https://cloud.google.com/kubernetes-engine/docs/how-to/node-pools.

Node pools on Cloud Composer

Build your image and publish it to Container Registry -

To build the Docker image we use three files: a Dockerfile to define the image build, environment.yml for the conda environment settings, and finally app.py, which is executed on every container run. You can find them all in this repository. Let's focus on the Dockerfile:

Dockerfile

What happens here?

The image is based on miniconda3; using conda-forge solved many of the internal dependency issues of fbprophet. app.py is copied into the image and finally called with a default parameter (“test_model”), which will be overridden in the DAG to run any desired code. app.py basically loads the model from the “Modules” bucket where the model files are located. So when a model is updated or a new model is added, the same configuration can be reused; only the model files in the Modules bucket need to change. This is great since (a) there is no need to rebuild and redeploy the image for every update or new model, and (b) data scientists only need to take care of the models in the bucket and not the deployment of Docker images.
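
For reference, here is a minimal sketch of what an app.py along these lines might look like. The bucket name, file layout and Model interface are illustrative; the real file is in the linked repository:

```python
import importlib.util
import sys

from google.cloud import storage

MODELS_BUCKET = "modules"  # illustrative name for the "Modules" bucket


def load_model(model_name):
    """Download <model_name>.py from the bucket and import it as a module."""
    client = storage.Client()
    blob = client.bucket(MODELS_BUCKET).blob(model_name + ".py")
    local_path = "/tmp/" + model_name + ".py"
    blob.download_to_filename(local_path)

    spec = importlib.util.spec_from_file_location(model_name, local_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


if __name__ == "__main__":
    # The model name arrives as a container argument ("test_model" by
    # default, overridden by the DAG).
    model_name = sys.argv[1] if len(sys.argv) > 1 else "test_model"
    load_model(model_name).Model().run()
```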

Drop your Python model file in Cloud Storage -

Create a bucket and store the model files you would like the Docker container to use as modules. In the example code, the interface defines a model as a Python file that implements a Model class with a “run” method, in which your model is executed.
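
As an illustration, a model file under that interface could look like the following sketch. Where the training data comes from and what is done with the forecast are placeholders for your own logic:

```python
import pandas as pd
from fbprophet import Prophet


class Model:
    """A model file stored in the Modules bucket, e.g. sales_forecast.py."""

    def run(self):
        # Placeholder: load a history of (ds, y) rows from your own source.
        df = pd.read_csv("/tmp/sales_history.csv")

        m = Prophet()
        m.fit(df)  # fbprophet expects columns "ds" (date) and "y" (value)

        future = m.make_future_dataframe(periods=30)  # forecast 30 days ahead
        forecast = m.predict(future)
        print(forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].tail())
```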

Set your DAG to trigger a pod on the pool-1 node pool, which runs the image that executes the model -

What happens here?

A pod is created on “pool-1”, and the ENTRYPOINT of the original Dockerfile is overridden by the operator's “cmds”/“arguments” parameters. This lets you define the source of the model you would like to run from this DAG.
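
A minimal sketch of such a DAG, assuming Airflow 1.10 on Composer; the project, image tag and model name are illustrative:

```python
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

with DAG("fbprophet_model",
         start_date=datetime(2021, 2, 1),
         schedule_interval="@daily") as dag:
    run_model = KubernetesPodOperator(
        task_id="run_model",
        name="run-model",
        namespace="default",
        # The image built above, pushed to Container Registry (illustrative tag).
        image="gcr.io/your-project/conda-fbprophet:latest",
        # Override the Dockerfile's ENTRYPOINT/CMD so app.py loads this DAG's
        # model file instead of the default "test_model".
        cmds=["python", "app.py"],
        arguments=["sales_forecast"],
        # Pin the pod to the dedicated node pool.
        affinity={
            "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [{
                        "matchExpressions": [{
                            "key": "cloud.google.com/gke-nodepool",
                            "operator": "In",
                            "values": ["pool-1"],
                        }]
                    }]
                }
            }
        },
    )
```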

Recap

Following this design, your DAGs will still run on the default-pool. However, all they do now is trigger your images, via the Kubernetes Pod Operator, on an extra node pool: an independent machine built for your dependencies. This is how you achieve a stable and scalable infrastructure for your Airflow instance.
