What I Learned at Work this Week: Airflow

Mike Diaz
6 min read · Mar 20, 2022


Photo by Min An from Pexels

Here’s the experience I generally have when my company starts migrating a process to a new service:

  1. Hear about the change at an all-hands. Understand nothing that is said because it all has to do with graphs and clouds and clusters and streams. Good luck with that, I hope it doesn’t impact my workflow.
  2. Hear a senior (or just smarter) member of my team talking about it. Thanks for taking that on, I hope that I never have to do that.
  3. Start a seemingly unrelated task, only to have that person say, “Oh, you can’t use that old process anymore. Use the new thing I mentioned at stand-up last week.” I guess I have to learn this now.

And that brings us to Airflow. This week, I came up with an idea for a fix that would involve running a Python script daily through a cron automation. Well, crontabs are deprecated at my company, so now we have to do it with Airflow.

What is Airflow?

[Apache] Airflow is a platform to programmatically author, schedule and monitor workflows. These workflows are packaged into Directed Acyclic Graphs, commonly referred to as DAGs. Not only will Airflow allow me to create a DAG that can execute my desired Python script, it also gives me options for scaling the number of workers for load management and customizing memory usage, timeouts, and retries, and it provides a robust UI for DAG management and logging.

This is a big improvement over the previous platform we were using for scheduled jobs, so I understand why we made the change. All I have to do now is learn how to create a DAG of my own.
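Before getting into the specifics of my company’s setup, here is a minimal sketch of what a DAG that runs a Python function once a day can look like. It assumes Airflow 2.x, and the dag_id, task_id, and function are all made up for illustration:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_my_fix():
    # placeholder for whatever the daily script actually does
    print("running the daily job")


with DAG(
    dag_id="daily_fix",
    start_date=datetime(2022, 3, 1),
    schedule_interval="@daily",  # plays the role of the old crontab entry
    catchup=False,
) as dag:
    PythonOperator(task_id="run_script", python_callable=run_my_fix)

Dropped into Airflow’s dags folder, a file like that is enough for the scheduler to pick the job up and run it daily, which is the cron-style behavior described above.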

Scheduling my job with Airflow

I learned that I would have to make changes in two different codebases to get this working. In studying for this post, it’s become clear that this method isn’t universal, but instead conforms to the specific structure of my company’s codebase. For anyone who wants to learn more about a conventional DAG setup, I’d suggest this video, which helped me grasp the basics of Airflow.

The first change included the logic and dependencies of the job. One big change that Airflow brought us was being able to encapsulate each job independently rather than house them all in the same place. Each job not only has its own logic, but its own set of dependencies and ultimately its own Docker instance. So I would have to create five files:

  1. A Dockerfile. My understanding is that this is the file Docker uses to build an image: a set of instructions for assembling the environment our code runs in, which can then be spun up as a container that executes our script. In this case, it says things like:
RUN pip install --no-cache-dir -r requirements.txt

If you’re not familiar with Docker, the gist here is that we’re creating an isolated environment where our code can run. Each image installs its own dependencies, which saves us from things like version conflicts: if one of my scripts uses Python 3.10 and another uses Python 2.7, I won’t run into any issues running both simultaneously as long as each has its own Docker image.

2. A .dockerignore file. In some cases, we may want to exclude certain parts of our code from the image, so we use .dockerignore.

3. env.sh. This file contains configuration parameters for the Docker container. In my case, all it had was an APP_NAME for the job.

4. requirements.txt. This declares which libraries must be installed for the script to run, pinned to the versions it runs on.

5. Finally, we have the script itself! This is a Python file that defines the logic we want to execute.
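To make that last item concrete, here is a hedged sketch of what the script file might look like. The arguments (a company ID and a date range) are borrowed from the DAG discussion further down; the function name and flags are all hypothetical:

import argparse


def run_job(company_id: str, start_date: str, end_date: str) -> None:
    # hypothetical placeholder for the job's actual business logic
    print(f"processing {company_id} from {start_date} to {end_date}")


if __name__ == "__main__":
    # the container's entrypoint would pass these in as CLI arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--company-id", required=True)
    parser.add_argument("--start-date", required=True)
    parser.add_argument("--end-date", required=True)
    args = parser.parse_args()
    run_job(args.company_id, args.start_date, args.end_date)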

The DAG

Now comes the exciting part. I had to add one more file, this time to our Airflow codebase. This file would be responsible for defining the DAG, which contains critical metadata about our job. To create a DAG, we have to execute a function that we import from airflow. Our convention for populating that function’s arguments is to instantiate a series of classes, defined in my company’s codebase, that organize all of our configuration data:

  1. JobDescriptor. This includes the name and tags we want to apply for logging purposes. It also takes two less intuitive properties: max_active_runs and concurrency. The former determines how many runs of this job can be active in Airflow at the same time. The latter determines how many tasks can be running within a single run of the job. In my example, I was working with a job that would run a script for 100 different companies. Rather than process one at a time, we designed the DAG to process 10 at a time with an array of tasks. So concurrency was set to 10, while max_active_runs was set to 2 because we don’t want too many copies of this heavy job running at the same time.
  2. CronSchedule. The important property here is cron_schedule, which is a string that uses cron notation to indicate how frequently the job should run.
  3. RetryConfig. This allows us to account for cases where our job fails. We use retry_on_failure to decide whether the script should be rerun at all, retry_times to decide when we should stop retrying, and execution_timeout to set a limit on how long the script can run before we kill it. Note that execution_timeout uses the timedelta data type, so it has to be a duration like timedelta(hours=2) rather than a bare number of seconds, hours, or days.
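To picture how these classes fit together, here is a purely hypothetical sketch. The class and property names come straight from the descriptions above, but the shapes of the classes are my guesses (written as simple dataclass stand-ins), not my company’s real definitions:

from dataclasses import dataclass
from datetime import timedelta


# Hypothetical stand-ins for the internal classes described above.
@dataclass
class JobDescriptor:
    name: str
    tags: list
    max_active_runs: int  # simultaneous runs of the whole job
    concurrency: int      # simultaneous tasks inside one run


@dataclass
class CronSchedule:
    cron_schedule: str  # standard cron notation


@dataclass
class RetryConfig:
    retry_on_failure: bool
    retry_times: int
    execution_timeout: timedelta  # a duration, not a bare number


job = JobDescriptor(name="company_backfill", tags=["data-fixes"],
                    max_active_runs=2, concurrency=10)
schedule = CronSchedule(cron_schedule="0 6 * * *")  # daily at 06:00
retries = RetryConfig(retry_on_failure=True, retry_times=3,
                      execution_timeout=timedelta(hours=2))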

There were also two more classes that defined data about how we would run the job, but were not passed directly into the DAG function (both appear in the sketch after this list):

  1. ExecutableImage. Here we specify details about the script itself, including the container that houses the image, the environment (production vs development), and arguments required by the script like company ID and date range. We pass the APP_NAME we previously defined in env.sh as a string to specify the container.
  2. KubernetesExecutorConfig. If you’re like me, it feels like you’ve heard “Kubernetes” about 10,000 times and never once known what it is. For reference, Kubernetes is a portable, extensible, open-source platform for managing containerized workloads and services, that facilitates both declarative configuration and automation. In this moment, all we need to understand is that Kubernetes is managing the container that houses our script image and we want to give it some parameters for execution. Specifically, we’re going to tell it how much CPU and how much memory to use. Definitions for these terms usually come up when shopping for a computer, but they’re still relevant here. We can tell our job to use more CPU, which could make the job run faster. We can also say how much memory is needed depending on how much data our script stores. I learned that producing logs uses a lot of memory, so we went through our script to see if we could reduce the number of logs it fired.
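Continuing the hypothetical sketch from above, the remaining two classes might carry data along these lines. Again, the field names and values are guesses based on the descriptions here, not the real internal definitions; the “500m” and “512Mi” strings are standard Kubernetes notation for half a CPU core and 512 mebibytes of memory:

from dataclasses import dataclass


# Hypothetical stand-ins for the two classes that describe how the job runs.
@dataclass
class ExecutableImage:
    app_name: str     # the APP_NAME defined in env.sh, identifying the container
    environment: str  # "production" vs "development"
    arguments: list   # arguments required by the script, e.g. company ID and dates


@dataclass
class KubernetesExecutorConfig:
    cpu: str     # e.g. "500m"; more CPU could make the job run faster
    memory: str  # e.g. "512Mi"; heavy logging pushes this number up


image = ExecutableImage(app_name="my_daily_fix", environment="production",
                        arguments=["--company-id", "12345"])
k8s = KubernetesExecutorConfig(cpu="500m", memory="512Mi")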

What’s next

Though I’m getting better at reading individual files, it’s consistently difficult for me to figure out how different systems connect, and Airflow is no exception. I looked around the codebase in vain, trying to find the line of code that would make this DAG appear in the Airflow UI. I did eventually realize that the main Python file I was reading was more of a configuration file, which relied on imported helper functions to execute the traditional Airflow logic. Most notably, that’s how I finally tracked down the DAG function, which appears in all the tutorials but never directly in my company’s code. For example, in the Airflow documentation, we see:

with DAG(
    # populating DAG arguments
) as dag:
    # instantiate operators and execute tasks

The code I read never used with; instead it just called the DAG function and returned the result. I think this has something to do with the cron setup, such that we’re scheduling the job instead of executing it in the moment, but I’ll have to ask for help to find out for sure. Sometimes we have to study the code just to figure out the right questions to ask.
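For comparison, here is a minimal sketch (again assuming Airflow 2.x, with made-up names) of the factory-function pattern, which builds a DAG without a with block: the DAG is instantiated directly, the task is attached by passing dag= explicitly, and the function returns the DAG object so that whatever imports it can put it somewhere the scheduler will find it:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def build_dag(dag_id: str, cron_schedule: str) -> DAG:
    # instantiate the DAG directly instead of opening a with block
    dag = DAG(
        dag_id=dag_id,
        start_date=datetime(2022, 3, 1),
        schedule_interval=cron_schedule,
        catchup=False,
    )
    # pass dag= explicitly; the with form does this attachment implicitly
    PythonOperator(task_id="run_script", python_callable=lambda: None, dag=dag)
    return dag


# Airflow discovers DAG objects at module level, so the factory's return value
# still has to end up in the top-level namespace of a file in the dags folder.
daily_fix = build_dag("daily_fix_factory", "0 6 * * *")

Either way, the result is the same kind of DAG object; the difference is just how tasks get attached to it.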
