
How to Use Publish-Subscribe Pattern in Python

Many of us eagerly wait for that notification, “Your favorite YouTuber has uploaded a new video!” Have you ever wondered how this notification system works behind the scenes? How does clicking that subscribe button register you for alerts? And what design pattern powers this kind of behavior?

Let’s look at the publish-subscribe (Pub-Sub) design pattern, the architecture that delivers each update only to the people who asked for it.

The publish-subscribe (Pub-Sub) pattern

The publish-subscribe pattern is a messaging architecture where publishers do not directly send messages to subscribers. Instead, messages are published to channels without the need for the publisher to know which subscribers (if any) will receive them. Subscribers indicate interest in specific channels and receive only the messages that are relevant to them, promoting a clear separation of concerns.

This pattern allows for a decoupling of message producers (publishers) and message consumers (subscribers). This decoupling enhances flexibility and scalability, making it easier to add new subscribers or change the way messages are handled without affecting the publishers.
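To make the decoupling concrete, here is a minimal sketch that uses nothing but a dictionary of callbacks. The names `subscriptions`, `subscribe`, `publish`, and the channel names are illustrative assumptions, not part of the implementation developed in the rest of this post. The point is only that `publish` never refers to a specific subscriber, and that publishing to a channel nobody listens to is perfectly fine.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

# Hypothetical handler type: any callable that accepts a string message.
Handler = Callable[[str], None]

# Channel name -> handlers interested in that channel.
subscriptions: DefaultDict[str, List[Handler]] = defaultdict(list)


def subscribe(channel: str, handler: Handler) -> None:
    subscriptions[channel].append(handler)


def publish(channel: str, message: str) -> int:
    """Deliver the message to the channel's handlers; return how many got it."""
    # .get() avoids creating an empty entry for channels without subscribers.
    handlers = subscriptions.get(channel, [])
    for handler in handlers:
        handler(message)
    return len(handlers)


inbox: List[str] = []
subscribe("uploads", inbox.append)

delivered = publish("uploads", "New video!")     # one subscriber receives it
ignored = publish("livestreams", "Going live")   # nobody subscribed, no error
```

The publisher side only knows channel names. Adding a second handler to `"uploads"` would not require touching `publish` at all.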

Components of the Pub-Sub pattern

  1. Subscriber protocol:
class Subscriber[Message](Protocol):
    def __call__(self, message: Message) -> None:
        ...

This protocol defines the interface for a subscriber. Any subscriber must implement the __call__ method, which takes a message as a parameter. By defining the protocol this way, we support the use of both class-based and function-based subscribers. The Subscriber protocol ensures that any object conforming to it can be used interchangeably.

Note: Subscribers can accept any type of message you define. In web applications this is commonly JSON, since it lets us send structured messages in a cross-platform way. That is why Message is a generic type parameter: it keeps the implementation type-agnostic. The class Subscriber[Message] syntax requires Python 3.12 or later. For more on generics, check out my post on the topic.
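As a small illustration of "both class-based and function-based subscribers", the sketch below passes a plain function and a callable object to the same code. It spells the protocol with an explicit `TypeVar`, which should be roughly equivalent to the 3.12 syntax above and also runs on older interpreters. The names `log_subscriber`, `TaggedSubscriber`, and `notify` are made up for this example.

```python
from typing import List, Protocol, TypeVar

# Pre-3.12 spelling of the generic parameter (an assumption for portability).
MessageT = TypeVar("MessageT", contravariant=True)


class Subscriber(Protocol[MessageT]):
    def __call__(self, message: MessageT) -> None:
        ...


received: List[str] = []


def log_subscriber(message: str) -> None:
    # A plain function satisfies the protocol: it is callable with one message.
    received.append(f"log: {message}")


class TaggedSubscriber:
    # A class satisfies the protocol by defining __call__; no inheritance needed.
    def __init__(self, tag: str) -> None:
        self.tag = tag

    def __call__(self, message: str) -> None:
        received.append(f"{self.tag}: {message}")


def notify(subscribers: List[Subscriber[str]], message: str) -> None:
    for subscriber in subscribers:
        subscriber(message)


notify([log_subscriber, TaggedSubscriber("email")], "new upload")
```

Neither subscriber inherits from `Subscriber`. A type checker accepts them because their call signature matches, which is what structural typing with `Protocol` is about.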

  2. Channel class:
@dataclass(slots=True, repr=False, kw_only=True)
class Channel[Message]:
    subscribers: set[Subscriber[Message]] = field(default_factory=set)

    def subscribe(self, subscriber: Subscriber[Message]) -> None:
        self.subscribers.add(subscriber)

    def unsubscribe(self, subscriber: Subscriber[Message]) -> None:
        # discard() is a no-op for unknown subscribers, so unsubscribe_all() is safe
        self.subscribers.discard(subscriber)

    def publish(self, message: Message) -> None:
        for subscriber in self.subscribers:
            subscriber(message)

The Channel class represents a communication channel. It maintains a set of subscribers and provides methods to subscribe, unsubscribe, and publish messages to these subscribers. Using a set ensures that each subscriber is unique and avoids duplicates.

  • subscribe: Adds a subscriber to the channel.
  • unsubscribe: Removes a subscriber from the channel.
  • publish: Sends a message to all subscribers in the channel.
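The set semantics are worth seeing in isolation. The sketch below works on a bare `set` rather than the `Channel` class, and `EmailSubscriber` with its example address is a hypothetical stand-in. It shows three things: adding the same object twice keeps one entry, two separately created objects count as two subscribers even if they look identical, and `set.remove` raises `KeyError` for a missing member while `set.discard` does not.

```python
from typing import Callable, List, Set

received: List[str] = []


class EmailSubscriber:
    """Hypothetical subscriber; hashed by identity since it defines no __eq__."""

    def __init__(self, email: str) -> None:
        self.email = email

    def __call__(self, message: str) -> None:
        received.append(f"{self.email}: {message}")


subscribers: Set[Callable[[str], None]] = set()

alice = EmailSubscriber("alice@example.com")
subscribers.add(alice)
subscribers.add(alice)                                  # same object: still one entry
subscribers.add(EmailSubscriber("alice@example.com"))   # distinct object: second entry
count_after_adds = len(subscribers)

subscribers.discard(alice)
subscribers.discard(alice)       # already gone: discard() silently does nothing
try:
    subscribers.remove(alice)    # already gone: remove() raises KeyError
    remove_raised = False
except KeyError:
    remove_raised = True

for subscriber in subscribers:   # only the second, distinct object is left
    subscriber("hello")
```

Which of the two removal methods `unsubscribe` should use depends on whether unsubscribing someone who is not subscribed counts as an error in your application.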

  3. Publisher class:
@dataclass(slots=True)
class Publisher[Message]:
    channels: dict[str, Channel[Message]] = field(default_factory=lambda: defaultdict(Channel))

    def publish(self, channel_name: str, message: Message) -> None:
        self.channels[channel_name].publish(message)

    def publish_all(self, message: Message) -> None:
        for channel in self.channels.values():
            channel.publish(message)

    def subscribe(self, channel_name: str, subscriber: Subscriber[Message]) -> None:
        self.channels[channel_name].subscribe(subscriber)

    def subscribe_all(self, subscriber: Subscriber[Message]) -> None:
        for channel in self.channels.values():
            channel.subscribe(subscriber)

    def unsubscribe(self, channel_name: str, subscriber: Subscriber[Message]) -> None:
        self.channels[channel_name].unsubscribe(subscriber)

    def unsubscribe_all(self, subscriber: Subscriber[Message]) -> None:
        for channel in self.channels.values():
            channel.unsubscribe(subscriber)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.channels})"

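The Publisher class manages multiple channels. Channels are stored in a defaultdict, so a channel is created automatically the first time its name is looked up. The class provides methods to subscribe/unsubscribe subscribers to/from channels, publish messages to a specific channel, or broadcast messages to all channels that exist at that moment.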

  • publish: Publishes a message to a specific channel.
  • publish_all: Publishes a message to all channels.
  • subscribe: Subscribes a subscriber to a specific channel.
  • subscribe_all: Subscribes a subscriber to all channels.
  • unsubscribe: Unsubscribes a subscriber from a specific channel.
  • unsubscribe_all: Unsubscribes a subscriber from all channels.
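Because the channels live in a `defaultdict`, the "all" methods only reach channels that have already been created. The reduced sketch below uses a set of subscriber names as a stand-in for `Channel`, and the subscriber and channel names are invented for illustration. It shows that a plain lookup creates a channel, and that a channel created later does not inherit earlier `subscribe_all` calls.

```python
from collections import defaultdict
from typing import DefaultDict, Set

# Stand-in for Publisher.channels: each "channel" is just a set of names.
channels: DefaultDict[str, Set[str]] = defaultdict(set)


def subscribe_all(subscriber: str) -> None:
    # Mirrors the loop in Publisher.subscribe_all: existing channels only.
    for members in channels.values():
        members.add(subscriber)


subscribe_all("early-bird")    # no channels exist yet, so nothing happens
channels["spam"]               # a bare lookup is enough to create the channel
subscribe_all("alice")         # reaches "spam", the only existing channel
channels["eggs"].add("bob")    # created afterwards: alice is not subscribed here

known = "ham" in channels      # membership tests do not create channels
```

The same mechanism means that publishing to a misspelled channel name creates an empty channel and delivers to nobody instead of raising an error, so it can be worth validating channel names in a real application.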

Putting it together

from collections import defaultdict
from typing import Protocol
from dataclasses import field, dataclass


class Subscriber[Message](Protocol):
    def __call__(self, message: Message) -> None:
        ...


@dataclass(slots=True, repr=False, kw_only=True)
class Channel[Message]:
    subscribers: set[Subscriber[Message]] = field(default_factory=set)

    def subscribe(self, subscriber: Subscriber[Message]) -> None:
        self.subscribers.add(subscriber)

    def unsubscribe(self, subscriber: Subscriber[Message]) -> None:
        # discard() is a no-op for unknown subscribers, so unsubscribe_all() is safe
        self.subscribers.discard(subscriber)

    def publish(self, message: Message) -> None:
        for subscriber in self.subscribers:
            subscriber(message)


@dataclass(slots=True)
class Publisher[Message]:
    channels: dict[str, Channel[Message]] = field(default_factory=lambda: defaultdict(Channel))

    def publish(self, channel_name: str, message: Message) -> None:
        self.channels[channel_name].publish(message)

    def publish_all(self, message: Message) -> None:
        for channel in self.channels.values():
            channel.publish(message)

    def subscribe(self, channel_name: str, subscriber: Subscriber[Message]) -> None:
        self.channels[channel_name].subscribe(subscriber)

    def subscribe_all(self, subscriber: Subscriber[Message]) -> None:
        for channel in self.channels.values():
            channel.subscribe(subscriber)

    def unsubscribe(self, channel_name: str, subscriber: Subscriber[Message]) -> None:
        self.channels[channel_name].unsubscribe(subscriber)

    def unsubscribe_all(self, subscriber: Subscriber[Message]) -> None:
        for channel in self.channels.values():
            channel.unsubscribe(subscriber)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.channels})"

Usage example

Here’s an example of how to use the publisher and channel classes to manage subscriptions and publish messages:

class EmailSubscriber:
    def __init__(self, email: str) -> None:
        self.email = email

    def __call__(self, message: str) -> None:
        print(f"Sending email to {self.email}: {message}")

def main() -> None:
    publisher = Publisher()
    email_subscriber = EmailSubscriber('[email protected]')

    spam = publisher.channels["spam"]
    eggs = publisher.channels["eggs"]

    # Subscribing to channels
    spam.subscribe(email_subscriber)
    eggs.subscribe(email_subscriber)

    # Publishing messages
    spam.publish('Hello, spam subscribers!')
    eggs.publish('Hello, eggs subscribers!')

    # Unsubscribe
    spam.unsubscribe(email_subscriber)

    # Publishing after unsubscription
    spam.publish('Hello again, spam subscribers!')
    eggs.publish('Hello again, eggs subscribers!')

if __name__ == '__main__':
    main()

Output:

Sending email to [email protected]: Hello, spam subscribers!
Sending email to [email protected]: Hello, eggs subscribers!
Sending email to [email protected]: Hello again, eggs subscribers!

In this example:

  • The email_subscriber is subscribed to both channels and receives messages published to them.
  • After unsubscribing from the spam channel, messages published to spam are no longer received by email_subscriber.
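The example above talks to the channel objects directly. The same flow can also go through the publisher's own methods, addressed by channel name. The sketch below is self-contained, so it uses simplified stand-ins for `Channel` and `Publisher` (string messages only, no 3.12 generics) rather than the exact classes from this post; `on_message` and `inbox` are illustrative names.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable, DefaultDict, List, Set

Subscriber = Callable[[str], None]


@dataclass
class Channel:
    """Simplified stand-in: a set of callables that accept a string."""
    subscribers: Set[Subscriber] = field(default_factory=set)

    def publish(self, message: str) -> None:
        for subscriber in self.subscribers:
            subscriber(message)


@dataclass
class Publisher:
    """Simplified stand-in exposing the name-based methods."""
    channels: DefaultDict[str, Channel] = field(
        default_factory=lambda: defaultdict(Channel)
    )

    def subscribe(self, channel_name: str, subscriber: Subscriber) -> None:
        self.channels[channel_name].subscribers.add(subscriber)

    def unsubscribe(self, channel_name: str, subscriber: Subscriber) -> None:
        self.channels[channel_name].subscribers.discard(subscriber)

    def publish(self, channel_name: str, message: str) -> None:
        self.channels[channel_name].publish(message)

    def publish_all(self, message: str) -> None:
        for channel in self.channels.values():
            channel.publish(message)


inbox: List[str] = []


def on_message(message: str) -> None:
    inbox.append(message)


publisher = Publisher()
publisher.subscribe("spam", on_message)
publisher.subscribe("eggs", on_message)

publisher.publish("spam", "only spam")   # delivered once
publisher.publish_all("broadcast")       # delivered once per channel: twice here
publisher.unsubscribe("spam", on_message)
publisher.publish("spam", "missed")      # spam has no subscribers left
publisher.publish_all("eggs only")       # delivered via eggs only
```

Note that a broadcast is delivered once per channel, so a subscriber registered on two channels receives it twice. If that is not what you want, the publisher would need to deduplicate subscribers across channels before delivering.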

Final thoughts

This pattern allows for flexible and scalable message distribution systems. Subscribers can represent various entities, such as individual users, devices, or even pools of objects. By separating concerns, we reduce coupling, allowing for more flexibility and reusability.
