What I Learned at Work this Week: Airflow Operators

Mike Diaz
4 min read · Jan 29, 2023
Photo by Bruno Cantuária: https://www.pexels.com/photo/seven-assorted-colored-rotary-telephones-774448/

This week, a teammate asked me for design help as he considered adding a new row to an existing database table. As someone who works with a 26-column table every day, I wanted to see if there was a way to keep his table from growing. I asked for more context, so he sent me a large PR that explained the change he was making.

The PR had a lot of stuff I couldn’t understand, so we hopped on a Zoom call, talked through it, and figured it out. Now that I have some more time, I’ve revisited the PR to see if I can get a better idea of what it does!

S3KeySensor

I learned that the idea behind the code was to periodically check an AWS S3 location for objects with certain names and formats. If a user has uploaded a file, read and process it. If not, check again later. Amazing! I didn’t know Airflow could do this until I saw S3KeySensor in this PR.

This module can be imported directly from the Airflow library. According to the Airflow docs, it waits for a key (a file-like instance on S3) to be present in an S3 bucket. It accepts five parameters (there’s a short usage sketch after the list):

  • bucket_key: The key we’re looking for.
  • bucket_name: An S3 path often looks something like this: s3://my-bucket/my-key/file-name.csv. Though the bucket_key argument can accept a full path, S3KeySensor asks us to specify the bucket name here.
  • wildcard_match: The bucket_key argument can be the literal key name for the object, but it can also be a Unix wildcard pattern. If we set this argument to True, the sensor will check for any keys that match the pattern.
  • aws_conn_id: This doesn’t point to anything inside AWS itself; it’s the ID of an Airflow connection, which is where the credentials the sensor uses to reach AWS are stored.
  • verify: By default, SSL certificates are verified, but if we pass False here, they won’t be. We can also pass a string: the path to a certificate bundle to use for verification.
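Here’s a minimal sketch of how those five parameters might fit together. This is not the code from the PR: the DAG, bucket, and key names are made up, and the exact import path depends on your Airflow and Amazon provider versions.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(
    dag_id="example_s3_wait",
    start_date=datetime(2023, 1, 1),
    schedule=None,  # schedule_interval in older Airflow versions
) as dag:
    wait_for_upload = S3KeySensor(
        task_id="wait_for_upload",   # inherited from the base operator class (more below)
        bucket_key="my-key/*.csv",   # a Unix wildcard pattern...
        bucket_name="my-bucket",     # ...so the bucket is passed separately
        wildcard_match=True,         # treat bucket_key as a pattern, not a literal key
        aws_conn_id="aws_default",   # the Airflow connection holding AWS credentials
        verify=True,                 # verify SSL certificates (the default)
    )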

But this PR involved more arguments than just these five. After doing some more research, I learned that S3KeySensor inherits arguments from Airflow’s base sensor class, BaseSensorOperator, which itself inherits from BaseOperator.

BaseOperator

According to the docs, operators allow for generation of certain types of tasks that become nodes in the DAG when instantiated. Operators can perform an action, transfer data from one system to another, or act as sensors, running until a certain criterion is met. The arguments we’ve already seen are specific to our sensor, but the others I saw in the PR come from BaseSensorOperator and BaseOperator:

  • task_id: A unique ID for the task within its DAG.
  • mode: Whether S3 or a different source, our sensor is going to periodically check for a specified target. The mode argument indicates whether that check will be a “poke” or “reschedule.” Poke works best for relatively short waits because it takes up a worker slot for the entirety of its runtime. Reschedule will release the worker slot if the criterion is not yet met and will…reschedule the check for a later time.
  • poke_interval: If our mode is set to poke, this number determines how frequently, in seconds, the sensor checks for the target.
  • soft_fail: If this is True, the task will be marked as skipped on failure, rather than failed.
  • timeout: In seconds, this determines how long our task can run before it automatically times out and fails.
  • executor_config: We can use this to specify where, and with what resources, the task will run. This code was pretty dense, so I’ll break it down in its own section below, right after a quick sketch of the simpler arguments.
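First, here’s how those sensor-level arguments might look on the hypothetical S3KeySensor from earlier. The values are made up for illustration:

wait_for_upload = S3KeySensor(
    task_id="wait_for_upload",
    bucket_key="my-key/*.csv",
    bucket_name="my-bucket",
    wildcard_match=True,
    mode="reschedule",    # free the worker slot between checks
    poke_interval=300,    # check every five minutes
    timeout=6 * 60 * 60,  # give up after six hours
    soft_fail=True,       # mark the task skipped instead of failed if it times out
)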

executor_config

I had to read this code very slowly:

# k8s is the Kubernetes models module, imported as something like:
# from kubernetes.client import models as k8s
executor_config={
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",  # "base" is the container Airflow runs the task in
                    resources=k8s.V1ResourceRequirements(
                        requests={
                            "cpu": 1,
                            "memory": "1000Mi",
                        },
                        limits={"cpu": 2, "memory": "3000Mi"},
                    ),
                )
            ]
        )
    )
}

Our executor_config is a dictionary, but in this case it has only one key, pod_override. The doc I was looking at didn’t reference this property, but I did find it in the Kubernetes-specific docs, which makes sense, since we see k8s all over this code. It offers the ability to override system defaults on a per-task basis, so we use it to describe the Kubernetes pod we want the task to run in.

The k8s object has been imported from the kubernetes library. We’re creating a V1Pod, which is a collection of containers that can run on a host. Quick reminder that running our code takes up CPU and memory, and we rely on Kubernetes to allocate those resources for us, as opposed to managing our own physical servers. In the code, we can see that we’re creating a list of containers (just one here), again using the k8s library to instantiate it with a name and a set of resource requests and limits.

Just in case I haven’t written about this before: requests is the amount of CPU and memory our container is guaranteed, and limits is the ceiling it’s allowed to use, so that we don’t waste resources by just asking for something giant every time. Anything between the two isn’t guaranteed. If our job ends up needing 2000Mi of memory, it might fail because, even though our limit is 3000Mi, only the 1000Mi request is actually reserved for us. If our job consistently runs out of memory but uses less than 3000Mi, we can increase the request size.
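For example, if the task kept running out of memory around 2000Mi, one hedged tweak (not from the PR) would be to raise the guaranteed request while leaving the ceiling alone:

resources=k8s.V1ResourceRequirements(
    requests={"cpu": 1, "memory": "2000Mi"},  # now guaranteed 2000Mi instead of 1000Mi
    limits={"cpu": 2, "memory": "3000Mi"},    # upper bound unchanged
),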

Just the beginning

This took a lot of reading for me to understand, but it was only 38 lines of code, most of which was the executor_config. I didn’t feel comfortable devoting this amount of time to a small portion of a PR during the workday, but I probably should have. Education is part of my job, and as long as I’m not holding someone up with my curiosity, it’s beneficial for everyone that I understand our code. As I review the rest of this PR, I’ll keep myself on the clock.
