I'm trying to write an AWS Lambda function that converts CSV to Parquet. I can do this with pyarrow, but the package is too large (~200 MB uncompressed) to include in a Lambda deployment package. I'm trying to write the Parquet file directly to an S3 bucket using io.BytesIO.
Below is my Lambda function code:
import json
import boto3
import pandas as pd
from io import BytesIO
def lambda_handler():
    s3 = boto3.client('s3')
    response = s3.list_objects_v2(
        Bucket = 'mybucket',
        Prefix = 'subfolder/'
    )
    files = get_object_keys(response)
    for file in files:
        obj = s3.get_object(Bucket='mybucket', Key=file)
        df = pd.read_csv(obj['Body'], sep='|')
        buf = BytesIO()
        df.to_parquet(buf, engine='fastparquet', index=False, compression='snappy')
        buf.seek(0)
        key = f"output/{file.split('/')[1].split('.')[0]}.parquet"
        s3.put_object(Bucket='mybucket', Body=buf.getvalue(), Key=key)
def get_object_keys(response):
    files = []
    for content in response['Contents']:
        if content['Key'].endswith('.csv'):
            files.append(content['Key'])
    return files
lambda_handler()
When I use 'fastparquet' as the engine in DataFrame.to_parquet(), I get the following error:
Traceback (most recent call last):
  File ".\lambda_function.py", line 77, in <module>
    lambda_handler()
  File ".\lambda_function.py", line 64, in lambda_handler
    df.to_parquet(buf, engine='fastparquet', index=False, compression='snappy')
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\util\_decorators.py", line 214, in wrapper
    return func(*args, **kwargs)
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\frame.py", line 2116, in to_parquet  
    **kwargs,
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\io\parquet.py", line 264, in to_parquet
    **kwargs,
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\io\parquet.py", line 185, in write
    **kwargs,
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\fastparquet\writer.py", line 880, in write       
    compression, open_with, has_nulls, append)
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\fastparquet\writer.py", line 734, in write_simple
    with open_with(fn, mode) as f:
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\fastparquet\util.py", line 42, in default_open   
    return open(f, mode)
TypeError: expected str, bytes or os.PathLike object, not _io.BytesIO
Does anyone know how to fix this?
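The last two frames of the traceback show fastparquet's default_open passing the target straight to the built-in open(f, mode), which only accepts a path (or a file descriptor), not an in-memory buffer. A minimal sketch that reproduces the same class of error with the standard library alone, without pandas or fastparquet (the exact wording of the message varies slightly between Python versions):

```python
import io

buf = io.BytesIO()

try:
    # Built-in open() expects a str, bytes or os.PathLike path;
    # an in-memory buffer is none of those.
    open(buf, "wb")
except TypeError as exc:
    error_message = str(exc)
else:
    error_message = None

print(error_message)
```

This suggests the failure comes from how the fastparquet version in the traceback opens its output, not from the S3 upload step.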
fastparquet is a Python implementation of the Parquet format, aiming to integrate into Python-based big data workflows. Not all parts of the Parquet format have been implemented or tested yet; e.g. see the Todos linked below. With that said, fastparquet is capable of reading all the data files from the parquet-compatibility project.
A Python interface to the Parquet file format. The Parquet format is a common binary data store, used particularly in the Hadoop/big-data sphere. It provides several advantages relevant to big-data processing, including: statistics stored in metadata allow for skipping unneeded chunks
For the related error "expected str, bytes or os.PathLike object, not NoneType": you probably have the pathlib package installed in an environment whose Python version no longer requires it, and it conflicts with Python's built-in pathlib. Uninstall the pathlib package from your environment.
If you are using Arrow anyway, you probably want to use its parquet interface. PySpark, a Python API to the Spark engine, interfaces Python commands with a Java/Scala execution core, and thereby gives Python programmers access to the Parquet format. Spark is used in some tests and some test files were produced by Spark.
This error was resolved by using pyarrow as the writing engine.
Sample code:
import io
import boto3
buffer = io.BytesIO()
df.to_parquet(buffer, engine="pyarrow", index=False)
s3_resource = boto3.resource('s3')
s3_resource.Object('bucketname', 'path_withfilename').put(Body=buffer.getvalue())
For reading Parquet files in Python 3.6 I used fastparquet, but for writing, the pyarrow engine seems to work.
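If pyarrow is too large for the deployment package, as the question states, one possible workaround is to give fastparquet a real file path (which the traceback shows it can open) and then upload the resulting bytes. This is a sketch under assumptions, not something the answer above confirms: bytes_via_tempfile and demo_writer are hypothetical names introduced for illustration, and the stand-in writer is there only so the block runs without pandas or fastparquet installed.

```python
import os
import tempfile


def bytes_via_tempfile(write_to_path, suffix=".parquet"):
    """Call write_to_path(path) with a temporary file path and return the bytes written."""
    # On Lambda, the default temporary directory is under /tmp, which is writable.
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "out" + suffix)
        write_to_path(path)
        with open(path, "rb") as f:
            return f.read()


def demo_writer(path):
    """Stand-in for a writer that only accepts a path (hypothetical)."""
    with open(path, "wb") as f:
        f.write(b"PAR1 demo payload")


payload = bytes_via_tempfile(demo_writer)
print(len(payload))

# In the question's loop, the same helper might be used like this (untested here):
# body = bytes_via_tempfile(
#     lambda p: df.to_parquet(p, engine='fastparquet', index=False, compression='snappy')
# )
# s3.put_object(Bucket='mybucket', Body=body, Key=key)
```

The temporary directory is removed when the with block exits, so repeated invocations do not fill up Lambda's limited /tmp storage.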