What I Learned at Work this Week: Memory and Time-saving Strategies in Python
Be careful what you wish for. While I’ve been spending more time writing Java, TypeScript, GraphQL, and React at work, I’ve pined for the days when my life was writing simple Python to solve simple problems. About two weeks ago, I got an assignment that appeared to be just that: a simple script to take two files and combine them to produce a single ndjson report (ndjson is newline-delimited JSON: a collection where elements are separated by new lines instead of commas).
But one of the files is extremely dense because of nested objects within collections within nested objects, so when I ran my script, it ran out of memory. When I increased the memory capacity, it ran for 12 hours and timed out. My code was extremely inefficient, and I had to learn how to refactor it to solve these problems of time and space.
The First Attempt
Before we refactor, let’s establish what the original script was doing. Here’s a simplified version that you can run from home (here’s some info on getting started with Python in case you’re brand new. You may need to install some additional libraries for this to run on your machine):
import json
import pandas as pd

if __name__ == '__main__':
    PRODUCTS_FILE_NAME = 'products.ndjson'
    PRICES_FILE_NAME = 'prices.ndjson'

    # write two sample ndjson input files: 100 product names and 100 product prices
    with open(PRODUCTS_FILE_NAME, 'w') as products_file:
        for number in range(1, 101):
            products_file.write('{"product_id": '
                                + str(number) + ', "product_name": "PLACEHOLDER NAME"}\n')

    with open(PRICES_FILE_NAME, 'w') as prices_file:
        for number in range(1, 101):
            prices_file.write('{"product_id": '
                              + str(number) + ', "product_price": ' + str(number * 100) + '}\n')

    # read an ndjson file into a list of dicts, then into a DataFrame
    def df_from_ndjson(file_path):
        with open(file_path, 'r') as ndjson_file:
            result = []
            for ndjson_line in ndjson_file:
                if ndjson_line.strip():
                    json_line = json.loads(ndjson_line)
                    result.append(json_line)
            result_df = pd.DataFrame(data=result, dtype=object)
            return result_df

    # join the two DataFrames on product_id and write the result back out as ndjson
    products_df = df_from_ndjson(PRODUCTS_FILE_NAME)
    price_df = df_from_ndjson(PRICES_FILE_NAME)
    enriched_df = pd.merge(products_df, price_df, on='product_id', how='left')
    enriched_df.to_json('result.ndjson', orient='records', lines=True)
In practice, a client was providing the files, so I had to pull them from a shared location. We keep things simple here by writing them ourselves: one ndjson file with 100 rows of product names and another with 100 rows of product prices. We want to match these two on product ID so we can see the ID, name, and price of each product all in one place.
In my experience, the easiest way to do this type of “join” in Python is with Pandas. We import that library and then, in df_from_ndjson, we open the file, read each line, use json.loads to turn each line from a JSON string into a dictionary, and append that dictionary to a list. Once we have a list of dictionaries, we can transform it into a Pandas data frame and, on the second-to-last line, merge the data on product_id. Finally, we use to_json to change our data frame back into a JSON file, or rather an ndjson file thanks to the use of orient='records' and lines=True. Used together, those options break up the JSON data by new lines instead of commas.
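As an aside, pandas can also parse ndjson directly, which skips the manual loop; here is a minimal sketch of the same join using pd.read_json (my shortcut, not part of the original script):

import pandas as pd

# lines=True tells pandas the file is newline-delimited JSON (ndjson)
products_df = pd.read_json('products.ndjson', lines=True)
price_df = pd.read_json('prices.ndjson', lines=True)

enriched_df = pd.merge(products_df, price_df, on='product_id', how='left')
enriched_df.to_json('result.ndjson', orient='records', lines=True)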
Measuring Performance
When I ran my job in production, it failed because it ran for over 12 hours. This little replica takes less than a second, but ultimately we want to figure out a way to make it run faster so that we can apply that principle to the real job. We can time our code by importing the time module and adding a few simple lines:
import time  # add this with the other imports at the top of the script

time_before_dataframes = time.time()
products_df = df_from_ndjson(PRODUCTS_FILE_NAME)
price_df = df_from_ndjson(PRICES_FILE_NAME)
enriched_df = pd.merge(products_df, price_df, on='product_id', how='left')
enriched_df.to_json('result.ndjson', orient='records', lines=True)
end_time = time.time()
print(f'EXECUTION TIME WAS: {end_time - time_before_dataframes}')
Note that I put the timer after the ndjson files were created because the production job isn’t responsible for creating files. We just want to know how long it takes to transform that data into data frames, merge it, and write it to a file.
When I ran this, the final print statement was:
EXECUTION TIME WAS: 0.005265951156616211
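If you end up timing several different blocks, a small context manager can keep the bookkeeping in one place. This is just a convenience sketch of my own (the helper name and the use of time.perf_counter are my choices, not part of the original script):

import time
from contextlib import contextmanager

@contextmanager
def timed(label):
    # perf_counter is a monotonic, high-resolution clock, better suited to timing than time.time()
    start = time.perf_counter()
    try:
        yield
    finally:
        print(f'{label} TOOK {time.perf_counter() - start:.4f} SECONDS')

# usage:
# with timed('MERGE AND WRITE'):
#     enriched_df = pd.merge(products_df, price_df, on='product_id', how='left')
#     enriched_df.to_json('result.ndjson', orient='records', lines=True)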
Because the data is relatively tiny, it’s handled very efficiently. To better imitate the issue I’m having at work, I pumped up the files we’re creating by changing range(1, 101) to range(1, 1000001). In my case, that made the script run in just over 4 seconds, which we can hopefully cut down.
We also want to see how much memory our job is using. The sys library gives us a getsizeof method, which returns the size of a Python object in bytes (just the object itself: for a container like a list, it counts the list structure but not the items it points to). I added a bunch of lines like this throughout my script:
print(f'THE FILE {file_path} IS {sys.getsizeof(ndjson_file)} BYTES')
I printed the size of the two ndjson files we write, the list we build when we convert them to objects, the data frame we transform that list into, and of course the final data frame. Here are the results:
The file objects, even for very large files, register as tiny, because getsizeof measures the file handle rather than the contents on disk. The lists, which each contain a million elements, come in at 8.45 megabytes apiece (and that only counts the list of references, not the dictionaries inside it), which is a big jump. We take another big leap when we put them into a Pandas format, as the two individual data frames are 109 MB and 72 MB, respectively. The final data frame we create is 145 MB in size, so you can imagine how bad this gets when we try to use real data.
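If you want numbers that account for nested contents, there are deeper measurements than a bare getsizeof; here is a sketch of two options I would reach for, not something the original script did:

import sys
import pandas as pd

rows = [{'product_id': n, 'product_price': n * 100} for n in range(1, 1000001)]

# shallow size: the list object and its pointers only
print(f'LIST (SHALLOW): {sys.getsizeof(rows)} BYTES')

# add the per-row dictionaries for a rough "deep" estimate
deep_estimate = sys.getsizeof(rows) + sum(sys.getsizeof(row) for row in rows)
print(f'LIST (ROUGH DEEP): {deep_estimate} BYTES')

# pandas can report its own memory, including object columns, with deep=True
df = pd.DataFrame(rows)
print(f'DATAFRAME: {df.memory_usage(deep=True).sum()} BYTES')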
Memory Refactor
We’re using up a lot of space by putting all our data into lists and then data frames just to turn it back into ndjson in the end. Data frames are really useful when combining two large sets of data, but if we go line-by-line, we can save space by building fewer collections and by avoiding data frames. First, we change our df_from_ndjson to dict_from_ndjson:
def dict_from_ndjson(file_path):
    with open(file_path, 'r') as ndjson_file:
        print(f'THE FILE {file_path} IS {sys.getsizeof(ndjson_file)} BYTES')
        result = {}
        for ndjson_line in ndjson_file:
            if ndjson_line.strip():
                json_line = json.loads(ndjson_line)
                result[json_line['product_id']] = json_line['product_price']
        print(f'THE RESULT OBJECT FOR {file_path} IS {sys.getsizeof(result)} BYTES')
        return result
Instead of making a list of dictionaries, we just make one dictionary where the key is our shared value (product_id) and the value is product_price. We’ll check the size for performance comparison.
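To make the shape concrete, here is what calling it on the prices file produces with the sample data (the variable name is mine):

prices_by_id = dict_from_ndjson(PRICES_FILE_NAME)
# with the generated sample data, prices_by_id looks like:
# {1: 100, 2: 200, 3: 300, ..., 1000000: 100000000}
print(prices_by_id[42])  # 4200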
That function will create a dict for the prices file. We’ll pass that object to another function that will both merge and write the data:
def merge_and_write(file_path, dict_to_merge):
    with open(file_path, 'r') as file_to_read, open('result.ndjson', 'w') as file_to_write:
        for ndjson_line in file_to_read:
            if ndjson_line.strip():
                single_line_dict = json.loads(ndjson_line)
                single_line_dict['product_price'] = dict_to_merge[single_line_dict['product_id']]
                json.dump(single_line_dict, file_to_write)
                file_to_write.write('\n')
        print(f'THE FILE {file_path} IS {sys.getsizeof(file_to_write)} BYTES')
We start by opening two files: the one we’re reading from and the one we’re going to write to. We go through the read file line by line and enrich each line by referencing the dict we created in the other function. Once we have each enriched dict, we write it directly to our new file using json.dump. Then we add a newline to adhere to the ndjson format and, finally, print the file size. Here are the logged results:
We still build one large object that’s almost 42 MB, but that’s much smaller than what we were working with before. But notice the execution time — it’s almost twice as long as it was originally!
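For completeness, the call site for this version would look roughly like this (the wiring wasn’t shown above, so treat it as my assumption):

prices_dict = dict_from_ndjson(PRICES_FILE_NAME)
merge_and_write(PRODUCTS_FILE_NAME, prices_dict)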
Time Refactor
At work, I implemented Python’s concurrent.futures library to create multiple asynchronous calls to my write function. But when I deployed that code, it still timed out. I think I may have learned how to make it work while writing this blog, but first let’s see the implementation that didn’t improve performance (if you want to run this yourself, note that you’ll have to add import tempfile and import concurrent.futures to your imports):
def divide_chunks(passed_list, size):
    for i in range(0, len(passed_list), size):
        yield passed_list[i : i + size]

def merge_and_write_individual_rows(chunk_of_product_rows, dict_to_merge):
    temp_file_path = tempfile.NamedTemporaryFile('w+', delete=False).name
    with open(temp_file_path, 'w') as temp_file:
        for ndjson_line in chunk_of_product_rows:
            if ndjson_line.strip():
                single_line_dict = json.loads(ndjson_line)
                single_line_dict['product_price'] = dict_to_merge[single_line_dict['product_id']]
                json.dump(single_line_dict, temp_file)
                temp_file.write('\n')
    print(f'THE FINAL FILE IS {sys.getsizeof(temp_file)} BYTES')
    return temp_file_path

def merge_and_write_final(file_path, dict_to_merge):
    with open(file_path, 'r') as file_to_read, open('result.ndjson', 'w') as file_to_write:
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            for chunk in divide_chunks(file_to_read.read().splitlines(), 100000):
                future = executor.submit(merge_and_write_individual_rows, chunk, dict_to_merge)
                futures.append(future)
                print(f'SO FAR WE HAVE SPENT {time.time() - time_before_dataframes} TIME')
            for future in concurrent.futures.as_completed(futures):
                print(f'WAITING FOR COMPLETION WE HAVE SPENT {time.time() - time_before_dataframes} TIME')
                result = future.result()
                with open(result, 'r') as thread_file:
                    file_to_write.write(thread_file.read())
                    file_to_write.write('\n')
The first function, divide_chunks, is brand new; it breaks our data into smaller pieces. Next, we take our old merge_and_write function and alter it slightly into merge_and_write_individual_rows. This is now the function we’ll call for each chunk of data, ideally with parallel threads so that it runs on all chunks at the same time. We write to tempfiles and then, in merge_and_write_final, we write the results of those tempfiles to our final result.ndjson file.
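As a quick sanity check of the chunking generator (my example, not from the original code):

chunks = list(divide_chunks([1, 2, 3, 4, 5, 6, 7], 3))
print(chunks)  # [[1, 2, 3], [4, 5, 6], [7]]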
The concurrent library syntax really confused me, but here’s my best explanation. First we create a ThreadPoolExecutor and tell it that we’re willing to have up to 10 threads going at once. In that context, we create a series of futures that will execute the merge_and_write_individual_rows function and put them in a list. Finally, as the futures complete their work (as_completed), we read each result with future.result() and write it to the final file. If we run this and time it…we see that it takes about the same amount of time as before we made any changes.
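If the executor pattern is new to you, here is the same submit/as_completed flow stripped down to a toy example (nothing to do with the merge itself):

import concurrent.futures

def square(n):
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # submit returns a Future immediately; the work happens on a pool thread
    futures = [executor.submit(square, n) for n in range(10)]
    # as_completed yields each future as it finishes, not in submission order
    for future in concurrent.futures.as_completed(futures):
        print(future.result())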
I was struggling to understand what was wrong, so I asked ChatGPT for advice. It told me that Python’s Global Interpreter Lock (GIL) ensures that only one thread executes Python bytecode at a time, which would explain a lot. It then suggested that rather than using multiple threads, I use ProcessPoolExecutor to create multiple processes:
def merge_and_write_final(file_path, dict_to_merge):
    with open(file_path, 'r') as file_to_read, open('result.ndjson', 'w') as file_to_write:
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
            futures = []
            for chunk in divide_chunks(file_to_read.read().splitlines(), 100000):
                future = executor.submit(merge_and_write_individual_rows, chunk, dict_to_merge)
                futures.append(future)
                print(f'SO FAR WE HAVE SPENT {time.time() - time_before_dataframes} TIME')
            concurrent.futures.wait(futures)
            for future in concurrent.futures.as_completed(futures):
                print(f'WAITING FOR COMPLETION WE HAVE SPENT {time.time() - time_before_dataframes} TIME')
                result = future.result()
                with open(result, 'r') as thread_file:
                    file_to_write.write(thread_file.read())
                    file_to_write.write('\n')
It’s a very small difference: we’re mainly using a different class, ProcessPoolExecutor instead of ThreadPoolExecutor, and changing the number of max_workers, though that value depends on how many processes our local machine or virtual environment can provide. The one other thing I had to do was move merge_and_write_individual_rows into another module (file), which I called helpers.py. The function has to live outside the if __name__ == '__main__' block because each worker process typically imports the module fresh, and code guarded by that check doesn’t run on import, so the workers can only find functions defined at module level. When I ran this:
We cut our execution time in half! This is only a minor improvement on the original speed, but if we are able to add more processes, we can make this even faster.
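To recap the structure that keeps ProcessPoolExecutor happy, the project ends up split roughly like this (the entry-point file name is my assumption):

# helpers.py -- anything submitted to the process pool lives at module level here
def merge_and_write_individual_rows(chunk_of_product_rows, dict_to_merge):
    ...  # same body as shown above

# main.py (or whatever your entry script is called)
from helpers import merge_and_write_individual_rows

if __name__ == '__main__':
    ...  # build the prices dict, then call merge_and_write_final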
Iteration
This code took me a lot longer than it could have, probably because I was trying to work too quickly. I followed the same path I always do for this type of problem and looked for quick fixes when that didn’t work. It’s important to take time and test things, especially when you’re looking to improve performance. I think I may have solved this once and for all, but when I write a PR to implement ProcessPoolExecutor, I’ll definitely add some logs and run local tests before I ask for a review.
Sources
- Python For Beginners, Python docs
- Pandas docs
- json.loads() in Python, Geeks for Geeks
- pandas.DataFrame.to_json, Pandas docs
- How to check the execution time of Python script?, tutorialspoint
- How to find size of an object in Python?, Geeks for Geeks
- concurrent.futures, Python docs
- ChatGPT