
Airflow with GCP Python: ValueError: Stream must be at beginning

I am using Python with Airflow and the GCP Python library. I automated the process of sending files to GCP using Airflow DAGs. The code is as follows:

import sys
from io import BytesIO

for fileid, filename in files_dictionary.items():
    if ftp.size(filename) <= int(MAX_FILE_SIZE):
        data = BytesIO()
        ftp.retrbinary('RETR ' + filename, callback=data.write)
        f = client.File(client, fid=fileid)
        size = sys.getsizeof(data.read())
        # Another option is to use FileIO, but I'm not sure how
        f.send(data, filename, size)  # send() is defined in another library

The code that triggers the upload is in the current repo (as shown above), but the actual upload is done by another dependency that is not under our control. The source of that method is:

def send(self, fp, filename, file_bytes):
    """Send file to cloud
    fp file object
    filename   is the name of the file.
    file_bytes is the size of the file in bytes
    """
    data = self.initiate_resumable_upload(self.getFileid())

    _, blob = self.get_gcs_blob_and_bucket(data)

    # Set attachment filename. Does this work with datasets with folders
    original_filename = filename.rsplit(os.sep, 1)[-1]
    blob.content_disposition = "attachment;filename=" + original_filename

    blob.upload_from_file(fp)

    self.finish_resumable_upload(self.getFileid())

I am getting the following error:

[2020-04-23 09:43:17,239] {{models.py:1788}} ERROR - Stream must be at beginning.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1657, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 103, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 108, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/transfer_data.py", line 241, in upload
    f.send(data, filename, size)
  File "/usr/local/lib/python3.6/site-packages/client/utils.py", line 53, in wrapper_timer
    value = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/client/client.py", line 518, in send
    blob.upload_from_file(fp)
  File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1158, in upload_from_file
    client, file_obj, content_type, size, num_retries, predefined_acl
  File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1068, in _do_upload
    client, stream, content_type, size, num_retries, predefined_acl
  File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1011, in _do_resumable_upload
    predefined_acl=predefined_acl,
  File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 960, in _initiate_resumable_upload
    stream_final=False,
  File "/usr/local/lib/python3.6/site-packages/google/resumable_media/requests/upload.py", line 343, in initiate
    stream_final=stream_final,
  File "/usr/local/lib/python3.6/site-packages/google/resumable_media/_upload.py", line 415, in _prepare_initiate_request
    raise ValueError(u"Stream must be at beginning.")
ValueError: Stream must be at beginning.
asked Sep 06 '25 at 03:09 by OPTIMUS


2 Answers

The upload_from_file method has a rewind parameter that does the seek(0) call for you.

I would modify your upload_from_file call to:

blob.upload_from_file(file_obj=fp, rewind=True)

That should do the trick, and you don't need a separate seek(0) call.
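To illustrate what rewind=True changes, here is a minimal sketch that does not talk to GCS at all. fake_upload_from_file is a hypothetical stand-in, not the real Blob.upload_from_file; it only mimics the two behaviours relevant here: the "stream position must be 0" check behind this error, and the rewind flag seeking to the start first.

```python
from io import BytesIO


def fake_upload_from_file(file_obj, rewind=False):
    """Hypothetical stand-in for Blob.upload_from_file; makes no network calls."""
    if rewind:
        # What rewind=True is documented to do: seek to the start before uploading.
        file_obj.seek(0)
    # Mimics the position check that raises the error in the traceback.
    if file_obj.tell() != 0:
        raise ValueError("Stream must be at beginning.")
    return file_obj.read()  # the bytes that would be uploaded


data = BytesIO()
data.write(b"downloaded via FTP")  # position is now at the end of the buffer

try:
    fake_upload_from_file(data)
except ValueError as exc:
    print("without rewind:", exc)

print("with rewind:", fake_upload_from_file(data, rewind=True))
```

Since upload_from_file is called inside the dependency's send() in the question, using this flag means changing that library's code; otherwise a seek(0) on the caller's side has the same effect.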

answered Sep 07 '25 at 19:09 by kmans


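When reading a binary file-like object, you can navigate through it with seek operations; in other words, you can move the current position from the beginning of the stream to any other offset. The error ValueError: Stream must be at beginning. is basically saying: "your position is not at the beginning of the stream, and it must be". In your code, ftp.retrbinary writes each chunk into data, which leaves the position at the end of the BytesIO, and data.read() at that point returns nothing and leaves the position there.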

So you need to move the position back to the beginning of the stream before handing it to send(). You can do that with seek(0).
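A quick way to see this with io.BytesIO alone (no FTP or GCS involved): writing advances the position, reading from the end returns nothing, and seek(0) puts the position back at the start. The two chunks below are only a stand-in for what ftp.retrbinary passes to data.write.

```python
from io import BytesIO

data = BytesIO()
# Simulate retrbinary handing chunks to the callback (data.write).
for chunk in (b"first chunk,", b"second chunk"):
    data.write(chunk)

print(data.tell())   # 24: the position sits at the end after writing
print(data.read())   # b'': nothing left to read from the end

data.seek(0)         # move the position back to the start
print(data.tell())   # 0
print(data.read())   # b'first chunk,second chunk'
```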

In your case, you would do something like:

    data = BytesIO()
    ftp.retrbinary('RETR ' + filename, callback=data.write)
    f = client.File(client, fid=fileid)
    size = len(data.getvalue())  # byte count of the downloaded file
    data.seek(0)  # retrbinary left the position at the end; rewind it
    f.send(data, filename, size)
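A related detail about the size argument: sys.getsizeof returns the in-memory size of a Python object rather than the number of bytes in the payload, and data.read() at that point returns an empty bytes object anyway. Below is a minimal sketch of measuring the buffered payload without moving the position, assuming the byte count is what send()'s file_bytes parameter expects; buffer_size is a hypothetical helper name, not part of any library.

```python
import sys
from io import BytesIO


def buffer_size(buf: BytesIO) -> int:
    """Hypothetical helper: payload size in bytes; the position is left unchanged."""
    # getbuffer() exposes the contents without copying; releasing the view
    # (via the with block) keeps the BytesIO writable afterwards.
    with buf.getbuffer() as view:
        return view.nbytes


data = BytesIO()
data.write(b"x" * 1000)      # stand-in for the FTP download

print(data.read())           # b'': the position is at the end
print(sys.getsizeof(b""))    # object overhead, not a payload size
print(buffer_size(data))     # 1000
print(data.tell())           # still 1000, so seek(0) is still needed
data.seek(0)
```

len(data.getvalue()) gives the same number more simply, at the cost of copying the whole buffer.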

answered Sep 07 '25 at 21:09 by rmesteves