How could I read fast enough from Google Pub/Sub using Python

I am trying to read messages from the realtime public projects/pubsub-public-data/topics/taxirides-realtime stream, and it seems I am either not processing the data fast enough or there is a problem with acknowledgement. The "Unacked message count" is constantly increasing no matter what I do (even if I purge the messages before running my code). I tried running the same code from my home Windows 10 PC, from a GCP-based Ubuntu VM and from the GCP console terminal, with the same result.

Additional info: In one of my GCP projects I created a subscription "taxi-ride-client" for the public projects/pubsub-public-data/topics/taxirides-realtime Pub/Sub topic, and my application reads from that subscription. Messages are arriving to my program, but they are either processed too slowly or acknowledged improperly.

Am I doing something wrong, or is Python too slow for this? Here is my code:

import os
from google.cloud import pubsub_v1

def callback(message):
    ''' Processing PubSub messages '''
    message.ack()

if __name__ == '__main__':

    project_name = '<projectname>'
    credfile = '<credfilename>.json'
    subscription_name = 'taxi-ride-client'

    # Point the client library at the downloaded service account key
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credfile

    # Open a streaming pull on the subscription; callback() is invoked per message
    subscriber = pubsub_v1.SubscriberClient()
    subscription = subscriber.subscription_path(project_name, subscription_name)
    subscr_future = subscriber.subscribe(subscription, callback=callback)
    print('Listening for messages via: {}'.format(subscription))

    try:
        subscr_future.result(timeout=600)   # running for 10 minutes
    except Exception as ex:
        subscr_future.cancel()

    print('\nNormal program termination.\n')

The stream produces some 8-10 million records per hour, out of which less than 0.5% match the IF condition in my callback. I also tried a totally empty callback that contained only the acknowledgement line.
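(The actual IF condition is not shown above; purely as a hypothetical illustration, with an assumed ride_status field and value that are not taken from my real code, such a filtering callback could look roughly like this:)

import json

def callback(message):
    ''' Hypothetical filtering callback -- field name and value are assumptions '''
    ride = json.loads(message.data.decode('utf-8'))
    if ride.get('ride_status') == 'dropoff':   # assumed filter condition
        pass   # real processing would go here
    message.ack()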

I also ran this small program in 5 separate copies reading from the very same subscription, but even that did not make a difference. That suggests I have a problem with acknowledgement.

What am I doing wrong?

By the way, I implemented the solution using Google Cloud Dataflow, with the first step reading from the Pub/Sub topic, and that works fine under Python. That is a different library and a different architecture, but it easily processes 9,000,000 messages hourly.
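(For context, a minimal sketch of what such a Beam/Dataflow pipeline could look like -- this is not my actual Dataflow code; the project, region, bucket and the filter predicate below are placeholders:)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    # Placeholder options; real values depend on your project and bucket
    options = PipelineOptions(
        streaming=True,
        runner='DataflowRunner',
        project='<projectname>',
        region='<region>',
        temp_location='gs://<bucket>/tmp',
    )

    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
               topic='projects/pubsub-public-data/topics/taxirides-realtime')
         | 'Filter' >> beam.Filter(lambda msg: b'dropoff' in msg)   # placeholder predicate
         | 'Process' >> beam.Map(lambda msg: msg))                  # placeholder processing

if __name__ == '__main__':
    run()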

Still, I am curious how this should be done using Python and pure Pub/Sub (without Beam).

(UPDATE)

Reproduction

  1. GCP project created with name: <your-test-project>
  2. A service account is created with the Project Owner role and its credential file is downloaded in JSON format
  3. Subscription created in command shell: gcloud pubsub subscriptions create projects/<your-test-project>/subscriptions/taxi-ride-client --topic=projects/pubsub-public-data/topics/taxirides-realtime --ack-deadline=60 --message-retention-duration=6h
  4. Python 3.7 virtual environment with google-cloud-pubsub (version 1.1.0)
  5. Run the code after replacing <projectname> and <credfilename>. Source code here

Gábor

Asked Nov 20 '25 by ghrasko
1 Answer

Given the limitations inherent in Python's runtime with regard to multi-threaded processing, achieving high throughput with Cloud Pub/Sub is difficult. Dataflow doesn't use Python under the hood in its implementation that reads from Pub/Sub, so it is not subject to such limitations. Java and Go tend to have much better performance characteristics on a single machine with multiple cores, so one option is to switch languages. Alternatively, you would have to scale horizontally and bring up more instances of your client so you can process more data in parallel. You might find the blog post on client library performance to be an interesting read.
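As a rough sketch of the "more instances" idea on a single machine (the subscription path, worker count and flow-control limit below are placeholder assumptions, not values from the question), you could fan the streaming pull out over several OS processes so the callbacks are not all serialized behind one Python interpreter:

import multiprocessing
from google.cloud import pubsub_v1

SUBSCRIPTION = 'projects/<projectname>/subscriptions/taxi-ride-client'  # placeholder

def worker(worker_id):
    # Each process creates its own client after the fork
    subscriber = pubsub_v1.SubscriberClient()
    # Cap outstanding (leased but unacked) messages per worker
    flow_control = pubsub_v1.types.FlowControl(max_messages=1000)

    def callback(message):
        # Real processing would go here; ack once the work is done
        message.ack()

    future = subscriber.subscribe(SUBSCRIPTION, callback=callback,
                                  flow_control=flow_control)
    print('Worker {} listening on {}'.format(worker_id, SUBSCRIPTION))
    try:
        future.result(timeout=600)   # run for 10 minutes, like the original
    except Exception:
        future.cancel()

if __name__ == '__main__':
    workers = [multiprocessing.Process(target=worker, args=(i,))
               for i in range(4)]   # placeholder worker count
    for w in workers:
        w.start()
    for w in workers:
        w.join()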

Answered Nov 23 '25 by Kamal Aboul-Hosn

