Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Move Google Pub/Sub Messages Between Topics

How can I bulk move messages from one topic to another in GCP Pub/Sub?

I am aware of the Dataflow templates that provide this, however unfortunately restrictions do not allow me to use Dataflow API.

Any suggestions on ad-hoc movement of messages between topics (besides one-by-one copy and pasting?)

Specifically, the use case is for moving messages in a deadletter topic back into the original topic for reprocessing.

like image 893
user2994884 Avatar asked Nov 01 '25 19:11

user2994884


1 Answers

I suggest that you use a Python script for that. You can use the PubSub CLI to read the messages and publish to another topic like below:

from google.cloud import pubsub
from google.cloud.pubsub import types

# Defining parameters
PROJECT = "<your_project_id>" 
SUBSCRIPTION = "<your_current_subscription_name>"
NEW_TOPIC = "projects/<your_project_id>/topics/<your_new_topic_name>"


# Creating clients for publishing and subscribing. Adjust the max_messages for your purpose
subscriber = pubsub.SubscriberClient()
publisher = pubsub.PublisherClient(
    batch_settings=types.BatchSettings(max_messages=500),
)

# Get your messages. Adjust the max_messages for your purpose
subscription_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION)
response = subscriber.pull(subscription_path, max_messages=500)

# Publish your messages to the new topic
for msg in response.received_messages:
    publisher.publish(NEW_TOPIC, msg.message.data)


# Ack the old subscription if necessary
ack_ids = [msg.ack_id for msg in response.received_messages]
subscriber.acknowledge(subscription_path, ack_ids)

Before running this code you will need to install the PubSub CLI in your Python environment. You can do that running pip install google-cloud-pubsub

An approach to execute your code is using Cloud Functions. If you decide to use it, pay attention in two points:

  1. The maximum time that you function can take to run is 9 minutes. If this timeout get exceeded, your function will terminate without finishing the job.

  2. In Cloud Functions you can just put google-cloud-pubsub in a new line of your requirements file instead of running a pip command.

like image 164
rmesteves Avatar answered Nov 03 '25 14:11

rmesteves