How to enable Pub/Sub for local development

If you're developing software that interacts with Pub/Sub topics, you'll likely want to verify the messages that have been published to these topics. For local development and testing purposes, you can achieve this by using an emulator. This setup allows you to create and delete topics and subscriptions, enabling a clean environment for each test. The following guide outlines how to set up Pub/Sub emulation for local development and continuous integration (CI) environments, such as GitLab.

Setting Up the Pub/Sub Emulator with Docker Compose

To run the Pub/Sub emulator locally, you can use Docker Compose. Below is a docker-compose.yml example that defines a service for running the emulator:

services:
  pubsub-emulator:
    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
    command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
    ports:
      - "8085:8085"

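This configuration starts the Pub/Sub emulator and publishes it on port 8085. Code running directly on your machine reaches it at localhost:8085, while other services in the same Compose project reach it at pubsub-emulator:8085.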
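
The emulator can take a few seconds to start accepting connections, so it can help to wait for the port before running tests. The following is a minimal sketch using only the standard library; wait_for_port is a hypothetical helper name, not part of any Google library, and it only checks that the TCP port is open, not that the emulator is fully initialised.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_seconds: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            # Connection refused or timed out: wait briefly and retry
            time.sleep(0.5)
    return False
```

Assuming the compose file above, a call such as wait_for_port("localhost", 8085) before the test run would be one way to use it.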

Configuring GitLab CI to Use the Pub/Sub Emulator

When running tests in the GitLab CI environment, you can also utilize the Pub/Sub emulator. Add the following service configuration to your .gitlab-ci.yml file to include the Pub/Sub emulator in your CI pipeline:

.pubsub-emulator-service: &pubsub-emulator-service
  name: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
  alias: pubsub-emulator
  command: [ "gcloud", "beta", "emulators", "pubsub", "start", "--host-port=0.0.0.0:8085" ]
  entrypoint: [ "" ]


python:tox:
  services:
    - *pubsub-emulator-service

This configuration ensures that the Pub/Sub emulator is available during your CI pipeline execution. Because of the alias, the job reaches the emulator at pubsub-emulator:8085.

Writing Test Code to Use the Emulator

The following Python code snippet demonstrates how to create and delete topics and subscriptions for testing purposes:

import os
import time
from google.cloud import pubsub_v1
from google.api_core.exceptions import NotFound
from django.conf import settings

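def re_create_topic_and_subscription():
    """Delete and re-create the topic and its subscription so each test starts clean."""
    # The client library reads this variable when a client is constructed
    os.environ["PUBSUB_EMULATOR_HOST"] = settings.PUBSUB_EMULATOR_HOST
    # The emulator accepts any project ID; use the one the publishing code uses
    # so that both sides address the same topic
    project = settings.PROJECT_ID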
    topic_name = "topic_name"

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project, topic_name)
    try:
        publisher.delete_topic(topic=topic_path)
    except NotFound:
        pass
    publisher.create_topic(name=topic_path)

    subscriber = pubsub_v1.SubscriberClient()
    subscription_name = f"{topic_name}_subscription"
    subscription_path = subscriber.subscription_path(project, subscription_name)
    try:
        subscriber.delete_subscription(subscription=subscription_path)
    except NotFound:
        pass
    subscriber.create_subscription(request={"name": subscription_path, "topic": topic_path})
    return subscriber, subscription_path

def test_a():
    subscriber, subscription_path = re_create_topic_and_subscription()
    received_pubsub_messages = []

    def receive_pubsub_message(message):
        received_pubsub_messages.append(message)
        message.ack()

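    # subscribe() starts a background streaming pull and returns a future that controls it
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=receive_pubsub_message)

    try:
        # Trigger action that publishes messages to the topic
        an_action_that_means_2_messages_are_put_on_the_topic()

        # Poll until both messages arrive or the timeout expires
        timeout_seconds = 20
        start_time = time.time()
        while time.time() - start_time < timeout_seconds:
            if len(received_pubsub_messages) >= 2:
                break
            time.sleep(0.5)
        assert len(received_pubsub_messages) == 2
    finally:
        # Stop the background pull so it does not leak into other tests
        streaming_pull_future.cancel()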
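
If several tests need the same wait, the polling loop in the test above could be factored into a small helper. This is a sketch under that assumption; wait_until is a hypothetical name and uses only the standard library.

```python
import time
from typing import Callable


def wait_until(
    predicate: Callable[[], bool],
    timeout_seconds: float = 20.0,
    interval_seconds: float = 0.5,
) -> bool:
    """Poll predicate until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval_seconds)
    # One final check so a result arriving right at the deadline still counts
    return predicate()
```

In the test, the loop would then shrink to something like assert wait_until(lambda: len(received_pubsub_messages) >= 2), followed by the exact-count assertion.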

Setting Environment Variables

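The Pub/Sub client library reads the PUBSUB_EMULATOR_HOST environment variable to decide where to connect. The code in this guide keeps the value in the application's settings file and copies it into the environment before creating a client. The value below uses the service name, which resolves in GitLab CI and from other containers in the same Compose project; use localhost:8085 when the code runs directly on your machine. For example: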

PUBSUB_EMULATOR_HOST = "pubsub-emulator:8085"

This setup ensures that your application and tests communicate with the Pub/Sub emulator instead of Google Cloud Pub/Sub.
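
The copy from settings into the environment could be kept in one place. The sketch below assumes the variable must be set before a Pub/Sub client is constructed; configure_pubsub_emulator is a hypothetical helper, and passing None stands for a deployment where no emulator is configured.

```python
import os
from typing import Optional


def configure_pubsub_emulator(host: Optional[str]) -> bool:
    """Point Pub/Sub clients at the emulator if a host is given.

    Returns True when the environment variable was set, False otherwise.
    """
    if not host:
        # No emulator configured: leave the environment untouched so
        # clients talk to the real Pub/Sub service
        return False
    os.environ["PUBSUB_EMULATOR_HOST"] = host
    return True
```

With Django settings, a call such as configure_pubsub_emulator(getattr(settings, "PUBSUB_EMULATOR_HOST", None)) would cover both the emulator and the production case.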

Publishing Messages to a Topic

To publish messages to a topic, you can use the following code snippet. This works for both the emulator and the actual Pub/Sub service in Google Cloud Platform (GCP):

import os

from django.conf import settings
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1


class TopicPublisherClient:
    def __init__(self, topic_name):
        if hasattr(settings, "PUBSUB_EMULATOR_HOST"):
            os.environ["PUBSUB_EMULATOR_HOST"] = settings.PUBSUB_EMULATOR_HOST
        self.publisher = pubsub_v1.PublisherClient()
        self.topic_path = self.publisher.topic_path(settings.PROJECT_ID, topic_name)
        try:
            self.publisher.get_topic(topic=self.topic_path)
        except NotFound:
            # Only create missing topics on the emulator; in GCP a missing topic is an error
            if hasattr(settings, "PUBSUB_EMULATOR_HOST"):
                self.publisher.create_topic(name=self.topic_path)
            else:
                raise

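    def publish(self, data: bytes, **kwargs):
        # Extra keyword arguments are passed through to the client's publish(),
        # typically as message attributes. The returned future resolves to the message ID.
        return self.publisher.publish(self.topic_path, data, **kwargs)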
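
Since publish() takes the message body as bytes and message attributes are sent as strings, callers usually need a small encoding step. The following is one possible sketch, assuming JSON payloads; build_message is a hypothetical helper and not part of the class above.

```python
import json
from typing import Any, Dict, Tuple


def build_message(payload: Dict[str, Any], **attributes: Any) -> Tuple[bytes, Dict[str, str]]:
    """Encode a payload as UTF-8 JSON bytes and coerce attribute values to strings."""
    # sort_keys keeps the encoded body stable, which makes test assertions simpler
    data = json.dumps(payload, sort_keys=True).encode("utf-8")
    string_attributes = {key: str(value) for key, value in attributes.items()}
    return data, string_attributes
```

A caller could then write data, attrs = build_message({"id": 1}, origin="test") and pass the result on with client.publish(data, **attrs), where client is a TopicPublisherClient instance.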