What I Learned at Work this Week: Airflow Operators
This week, a teammate asked me for design help as he considered adding a new column to an existing database table. As someone who works with a 26-column table every day, I wanted to help him find a way to keep his from growing. I asked for more context, and he sent me a large PR that explained the change he was making.
The PR had a lot of stuff I couldn’t understand, so we hopped on a Zoom call, talked through it, and figured it out. Now that I have some more time, I’ve revisited the PR to see if I can get a better idea of what it does!
S3KeySensor
I learned that the idea behind the code was to periodically check an AWS S3 location for objects with certain names and formats. If a user has uploaded a file, read and process it; if not, check again later. Amazing! I didn’t know Airflow could do this until I saw S3KeySensor in this PR.
This class can be imported directly from the Airflow library. According to the Airflow docs, it waits for a key (a file-like instance on S3) to be present in an S3 bucket. It accepts five parameters (I’ll pull them together in a sketch after the list):
- bucket_key: The key we’re looking for.
- bucket_name: An S3 path will often look something like this: s3://my-bucket/my-key/file-name.csv. Though the bucket_key argument can accept a full path, S3KeySensor asks us to specify the bucket name separately here.
- wildcard_match: The key argument could be the literal key name for the object, but it could also be a Unix wildcard pattern. If we set this argument to True, the sensor will check for any keys that match the pattern.
- aws_conn_id: My understanding is that this names the Airflow connection that stores our AWS credentials; it’s configured on the Airflow side rather than inside AWS itself.
- verify: By default, SSL certificates are verified, but if we pass False here, they will not be. We can also pass a string pointing to a specific CA certificate bundle to use for verification.
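To make these concrete, here’s a minimal sketch of the sensor with all five parameters filled in. The bucket, key pattern, and connection name are invented for illustration, and the import path matches the older airflow.sensors.s3_key_sensor docs I was reading (newer Airflow versions ship the sensor in the Amazon provider package instead):

from airflow.sensors.s3_key_sensor import S3KeySensor

wait_for_upload = S3KeySensor(
    task_id="wait_for_upload",   # task_id actually comes from the base class (more on that below)
    bucket_key="my-key/*.csv",   # a Unix wildcard pattern rather than a literal key
    bucket_name="my-bucket",     # the bucket half of s3://my-bucket/my-key/file-name.csv
    wildcard_match=True,         # treat bucket_key as a pattern, not an exact name
    aws_conn_id="aws_default",   # the Airflow connection that stores our AWS credentials
    verify=True,                 # verify SSL certificates (the default behavior)
)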
But this PR involved more arguments than just these five. After doing some more research, I learned that S3KeySensor inherits arguments from Airflow’s base sensor and operator classes.
BaseOperator
According to the docs, operators allow for the generation of certain types of tasks that become nodes in the DAG when instantiated. Operators can perform an action, transfer data from one system to another, or act as sensors, running until a certain criterion is met. The arguments we’ve already seen are specific to our sensor, but the others I saw in the PR come from the classes it inherits from, BaseSensorOperator and, ultimately, BaseOperator (I’ve added them to the earlier sketch after the list):
- task_id: A string that uniquely identifies the task.
- mode: Whether it’s S3 or a different source, our sensor is going to periodically check for a specified target. The mode argument indicates whether that check runs as “poke” or “reschedule.” Poke works best for relatively short jobs because it takes up a worker slot for the entirety of its runtime. Reschedule releases the worker slot if the criterion is not yet met and will…reschedule the check for a later time.
- poke_interval: The number of seconds the sensor waits between checks.
- soft_fail: If this is True, the task will be marked as skipped when the sensor gives up, rather than failed.
- timeout: In seconds, this determines how long our sensor can keep checking before it times out and fails (or is skipped, if soft_fail is set).
- executor_config: We can use this to specify where, and with what resources, the task will run. This code was pretty dense, so I’ll walk through it in its own section below.
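First, though, here’s the earlier sketch again with those inherited arguments layered on. The specific values are my own guesses for illustration, not what the PR uses:

from airflow.sensors.s3_key_sensor import S3KeySensor

wait_for_upload = S3KeySensor(
    task_id="wait_for_upload",
    bucket_key="my-key/*.csv",
    bucket_name="my-bucket",
    wildcard_match=True,
    mode="reschedule",     # give the worker slot back between checks
    poke_interval=5 * 60,  # wait five minutes between checks
    timeout=6 * 60 * 60,   # stop checking after six hours
    soft_fail=True,        # mark the task skipped, not failed, if we give up
)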
executor_config
I had to read this code very slowly:
executor_config={
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    resources=k8s.V1ResourceRequirements(
                        requests={
                            "cpu": 1,
                            "memory": "1000Mi",
                        },
                        limits={"cpu": 2, "memory": "3000Mi"},
                    ),
                )
            ]
        )
    )
}
Our executor_config is a dictionary, but in this case it only has one key, pod_override. The doc I was looking at didn’t reference this property, but I did find it in the Kubernetes-specific docs, which makes sense since we see k8s all over this code. It offers the ability to override system defaults on a per-task basis, so we use it to specify the Kubernetes pod we want the task to run in.
The k8s object has been imported from the kubernetes library. We’re creating a V1Pod, which is a collection of containers that can run on a host. Quick reminder that running our code takes up CPU and memory, and we rely on Kubernetes containers to provide those resources rather than running the job directly on our own physical servers. In the code, we can see that we’re creating a list of containers, again using the k8s library to instantiate them by passing a name and a set of resource requests and limits. Just in case I haven’t written about this before: requests is the amount of resources our container is guaranteed, and limits is the ceiling it’s allowed to use, so that we don’t waste resources by just picking something giant each time. If our job ends up needing 2000Mi of memory, it might still fail: although our limit is 3000Mi, only the 1000Mi request is guaranteed, and the extra memory has to be available wherever the pod is running. If our job consistently runs out of memory but uses less than 3000Mi, we can increase the request size.
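One thing that helped me was noticing that executor_config is just another keyword argument on the task. Here’s my own restructuring of the PR’s block (this is not how the PR lays it out), using the kubernetes client import that the Airflow docs show:

from kubernetes.client import models as k8s

# The pod override from above, pulled into a variable for readability.
# "base" is the name of the main task container that the Kubernetes executor creates.
pod_override = k8s.V1Pod(
    spec=k8s.V1PodSpec(
        containers=[
            k8s.V1Container(
                name="base",
                resources=k8s.V1ResourceRequirements(
                    requests={"cpu": 1, "memory": "1000Mi"},  # guaranteed floor
                    limits={"cpu": 2, "memory": "3000Mi"},    # hard ceiling
                ),
            )
        ]
    )
)

# The sensor from the earlier sketches would then accept it like any other argument:
# S3KeySensor(..., executor_config={"pod_override": pod_override})

Pulling the pod spec out like this made the requests/limits pairing much easier for me to see.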
Just the beginning
This took a lot of reading for me to understand, but it was only 38 lines of code, most of which was the executor_config. I didn’t feel comfortable devoting this amount of time to a small portion of a PR during the workday, but I probably should have. Education is part of my job, and as long as I’m not holding someone up with my curiosity, it’s beneficial for everyone that I understand our code. As I review the rest of this PR, I’ll keep myself on the clock.
Sources
- airflow.sensors.s3_key_sensor, Airflow docs
- Operators, Airflow docs
- Kubernetes Executor, Airflow docs
- V1Pod, kubernetes-client docs