How to enable Pub/Sub for local development
If you're developing software that interacts with Pub/Sub topics, you'll likely want to verify the messages that have been published to these topics. For local development and testing purposes, you can achieve this by using an emulator. This setup allows you to create and delete topics and subscriptions, enabling a clean environment for each test. The following guide outlines how to set up Pub/Sub emulation for local development and continuous integration (CI) environments, such as GitLab.
Setting Up the Pub/Sub Emulator with Docker Compose
To run the Pub/Sub emulator locally, you can use Docker Compose. Below is a docker-compose.yml example that defines a service for running the emulator:
services:
  pubsub-emulator:
    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
    command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
    ports:
      - "8085:8085"
This configuration will start the Pub/Sub emulator on port 8085 of your local machine.
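Before running tests against the emulator, it can help to confirm that the port is accepting connections. The following is a minimal sketch, assuming the emulator is mapped to localhost:8085 as in the configuration above; `wait_for_port` is a hypothetical helper written for this guide, not part of any Google library.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_seconds: float = 10.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            # Not listening yet; wait briefly and try again.
            time.sleep(0.2)
    return False


# Example (assumes the Docker Compose service above is running):
# wait_for_port("localhost", 8085)
```

This only checks that the TCP port is open; it does not verify that the process behind it is the Pub/Sub emulator.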
Configuring GitLab CI to Use the Pub/Sub Emulator
When running tests in the GitLab CI environment, you can also utilize the Pub/Sub emulator. Add the following service configuration to your .gitlab-ci.yml file to include the Pub/Sub emulator in your CI pipeline:
.pubsub-emulator-service: &pubsub-emulator-service
  name: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
  alias: pubsub-emulator
  command: [ "gcloud", "beta", "emulators", "pubsub", "start", "--host-port=0.0.0.0:8085" ]
  entrypoint: [ "" ]

python:tox:
  services:
    - *pubsub-emulator-service
This configuration ensures that the Pub/Sub emulator is available during your CI pipeline execution. Because of the alias, the job reaches the emulator at pubsub-emulator:8085.
Writing Test Code to Use the Emulator
The following Python code snippet demonstrates how to create and delete topics and subscriptions for testing purposes:
import os
import time

from django.conf import settings
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1


def re_create_topic_and_subscription():
    # Must be set before the clients are created so that they connect to the emulator.
    os.environ["PUBSUB_EMULATOR_HOST"] = settings.PUBSUB_EMULATOR_HOST
    # Placeholder project ID; it must match the project used by the code under test.
    project = "None"
    topic_name = "topic_name"

    # Delete and re-create the topic so each test starts from a clean state.
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project, topic_name)
    try:
        publisher.delete_topic(topic=topic_path)
    except NotFound:
        pass
    publisher.create_topic(name=topic_path)

    # Do the same for the subscription attached to that topic.
    subscriber = pubsub_v1.SubscriberClient()
    subscription_name = f"{topic_name}_subscription"
    subscription_path = subscriber.subscription_path(project, subscription_name)
    try:
        subscriber.delete_subscription(subscription=subscription_path)
    except NotFound:
        pass
    subscriber.create_subscription(request={"name": subscription_path, "topic": topic_path})
    return subscriber, subscription_path


def test_a():
    subscriber, subscription_path = re_create_topic_and_subscription()
    received_pubsub_messages = []

    def receive_pubsub_message(message):
        received_pubsub_messages.append(message)
        message.ack()

    # subscribe() starts a background streaming pull and returns a future.
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=receive_pubsub_message)

    # Trigger action that publishes messages to the topic
    an_action_that_means_2_messages_are_put_on_the_topic()

    # Poll until both messages have arrived or the timeout expires.
    timeout_seconds = 20
    start_time = time.time()
    while time.time() - start_time < timeout_seconds:
        if len(received_pubsub_messages) >= 2:
            break
        time.sleep(0.5)

    # Stop the background pull so it does not leak into other tests.
    streaming_pull_future.cancel()
    assert len(received_pubsub_messages) == 2
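The polling loop in the test can be factored into a small reusable helper. This is a sketch under the assumption that several tests need the same wait-with-timeout behavior; `wait_until` and its parameter names are hypothetical and not part of the Pub/Sub client library.

```python
import time
from typing import Callable


def wait_until(
    predicate: Callable[[], bool],
    timeout_seconds: float = 20.0,
    interval_seconds: float = 0.5,
) -> bool:
    """Poll predicate until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval_seconds)
    # One final check in case the condition became true during the last sleep.
    return predicate()


# Example use inside a test (received_pubsub_messages as in the test above):
# assert wait_until(lambda: len(received_pubsub_messages) >= 2)
```

Using `time.monotonic()` rather than `time.time()` keeps the timeout unaffected by system clock adjustments.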
Setting Environment Variables
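The Pub/Sub client library connects to the emulator only when the PUBSUB_EMULATOR_HOST environment variable is set, so it must point to your emulator instance. You can define the value in your application's settings file. The example below uses the hostname pubsub-emulator, which matches the Docker Compose service name and the GitLab CI alias; if your code runs directly on your machine rather than in a container, use localhost:8085 instead. For example: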
PUBSUB_EMULATOR_HOST = "pubsub-emulator:8085"
This setup ensures that your application and tests communicate with the Pub/Sub emulator instead of Google Cloud Pub/Sub.
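If the same settings file is used both locally and in CI, the host can be chosen at runtime. The sketch below is one possible approach, not the only one: `resolve_emulator_host` is a hypothetical helper, and it assumes that GitLab sets the predefined variable GITLAB_CI to "true" in pipeline jobs and that local code runs on the host machine rather than inside the Compose network.

```python
import os
from typing import Mapping, Optional


def resolve_emulator_host(env: Optional[Mapping[str, str]] = None) -> str:
    """Pick the emulator address for the current environment."""
    env = os.environ if env is None else env
    # An explicitly configured value always wins.
    explicit = env.get("PUBSUB_EMULATOR_HOST")
    if explicit:
        return explicit
    # Assumption: in GitLab CI the emulator is reachable through the service alias.
    if env.get("GITLAB_CI") == "true":
        return "pubsub-emulator:8085"
    # Assumption: locally the emulator port is published on the host.
    return "localhost:8085"


# In a settings file this could be used as:
# PUBSUB_EMULATOR_HOST = resolve_emulator_host()
```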
Publishing Messages to a Topic
To publish messages to a topic, you can use the following code snippet. This works for both the emulator and the actual Pub/Sub service in Google Cloud Platform (GCP):
import os

from django.conf import settings
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1


class TopicPublisherClient:
    def __init__(self, topic_name):
        # Route the client to the emulator when the setting is present.
        if hasattr(settings, "PUBSUB_EMULATOR_HOST"):
            os.environ["PUBSUB_EMULATOR_HOST"] = settings.PUBSUB_EMULATOR_HOST
        self.publisher = pubsub_v1.PublisherClient()
        self.topic_path = self.publisher.topic_path(settings.PROJECT_ID, topic_name)
        try:
            self.publisher.get_topic(topic=self.topic_path)
        except NotFound:
            # Create missing topics only on the emulator; in GCP a missing topic is an error.
            if hasattr(settings, "PUBSUB_EMULATOR_HOST"):
                self.publisher.create_topic(name=self.topic_path)
            else:
                raise

    def publish(self, data: bytes, **kwargs):
        # Return the future so callers can wait for the publish to complete with .result().
        return self.publisher.publish(self.topic_path, data, **kwargs)
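The publish method expects the message body as bytes, and any keyword arguments are sent as message attributes, which must be strings. The following sketch shows one way to prepare both from ordinary Python values; `build_message` is a hypothetical helper, and JSON encoding is an assumption about the payload format rather than something Pub/Sub requires.

```python
import json
from typing import Any, Dict, Tuple


def build_message(payload: Dict[str, Any], **attributes: Any) -> Tuple[bytes, Dict[str, str]]:
    """Encode a payload as UTF-8 JSON bytes and coerce attribute values to strings."""
    data = json.dumps(payload, sort_keys=True).encode("utf-8")
    string_attributes = {key: str(value) for key, value in attributes.items()}
    return data, string_attributes


# Example use with the TopicPublisherClient class above (topic name is illustrative):
# data, attrs = build_message({"order_id": 42}, event_type="order.created", version=1)
# TopicPublisherClient("orders").publish(data, **attrs)
```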