Here’s the experience I generally have when my company starts migrating a process to a new service:
- Hear about the change at an all-hands. Understand nothing that is said because it all has to do with graphs and clouds and clusters and streams. Good luck with that, I hope it doesn’t impact my workflow.
- Hear a senior member/smarter member of my team talking about it. Thanks for taking that on, I hope that I never have to do that.
- I start a seemingly unrelated task and that person says “oh you can’t use that old process anymore, use the new thing I mentioned at stand up last week.” I guess I have to learn this now.
And that brings us to Airflow. This week, I came up with an idea for a fix that would involve running a Python script daily through a cron automation. Well, the crontab is deprecated so now we have to do it with Airflow.
What is Airflow?
[Apache] Airflow is a platform to programmatically author, schedule and monitor workflows. These workflows are packaged into Directed Acyclic Graphs, commonly referred to as DAGs. Not only will Airflow allow me to create a DAG that can execute my desired Python script, but it gives me options for increasing the number of workers for load management, customizing memory usage, timeouts, and retries, and provides a robust UI for DAG management and logging.
This is a big improvement over the previous platform we were using for scheduled jobs, so I understand why we made the change. All I have to do now is learn how to create a DAG of my own.
Scheduling my job with Airflow
I learned that I would have to make changes in two different codebases to get this working. In studying for this post, it’s become clear that this method isn’t universal, but instead conforms to the specific structure of my company’s codebase. For anyone who wants to learn more about a conventional DAG setup, I’d suggest this video, which helped me grasp the basics of Airflow.
The first change included the logic and dependencies of the job. One big change that Airflow brought us was being able to encapsulate each job independently rather than house them all in the same place. Each job not only has its own logic, but its own set of dependencies and ultimately its own Docker instance. So I would have to create five files:
- A
Dockerfile
. My understanding is that this is what we’d call the Docker image, or a file used to execute code in a Docker container. The image provides instructions for how to run a container and create an instance of our script that can be executed. In this case, it says things like:
RUN pip install --no-cache-dir -r requirements.txt
If you’re not familiar with Docker, the gist here is that we’re creating an independent environment where our code can be run. It installs its own dependencies, which saves us from things like versioning conflicts. If one of my scripts uses Python 3.10 and the other uses Python 2.7, I won’t run into any issues running both simultaneously if they each have their own Docker image.
2. A .dockerignore
file. In some cases, we may want to exclude certain parts of our code from the image, so we use .dockerignore
.
3. env.sh
. This file contains configuration parameters for the Docker container. In my case, all it had was an APP_NAME for the job.
4. requirements.txt
. This declares what libraries must be imported for the script to run and which versions it runs on.
5. Finally, we have the script itself! This is a Python file that defines the logic we want to execute.
The DAG
Now comes the exciting part. I had to add one more file, this time to our Airflow codebase. This file would be responsible for defining the DAG, which contains critical metadata about our job. To create a DAG, we have to execute a function that we import from airflow
. Our convention for populating those arguments is to instantiate a series of classes, which my company’s codebase defined, and which organize all of our data:
JobDescriptor
. This includes the name and tags we want to apply for logging purposes. It also uses two less intuitive properties:max_active_runs
andconcurrency
. The former determines how many times this job can be running via Airflow simultaneously. The latter determines how many tasks can be running within the job itself. In my example, I was working with a job that would run a script for 100 different companies. Rather than process one at a time, we designed the DAG to process 10 at a time with an array of tasks. Soconcurrency
was set to 10 whilemax_active_runs
was set to 2, because we don’t want too many copies of this heavy job running at the same time.CronSchedule
. The important property here iscron_schedule
, which is a string that uses cron notation to indicate how frequently the job should run.RetryConfig
. This allows us to account for cases where our job fails. We useretry_on_failure
to decide whether the script should be rerun at all,retry_times
to decide when we should stop retrying, andexecution_timeout
for setting a limit on how long the script should run before we kill it. Note thatexecution_timeout
uses the timedelta data type, so it has to be a range rather than just a number of seconds, hours, or days.
There were also two more classes that defined data about the how we would run the job, but were not specifically passed into the DAG function:
ExecutableImage
. Here we specify details about the script itself, including thecontainer
that houses the image, the environment (production vs development), and arguments required by the script like company ID and date range. We pass the APP_NAME we previously defined inenv.sh
as a string to specify the container.KubernetesExecutorConfig
. If you’re like me, it feels like you’ve heard “Kubernetes” about 10,000 times and never once known what it is. For reference, Kubernetes is a portable, extensible, open-source platform for managing containerized workloads and services, that facilitates both declarative configuration and automation. In this moment, all we need to understand is that Kubernetes is managing the container that houses our script image and we want to give it some parameters for execution. Specifically, we’re going to tell it how much CPU and how much memory to use. Definitions for these terms usually come up when shopping for a computer, but they’re still relevant here. We can tell our job to use more CPU, which could make the job run faster. We can also say how much memory is needed depending on how much data our script stores. I learned that producing logs uses a lot of memory, so we went through our script to see if we could reduce the number of logs it fired.
What’s next
Though I’m getting better at reading individual files, it’s consistently difficult for me to figure out how different systems connect, and Airflow is no exception. I looked around the codebase in vain, trying to find the line of code that would make this DAG appear in the Airflow UI. I did eventually realize that the main Python code I was viewing was more of a configuration file which used imported functions to execute traditional Airflow logic. Most notably, that’s how I found the DAG function, which is present in all the tutorials but not in my company’s code. For example, in the Airflow documentation, we see:
with DAG(
# populating DAG arguments
) as dag:
# instantiate operators and execute tasks
The code I read never used with
, instead just executing the DAG function and returning the results. I think this has something to do with the cron setup, such that we’re scheduling the job instead of executing it in the moment, but I’ll have to ask for help to find out for sure. Sometimes we have to study the code just to figure out the right questions to ask.
Sources
- Apache Airflow Documentation
- Learning Apache Airflow with Python in easy way in 40 Minutes, soumilshah1995
- Docker Image, Alexander S. Gillis, Tech Target
- How to control the parallelism or concurrency of an Airflow installation?, Stack Overflow
- crontab guru,
- datetime — Basic date and time types, Python docs
- What is Kubernetes?, Kubernetes docs
- CPU vs. RAM: Which is More Important for You?, CDW