Python in AWS Lambda not properly garbage collecting?

Scenario: I've written an AWS Lambda function that fires when a CSV file is uploaded to an S3 bucket and stream-splits the file into x-MB chunks, writing each chunk to a gzipped Parquet file (one file per slice on the Redshift cluster, for evenly distributed processing/loading). The idea is that if the Lambda function has 3GB of memory and receives an 8GB (or bigger) CSV file, I should be able to process it in 1GB chunks without ever reading the whole 8GB into memory and exceeding the 3GB limit.

import sys
import io

import boto3
import pandas as pd
import awswrangler as wr

s3 = boto3.client('s3')

# Assumed to be defined elsewhere in the module (e.g. module-level constants or
# values pulled from the Lambda event): target_stage_bucket, source_bucket,
# source_bucket_key.

def split_file(file_size_in_MB, source_bucket, source_bucket_key):
    body = s3.get_object(Bucket=source_bucket, Key=source_bucket_key)['Body'] #streaming body
    chunk_size = 1024 * 1024 * file_size_in_MB # bytes
    newline = '\r\n'.encode()  # rows are assumed to be CRLF-terminated
    partial_chunk = b''  # bytes after the last complete row, carried over to the next chunk
    counter = 0
    while (True):
        data = body.read(chunk_size)
        if counter == 0:
            header = data[0:data.find(newline)] # determine header on first pass
            chunk = partial_chunk + data
        else:
            chunk = header + partial_chunk + data
        if chunk == b'':
            break
        last_newline = chunk.rfind(newline)
        result = chunk[0:last_newline+1].decode('utf-8')
        print('1 mem size of chunk', round(sys.getsizeof(result)/1024/1024,2))
        if len(result) != 0:
            df = pd.read_csv(io.StringIO(result))
            print('2 mem size of df', round(sys.getsizeof(df)/1024/1024,2))
            wr.s3.to_parquet(df=df*1,
                             path=f's3://{target_stage_bucket}/results{counter}.parquet.gzip',
                             compression='gzip')
        else:
            break
        partial_chunk = chunk[last_newline+1:]  # carry the incomplete trailing row into the next pass
        counter+=1

split_file(file_size_in_MB=50,
           source_bucket=source_bucket,
           source_bucket_key=source_bucket_key)
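
For context, the Lambda traceback further down shows split_file being called from a lambda_handler in lambda_function.py. A minimal sketch of what that handler might look like for a standard S3 put trigger follows; the event parsing here is an assumption, not code from the question:

import urllib.parse

def lambda_handler(event, context):
    # Standard S3 notification payload: one record per uploaded object.
    record = event['Records'][0]['s3']
    source_bucket = record['bucket']['name']
    source_bucket_key = urllib.parse.unquote_plus(record['object']['key'])

    # Same call as above, but with the 100MB chunk size seen in the traceback.
    split_file(file_size_in_MB=100,
               source_bucket=source_bucket,
               source_bucket_key=source_bucket_key)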

Disclaimer: I understand there are improvements that could be made to this code, such as the newline splitting, the while (True), and the potential timeouts that will need to be handled. I get it, but this is dev code, so please focus on the specific problem: the apparent memory leak when the function is fired in AWS Lambda - see below:

If I run this function locally on a 1GB file streamed in 100MB chunks, I can see the size of each chunk and of its Pandas equivalent (with a little overhead, as expected):

running...
1 mem size of chunk 100.0
2 mem size of df 132.02
1 mem size of chunk 100.0
2 mem size of df 131.97
.....
1 mem size of chunk 100.0
2 mem size of df 132.06
1 mem size of chunk 24.0
2 mem size of df 31.68
1 mem size of chunk 0.0
completed in 0:02:38.995711

and here you can see that the memory trajectory is relatively flat for the duration of the script, with the expected spikes as each chunk is processed:

[memory-usage plot of the local run]

However, the problem is that when I deploy this same code to a Lambda function allocated 512 MB of memory, I receive the following error:

{
  "errorType": "MemoryError",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 38, in lambda_handler\n    split_file(file_size_in_MB=100,\n",
    "  File \"/var/task/lambda_function.py\", line 79, in split_file\n    result = chunk[0:last_newline+1].decode('utf-8')\n"
  ]
}

and the following log output, where you can see that the code only makes it through the first 100MB chunk:

1 mem size of chunk 100.0
[ERROR] MemoryError
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 38, in lambda_handler
    split_file(file_size_in_MB=100,
  File "/var/task/lambda_function.py", line 79, in split_file
    result = chunk[0:last_newline+1].decode('utf-8')

So my question is: what is happening here? I would have thought 512MB was plenty of memory to process these 100MB chunks, but in Lambda I run out of memory on the first pass. Any ideas?

asked Oct 15 '25 by gbeaven


2 Answers

This is likely because the Lambda is executing as a warm start, meaning an existing container is reused to avoid costly startup overhead. See this excellent blog post on the anatomy of warm starts. Because an existing container is reused, its memory is not guaranteed to be clear at the start of the next invocation. See this answer to a similar question.

So that means you can't rely on always having the full memory allocation available to the Lambda at runtime.

I think a better option here would be to use either a queue or Step Functions to handle the chunking of the data, e.g. a fanout pattern in which one Lambda defines the start/end byte ranges for each subsequent Lambda to stream from and sends a message per range to a queue, which then triggers multiple Lambdas in parallel (a rough sketch follows). This is just one option, and of course you'd still have to handle Lambda failures, timeouts, etc.
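
For illustration, a minimal sketch of such a fanout dispatcher, assuming a hypothetical SQS queue whose messages each trigger a worker Lambda that reads only its byte range with a ranged S3 GET (the worker would still need to handle rows that straddle a range boundary):

import json
import boto3

s3 = boto3.client('s3')
sqs = boto3.client('sqs')

# Hypothetical queue that triggers the worker Lambdas (not part of the question).
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/csv-chunks'

def dispatch_chunks(source_bucket, source_bucket_key, chunk_size_in_MB=100):
    # Total object size, obtained without downloading anything.
    size = s3.head_object(Bucket=source_bucket, Key=source_bucket_key)['ContentLength']
    chunk_size = 1024 * 1024 * chunk_size_in_MB
    start = 0
    while start < size:
        end = min(start + chunk_size, size) - 1
        # One message per byte range; each message fans out to a worker Lambda
        # that reads only its slice via s3.get_object(..., Range=...).
        sqs.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=json.dumps({
                'bucket': source_bucket,
                'key': source_bucket_key,
                'range': f'bytes={start}-{end}',
            }),
        )
        start = end + 1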

Good luck :)

answered Oct 17 '25 by mdmjsh


I'm not sure what you used to plot the graph, but you can use something like memory_profiler to profile your program.
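
For reference, a minimal sketch of how such a line-by-line profile can be produced (assumes pip install memory-profiler; the script and data file names are placeholders):

from memory_profiler import profile

@profile
def read_one_chunk(file_size_in_MB):
    # memory_profiler prints a line-by-line memory table for decorated functions.
    with open('data.bin', 'rb') as f:
        chunk = f.read(2**20 * file_size_in_MB)
    return chunk

if __name__ == '__main__':
    read_one_chunk(100)

# Run with:  python -m memory_profiler your_script.py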

I ran memory_profiler on a program containing steps similar to a single iteration of your program. I also split some statements into several, because @profile measures memory usage after each statement, not in between (and, as far as I understand, the GC can kick in before memory usage is measured again). I used del to delete temporary variables the moment they are no longer needed, emulating what I believe happens within the unsplit statements. Here's the result using chunks of 100MB:

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
    35     70.0 MiB     70.0 MiB           1   @profile
    36                                         def read_by_chunk2(file_size_in_MB):
    37     70.0 MiB      0.0 MiB           1       with open('data.bin', 'rb') as f:
    38     70.0 MiB      0.0 MiB           1           chunk_size = 2**20 * file_size_in_MB
    39    170.0 MiB    100.0 MiB           1           chunk = f.read(chunk_size)
    40                                                 # Result
    41    270.0 MiB    100.0 MiB           1           chunk_part = chunk[:-10]
    42    370.0 MiB    100.0 MiB           1           result = chunk_part.decode('utf-8')
    43    270.0 MiB   -100.0 MiB           1           del chunk_part
    44                                                 # Dataframe
    45    670.0 MiB    400.0 MiB           1           s = StringIO(result)
    46    770.9 MiB    100.9 MiB           1           df = pd.read_csv(s)

Why StringIO needs FOUR times the memory of the string it wraps is beyond me. The good news is that you don't need it, and you don't need to decode the chunk yourself: you can pass a BytesIO to pd.read_csv instead:

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
    49     70.0 MiB     70.0 MiB           1   @profile
    50                                         def read_by_chunk(file_size_in_MB):
    51     70.0 MiB      0.0 MiB           1       with open('data.bin', 'rb') as f:
    52     70.0 MiB      0.0 MiB           1           chunk_size = 2**20 * file_size_in_MB
    53    170.0 MiB    100.0 MiB           1           chunk = f.read(chunk_size)
    54                                                 # Splitting this statement would not change the memory usage because
    55                                                 # BytesIO keeps a reference to the chunk part
    56    270.0 MiB    100.0 MiB           1           bytesio = io.BytesIO(chunk[:-10])
    57    371.8 MiB    101.8 MiB           1           df = pd.read_csv(bytesio)

In general, you can decrease memory usage by avoiding copies and by forcing garbage collection of data you no longer need with del name; gc.collect().
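
Applied to the question's split_file loop, those two changes (BytesIO instead of decode + StringIO, plus del / gc.collect) could look roughly like the following sketch of the inner loop; it is untested and assumes import gc has been added next to the other imports:

        ...
        last_newline = chunk.rfind(newline)
        complete_rows = chunk[0:last_newline+1]            # keep bytes, skip .decode('utf-8')
        if len(complete_rows) != 0:
            df = pd.read_csv(io.BytesIO(complete_rows))    # pandas decodes the UTF-8 itself
            wr.s3.to_parquet(df=df,                        # pass df directly; df*1 made an extra copy
                             path=f's3://{target_stage_bucket}/results{counter}.parquet.gzip',
                             compression='gzip')
        else:
            break
        partial_chunk = chunk[last_newline+1:]
        del df, complete_rows, chunk                       # drop references before the next read
        gc.collect()                                       # and force a collection pass
        counter += 1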

answered Oct 17 '25 by janluke


