Ferricstore.PubSub (ferricstore v0.3.1)


ETS-based Pub/Sub registry for FerricStore.

Provides a fire-and-forget, at-most-once messaging layer implemented entirely on the BEAM — no Raft consensus, no Bitcask persistence. Subscribers register their connection pid and receive messages as plain BEAM messages.

Architecture

Two ETS tables back the registry:

  • :ferricstore_pubsub holds {channel, pid} entries for exact channel subscriptions. It is a :duplicate_bag so multiple pids can subscribe to the same channel.

  • :ferricstore_pubsub_patterns holds {pattern, pid, compiled_regex} entries for glob-pattern subscriptions (PSUBSCRIBE). It is also a :duplicate_bag.

Both tables are owned by a GenServer (Ferricstore.PubSub), so they live for the lifetime of the application and are cleaned up on shutdown.
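As a rough illustration of the {channel, pid} layout, the sketch below uses plain :ets calls on an anonymous table. The module name TableSketch, the table name, and the :public option are assumptions made for the example; this page does not show which table options FerricStore passes beyond :duplicate_bag.

```elixir
defmodule TableSketch do
  # Returns the pids stored under `channel` in a {channel, pid} table.
  def subscribers(table, channel) do
    for {_channel, pid} <- :ets.lookup(table, channel), do: pid
  end
end

# Anonymous stand-in for :ferricstore_pubsub (illustrative options).
table = :ets.new(:pubsub_sketch, [:duplicate_bag, :public])

# A second process to act as another subscriber.
other = spawn(fn -> Process.sleep(:infinity) end)

:ets.insert(table, {"news", self()})
:ets.insert(table, {"news", other})

# Both pids are kept under the same "news" key.
IO.inspect(TableSketch.subscribers(table, "news"))
```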

Message protocol

When a message is published to a channel, each matching subscriber pid receives one of:

  • {:pubsub_message, channel, message} — for exact channel subscriptions
  • {:pubsub_pmessage, pattern, channel, message} — for pattern subscriptions

The connection process is responsible for encoding these into RESP3 push frames.
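A connection process might consume these tuples roughly as sketched below. PushSketch and its functions are hypothetical, and the element layout of the push frame (a leading "message" or "pmessage" string) follows the Redis convention, which is an assumption here rather than something this page specifies. The actual RESP3 byte encoding is left out.

```elixir
defmodule PushSketch do
  # Hypothetical helper: maps a PubSub tuple to the elements of a push frame.
  # The element layout is assumed from the Redis convention.
  def to_push_elements({:pubsub_message, channel, message}),
    do: ["message", channel, message]

  def to_push_elements({:pubsub_pmessage, pattern, channel, message}),
    do: ["pmessage", pattern, channel, message]

  # Waits for the next PubSub tuple in the calling process's mailbox.
  def next_push(timeout \\ 100) do
    receive do
      {:pubsub_message, _, _} = msg -> {:ok, to_push_elements(msg)}
      {:pubsub_pmessage, _, _, _} = msg -> {:ok, to_push_elements(msg)}
    after
      timeout -> :timeout
    end
  end
end

# Simulate what a publisher would send to a subscribed connection process.
send(self(), {:pubsub_message, "news", "hello"})
send(self(), {:pubsub_pmessage, "news.*", "news.tech", "hi"})

IO.inspect(PushSketch.next_push())
IO.inspect(PushSketch.next_push())
```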

Summary

Functions

channels(pattern \\ nil)

Lists active channels (channels with at least one subscriber).

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

cleanup(pid)

Removes all subscriptions (channels and patterns) for the given pid.

numpat()

Returns the total number of active pattern subscriptions.

numsub(channel_list)

Returns subscriber counts for the given channels.

psubscribe(pattern, pid)

Subscribes pid to all channels matching pattern (glob syntax).

publish(channel, message)

Publishes message to all subscribers of channel.

punsubscribe(pattern, pid)

Unsubscribes pid from the given glob pattern.

start_link(opts \\ [])

Starts the PubSub registry GenServer.

subscribe(channel, pid)

Subscribes pid to the given channel.

unsubscribe(channel, pid)

Unsubscribes pid from the given channel.

Types

channel()

@type channel() :: binary()

pattern()

@type pattern() :: binary()

Functions

channels(pattern \\ nil)

@spec channels(pattern() | nil) :: [channel()]

Lists active channels (channels with at least one subscriber).

When pattern is nil, returns all channels. When a glob pattern is given, returns only channels whose name matches.

Parameters

  • pattern - Optional glob pattern to filter channels (default: nil).

Returns

A list of channel name binaries.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

cleanup(pid)

@spec cleanup(pid()) :: :ok

Removes all subscriptions (channels and patterns) for the given pid.

Called during connection cleanup when a client disconnects to prevent stale entries in the ETS tables.

Parameters

  • pid - The process id to clean up.

Returns

:ok
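One way such a cleanup could be written against tables of the shapes described in the Architecture section is with :ets.match_delete/2. This is a sketch only: CleanupSketch is a hypothetical module, the tables are passed in as arguments, and the page does not say how the real function is implemented.

```elixir
defmodule CleanupSketch do
  # Removes every entry for `pid` from both tables.
  def cleanup(channels_table, patterns_table, pid) do
    :ets.match_delete(channels_table, {:_, pid})
    :ets.match_delete(patterns_table, {:_, pid, :_})
    :ok
  end
end

channels = :ets.new(:channels_sketch, [:duplicate_bag, :public])
patterns = :ets.new(:patterns_sketch, [:duplicate_bag, :public])
me = self()

:ets.insert(channels, [{"news", me}, {"sport", me}])
:ets.insert(patterns, {"news.*", me, Regex.compile!("^news")})

CleanupSketch.cleanup(channels, patterns, me)

# Sizes of both tables after cleanup.
IO.inspect({:ets.info(channels, :size), :ets.info(patterns, :size)})
```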

numpat()

@spec numpat() :: non_neg_integer()

Returns the total number of active pattern subscriptions.

Returns

A non-negative integer.

numsub(channel_list)

@spec numsub([channel()]) :: [channel() | non_neg_integer()]

Returns subscriber counts for the given channels.

Returns a flat list of [channel, count, channel, count, ...] suitable for RESP encoding.

Parameters

  • channel_list - List of channel names.

Returns

A flat list alternating channel names and their subscriber counts.
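The flat return shape can be illustrated with Enum.flat_map/2 over a {channel, pid} table. NumsubSketch is a hypothetical module and the table is passed in explicitly; the sketch only shows the alternating name/count layout, not the library's own code.

```elixir
defmodule NumsubSketch do
  # Builds [channel, count, channel, count, ...] in the order given.
  def numsub(table, channel_list) do
    Enum.flat_map(channel_list, fn channel ->
      [channel, length(:ets.lookup(table, channel))]
    end)
  end
end

table = :ets.new(:numsub_sketch, [:duplicate_bag, :public])
other = spawn(fn -> Process.sleep(:infinity) end)
:ets.insert(table, [{"news", self()}, {"news", other}])

# Channels with no subscribers still appear, with a count of 0.
IO.inspect(NumsubSketch.numsub(table, ["news", "sport"]))
```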

psubscribe(pattern, pid)

@spec psubscribe(pattern(), pid()) :: :ok

Subscribes pid to all channels matching pattern (glob syntax).

The glob pattern is compiled to a regex and stored alongside the entry for efficient matching at publish time.

Parameters

  • pattern - A glob pattern (e.g. "news.*", "user:?").
  • pid - The subscriber process id.

Returns

:ok
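To make the glob-to-regex step concrete, here is a minimal sketch that handles only the two wildcards used in the examples above: * for any run of characters and ? for exactly one. GlobSketch is a hypothetical module; the real compiler may support more glob syntax (such as character classes) and may differ in detail.

```elixir
defmodule GlobSketch do
  # Translates a glob into an anchored regex, escaping all other characters.
  def compile(glob) do
    source =
      glob
      |> String.graphemes()
      |> Enum.map_join(fn
        "*" -> ".*"
        "?" -> "."
        char -> Regex.escape(char)
      end)

    # \A and \z anchor the match to the whole channel name;
    # the "s" option lets `.` match newlines as well.
    Regex.compile!("\\A" <> source <> "\\z", "s")
  end
end

regex = GlobSketch.compile("news.*")
IO.inspect(Regex.match?(regex, "news.tech"))
IO.inspect(Regex.match?(regex, "newsXtech"))
```

Compiling once at subscription time means publish only has to run Regex.match?/2 per stored pattern, rather than re-parsing the glob for every message.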

publish(channel, message)

@spec publish(channel(), binary()) :: non_neg_integer()

Publishes message to all subscribers of channel.

Looks up exact channel subscribers and pattern subscribers whose compiled regex matches the channel name. Sends a BEAM message to each matching pid.

Parameters

  • channel - The channel to publish to (binary).
  • message - The message payload (binary).

Returns

The number of subscriber pids the message was sent to (integer). Because delivery is fire-and-forget, this counts sends rather than confirmed receipts.
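The lookup-and-send flow described above could look roughly like the following. PublishSketch is a hypothetical module that takes both tables as arguments and scans the patterns table with :ets.tab2list/1; the real implementation may iterate differently.

```elixir
defmodule PublishSketch do
  # Sends to exact and pattern subscribers; returns the number of sends.
  def publish(channels_table, patterns_table, channel, message) do
    exact =
      for {_channel, pid} <- :ets.lookup(channels_table, channel) do
        send(pid, {:pubsub_message, channel, message})
      end

    by_pattern =
      for {pattern, pid, regex} <- :ets.tab2list(patterns_table),
          Regex.match?(regex, channel) do
        send(pid, {:pubsub_pmessage, pattern, channel, message})
      end

    length(exact) + length(by_pattern)
  end
end

channels = :ets.new(:channels_sketch, [:duplicate_bag, :public])
patterns = :ets.new(:patterns_sketch, [:duplicate_bag, :public])

# The current process subscribes both exactly and by pattern.
:ets.insert(channels, {"news.tech", self()})
:ets.insert(patterns, {"news.*", self(), Regex.compile!("^news\\..*$")})

IO.inspect(PublishSketch.publish(channels, patterns, "news.tech", "hello"))
```

Note that a pid subscribed both exactly and through a matching pattern receives two messages in this sketch, one of each shape.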

punsubscribe(pattern, pid)

@spec punsubscribe(pattern(), pid()) :: :ok

Unsubscribes pid from the given glob pattern.

Removes all entries matching {pattern, pid, _} from the patterns table.

Parameters

  • pattern - The glob pattern (binary).
  • pid - The subscriber process id.

Returns

:ok

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

Starts the PubSub registry GenServer.

Creates the ETS tables :ferricstore_pubsub and :ferricstore_pubsub_patterns. It should be added to the application supervision tree before the Ranch listener, so that the tables exist before any connection can subscribe.
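The pattern of a GenServer that creates and owns ETS tables in init/1 can be sketched as follows. RegistrySketch, the table names, and the :public and :named_table options are all assumptions for illustration; only :duplicate_bag is stated on this page.

```elixir
defmodule RegistrySketch do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # The tables belong to this process and are deleted when it exits.
    :ets.new(:registry_sketch_channels, [:duplicate_bag, :public, :named_table])
    :ets.new(:registry_sketch_patterns, [:duplicate_bag, :public, :named_table])
    {:ok, %{}}
  end
end

# Children start in list order, so anything that needs the tables
# would be listed after the registry.
{:ok, _supervisor} = Supervisor.start_link([RegistrySketch], strategy: :one_for_one)

IO.inspect(:ets.info(:registry_sketch_channels, :type))
```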

subscribe(channel, pid)

@spec subscribe(channel(), pid()) :: :ok

Subscribes pid to the given channel.

The subscription is not idempotent: calling it twice with the same pid and channel creates a duplicate ETS entry, so callers should track their own subscriptions to avoid double-subscribing.

Parameters

  • channel - The channel name (binary).
  • pid - The subscriber process id.

Returns

:ok
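A caller can avoid duplicate entries by remembering which channels it has already subscribed to. The sketch below keeps that record in a MapSet and writes to a stand-in table directly; SubscribeOnceSketch is hypothetical, and in real code the insert would be a call to subscribe/2.

```elixir
defmodule SubscribeOnceSketch do
  # `subscribed` is a MapSet of channels kept in the caller's own state.
  # Returns the updated set; inserts only on the first subscription.
  def subscribe_once(table, subscribed, channel, pid) do
    if MapSet.member?(subscribed, channel) do
      subscribed
    else
      :ets.insert(table, {channel, pid})
      MapSet.put(subscribed, channel)
    end
  end
end

table = :ets.new(:subscribe_sketch, [:duplicate_bag, :public])

subscribed =
  MapSet.new()
  |> then(&SubscribeOnceSketch.subscribe_once(table, &1, "news", self()))
  |> then(&SubscribeOnceSketch.subscribe_once(table, &1, "news", self()))

# Only one entry is stored despite the repeated call.
IO.inspect({MapSet.to_list(subscribed), length(:ets.lookup(table, "news"))})
```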

unsubscribe(channel, pid)

@spec unsubscribe(channel(), pid()) :: :ok

Unsubscribes pid from the given channel.

Removes the {channel, pid} entry from the ETS table. If the pid was not subscribed, this is a no-op.

Parameters

  • channel - The channel name (binary).
  • pid - The subscriber process id.

Returns

:ok
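The no-op behaviour for a pid that was never subscribed falls out naturally if the removal is done with :ets.delete_object/2, as in this sketch. UnsubscribeSketch is a hypothetical module and the choice of delete_object/2 is an assumption about how such a function could be written.

```elixir
defmodule UnsubscribeSketch do
  # delete_object/2 removes only the exact {channel, pid} tuple and
  # succeeds even when no such entry exists.
  def unsubscribe(table, channel, pid) do
    :ets.delete_object(table, {channel, pid})
    :ok
  end
end

table = :ets.new(:unsubscribe_sketch, [:duplicate_bag, :public])
other = spawn(fn -> Process.sleep(:infinity) end)
:ets.insert(table, [{"news", self()}, {"news", other}])

UnsubscribeSketch.unsubscribe(table, "news", self())

# The other subscriber's entry is left in place.
IO.inspect(:ets.lookup(table, "news") == [{"news", other}])
```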