
When using the Beam IO ReadFromPubSub module, can you pull messages with attributes in Python? It's unclear whether it's supported

I'm trying to pull messages with attributes stored in PubSub into a Beam pipeline. I'm wondering whether support has not yet been added for Python and that's why I'm unable to read them. I see that it exists in Java.

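import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# subscription_name is assumed to be defined elsewhere (the subscription path).
pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True

pipeline = beam.Pipeline(options=pipeline_options)
# Read from the subscription; as written, each element is the raw bytes payload.
messages = (pipeline | beam.io.ReadFromPubSub(subscription=subscription_name).with_output_types(bytes))

def printattr(element):
    # This is the line that does not work: bytes elements have no .attributes.
    print(element.attributes)


lines = messages | 'printattr' >> beam.Map(printattr)

result = pipeline.run()
result.wait_until_finish()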

I expected to be able to list the attributes, for example:

data: b'Message number 1109'
-     attributes: {
-       "_comments": "nan",
-       "_direction": "SE",
-       "_fromst": "Harlem",
-       "_last_updt": "2019-03-20 21:11:02.0",
-       "_length": "0.56",
-       "_lif_lat": "41.9809967484",
-       "_lit_lat": "41.9787314076",
-       "_lit_lon": "-87.7964600566",
-       "_strheading": "W",
-       "_tost": "Oak Park",

However, I can only seem to access the information stored in the data field, not the attributes.

cloudpython asked Dec 01 '25 04:12


1 Answer

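After reviewing the documentation at https://beam.apache.org/releases/pydoc/2.11.0/apache_beam.io.gcp.pubsub.html, I found an additional argument that can be passed to ReadFromPubSub.

You need to set with_attributes=True; otherwise you only get the data field. With it set, each element is a PubsubMessage object exposing data and attributes rather than a plain bytes payload, so the with_output_types(bytes) hint from the question no longer applies.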
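For anyone who wants to see it in context, here is a minimal sketch of the question's pipeline with with_attributes=True set. The names describe_message, run, and FakeMessage are hypothetical helpers made up for illustration, and the Beam imports sit inside run() only so the helper can be tried without Beam installed; treat it as a starting point rather than the definitive way to do this.

```python
from collections import namedtuple


def describe_message(message):
    """Return the payload and attributes of a PubsubMessage-like object."""
    # With with_attributes=True, each element exposes .data (bytes)
    # and .attributes (a dict of string keys to string values).
    return {"data": message.data, "attributes": dict(message.attributes or {})}


def run(subscription_name):
    # Imported here (an illustrative choice) so describe_message can be
    # exercised on its own without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True

    pipeline = beam.Pipeline(options=options)
    (
        pipeline
        # No with_output_types(bytes) here: elements are PubsubMessage objects.
        | "read" >> beam.io.ReadFromPubSub(
            subscription=subscription_name, with_attributes=True)
        | "describe" >> beam.Map(describe_message)
        | "print" >> beam.Map(print)
    )
    pipeline.run().wait_until_finish()


# Hypothetical stand-in for PubsubMessage, used only for a local check.
FakeMessage = namedtuple("FakeMessage", ["data", "attributes"])
sample = describe_message(FakeMessage(b"Message number 1109", {"_direction": "SE"}))
```

Calling run() with your own subscription path starts the streaming pipeline; the last two lines only exercise the helper locally with a fake message.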

Hopefully this helps someone else who may get stuck or just be tired :)

cloudpython answered Dec 02 '25 17:12


