Scenario: I've written an AWS Lambda function that fires when a CSV file is uploaded to an S3 bucket and stream-splits the file into chunks of a given size in MB, writing each chunk out as a gzipped Parquet file (one file per slice on a Redshift cluster, for evenly distributed processing/loading). The idea is that if the Lambda has 3 GB of memory allocated and receives an 8 GB (or larger) CSV file, I should be able to process it in, say, 1 GB chunks without ever reading the whole 8 GB into memory and exceeding the 3 GB limit.
import sys
import io
import boto3
import pandas as pd
import awswrangler as wr

s3 = boto3.client('s3')

def split_file(file_size_in_MB, source_bucket, source_bucket_key):
    body = s3.get_object(Bucket=source_bucket, Key=source_bucket_key)['Body']  # streaming body
    chunk_size = 1024 * 1024 * file_size_in_MB  # bytes
    newline = '\r\n'.encode()
    partial_chunk = b''
    counter = 0
    while (True):
        data = body.read(chunk_size)
        if counter == 0:
            header = data[0:data.find(newline)]  # determine header on first pass
            chunk = partial_chunk + data
        else:
            chunk = header + partial_chunk + data  # re-prepend header so each chunk parses as a CSV
        if chunk == b'':
            break
        last_newline = chunk.rfind(newline)  # only emit complete rows; the remainder carries over
        result = chunk[0:last_newline+1].decode('utf-8')
        print('1 mem size of chunk', round(sys.getsizeof(result)/1024/1024, 2))
        if len(result) != 0:
            df = pd.read_csv(io.StringIO(result))
            print('2 mem size of df', round(sys.getsizeof(df)/1024/1024, 2))
            wr.s3.to_parquet(df=df*1,
                             path=f's3://{target_stage_bucket}/results{counter}.parquet.gzip',
                             compression='gzip')
        else:
            break
        partial_chunk = chunk[last_newline+1:]  # incomplete trailing row, prepended to the next chunk
        counter += 1

split_file(file_size_in_MB=50,
           source_bucket=source_bucket,
           source_bucket_key=source_bucket_key)
Disclaimer: I understand there are improvements that can be made to this code, such as the newline splitting, the while (True), and the potential for timeouts that will need to be handled. I get it, but please remember this is dev code; I would like to focus on the specific problem, which is the apparent memory leak that occurs when it is fired in AWS Lambda - see below:
If I run this function locally on a 1 GB file streamed in 100 MB chunks, I can see the size of each chunk on each pass and of its Pandas equivalent (with a little overhead, as expected):
running...
1 mem size of chunk 100.0
2 mem size of df 132.02
1 mem size of chunk 100.0
2 mem size of df 131.97
.....
1 mem size of chunk 100.0
2 mem size of df 132.06
1 mem size of chunk 24.0
2 mem size of df 31.68
1 mem size of chunk 0.0
completed in 0:02:38.995711
Plotting memory usage over the run, the trajectory is relatively flat for the duration of the script, with the expected spikes as each chunk is processed.
However, when I run this same code in a Lambda allocated 512 MB of memory, I receive the following error:
{ "errorType": "MemoryError", "stackTrace": [ " File \"/var/task/lambda_function.py\", line 38, in lambda_handler\n split_file(file_size_in_MB=100,\n", " File \"/var/task/lambda_function.py\", line 79, in split_file\n result = chunk[0:last_newline+1].decode('utf-8')\n" ] }
and the following log output, where you can see that the code only makes it through the first 100 MB loop:
1 mem size of chunk 100.0
[ERROR] MemoryError
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 38, in lambda_handler
    split_file(file_size_in_MB=100,
  File "/var/task/lambda_function.py", line 79, in split_file
    result = chunk[0:last_newline+1].decode('utf-8')
END
So my question is: what is happening here? I would think 512 MB should be plenty of allocated memory to process these 100 MB chunks, but in Lambda I run out of memory on the first pass. Any ideas?
This is likely because the Lambda is executed as a warm start - meaning that an existing container is reused to avoid costly startup overhead. See this excellent blog post on the anatomy of warm starts. Because an existing container is reused, the memory of the container is not guaranteed to be clear at the start of the next invocation. See this answer to a similar question.
That means you can't rely on always having the full allocated memory available to the Lambda at runtime.
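You can see the container reuse for yourself with a trivial handler; module-level state survives between warm invocations (the counter below is purely illustrative):

import os

invocation_count = 0   # module-level: initialised only on a cold start

def lambda_handler(event, context):
    global invocation_count
    invocation_count += 1
    # On warm starts this prints 2, 3, ... even though each invocation is "new",
    # showing that the container (and whatever is still in its memory) is reused.
    print(f'invocation {invocation_count} in process {os.getpid()}')
    return {'invocation': invocation_count}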
I think a better option here would be to use either a queue or Step Functions to handle the chunking of the data, e.g. a fan-out pattern in which one Lambda defines start/end positions for each subsequent Lambda to stream from, and sends messages to a queue that triggers multiple Lambdas in parallel (a rough sketch of the coordinator side is below). This is just one option, and of course you'd still have to handle Lambda failures, retries, etc.
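A minimal sketch of the coordinator side, assuming an SQS queue (QUEUE_URL is a hypothetical name) that triggers the worker Lambdas. It splits by byte ranges rather than row numbers, since S3 supports ranged GETs; each worker would still need to trim its range to whole lines:

import json
import boto3

s3 = boto3.client('s3')
sqs = boto3.client('sqs')

QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/csv-chunks'  # hypothetical

def fan_out(source_bucket, source_key, chunk_size_mb=100):
    size = s3.head_object(Bucket=source_bucket, Key=source_key)['ContentLength']
    chunk_size = chunk_size_mb * 1024 * 1024
    start = 0
    part = 0
    while start < size:
        end = min(start + chunk_size, size) - 1
        # Each worker Lambda does a ranged GET (Range=f'bytes={start}-{end}'),
        # trims to whole lines, and writes its own Parquet part.
        sqs.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=json.dumps({'bucket': source_bucket, 'key': source_key,
                                    'start': start, 'end': end, 'part': part}))
        start = end + 1
        part += 1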
Good luck :)
I'm not sure what you used for plotting the graph. You can use something like memory-profiler for profiling your program.
I ran memory_profiler on a program containing steps similar to a single iteration of your program. I also split some statements into multiple ones, because @profile measures memory usage after each statement, not in between (and as far as I understand, the GC can kick in before the memory usage is measured again). I used del to delete temporary variables the moment they are no longer needed, emulating what I believe happens in the unsplit statements. Here's the result using chunks of 100 MB:
Line # Mem usage Increment Occurences Line Contents
============================================================
35 70.0 MiB 70.0 MiB 1 @profile
36 def read_by_chunk2(file_size_in_MB):
37 70.0 MiB 0.0 MiB 1 with open('data.bin', 'rb') as f:
38 70.0 MiB 0.0 MiB 1 chunk_size = 2**20 * file_size_in_MB
39 170.0 MiB 100.0 MiB 1 chunk = f.read(chunk_size)
40 # Result
41 270.0 MiB 100.0 MiB 1 chunk_part = chunk[:-10]
42 370.0 MiB 100.0 MiB 1 result = chunk_part.decode('utf-8')
43 270.0 MiB -100.0 MiB 1 del chunk_part
44 # Dataframe
45 670.0 MiB 400.0 MiB 1 s = StringIO(result)
46 770.9 MiB 100.9 MiB 1 df = pd.read_csv(s)
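For reference, a minimal, self-contained version of that profiled function might look like this (data.bin is a local test file and the 10-byte trim is just a stand-in for the real partial-row handling); run it with python -m memory_profiler script.py:

import pandas as pd
from io import StringIO
from memory_profiler import profile

@profile
def read_by_chunk2(file_size_in_MB):
    with open('data.bin', 'rb') as f:
        chunk_size = 2**20 * file_size_in_MB
        chunk = f.read(chunk_size)
    # Result
    chunk_part = chunk[:-10]             # drop the (fake) trailing partial row
    result = chunk_part.decode('utf-8')
    del chunk_part
    # Dataframe
    s = StringIO(result)
    df = pd.read_csv(s)
    return df

if __name__ == '__main__':
    read_by_chunk2(100)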
Why StringIO needs FOUR times the memory of the string it wraps is beyond me. The good news is that you don't need it, and you don't need to decode the chunk yourself: you can pass a BytesIO to pd.read_csv instead:
Line # Mem usage Increment Occurences Line Contents
============================================================
49 70.0 MiB 70.0 MiB 1 @profile
50 def read_by_chunk(file_size_in_MB):
51 70.0 MiB 0.0 MiB 1 with open('data.bin', 'rb') as f:
52 70.0 MiB 0.0 MiB 1 chunk_size = 2**20 * file_size_in_MB
53 170.0 MiB 100.0 MiB 1 chunk = f.read(chunk_size)
54 # Splitting this statement would not change the memory usage because
55 # BytesIO keeps a reference to the chunk part
56 270.0 MiB 100.0 MiB 1 bytesio = io.BytesIO(chunk[:-10])
57 371.8 MiB 101.8 MiB 1 df = pd.read_csv(bytesio)
In general, you can decrease memory usage by avoiding copies and by forcing garbage collection of data you no longer need with del name; gc.collect().
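Applied to your split_file, a rough sketch of those two suggestions (parse the bytes directly via BytesIO and explicitly free intermediates) might look like this; it keeps your chunking logic and just passes target_stage_bucket in explicitly:

import gc
import io

import boto3
import pandas as pd
import awswrangler as wr

s3 = boto3.client('s3')

def split_file(file_size_in_MB, source_bucket, source_bucket_key, target_stage_bucket):
    body = s3.get_object(Bucket=source_bucket, Key=source_bucket_key)['Body']
    chunk_size = 1024 * 1024 * file_size_in_MB
    newline = b'\r\n'
    partial_chunk = b''
    header = b''
    counter = 0
    while True:
        data = body.read(chunk_size)
        if counter == 0:
            header = data[0:data.find(newline)]
            chunk = partial_chunk + data
        else:
            chunk = header + partial_chunk + data
        del data
        if chunk == b'':
            break
        last_newline = chunk.rfind(newline)
        result = chunk[0:last_newline + 1]        # stay in bytes, no .decode()
        partial_chunk = chunk[last_newline + 1:]
        del chunk
        gc.collect()
        if len(result) == 0:
            break
        df = pd.read_csv(io.BytesIO(result))      # pandas decodes while parsing
        del result
        gc.collect()                              # release the raw bytes before writing
        wr.s3.to_parquet(df=df,
                         path=f's3://{target_stage_bucket}/results{counter}.parquet.gzip',
                         compression='gzip')
        del df
        gc.collect()
        counter += 1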