Ferricstore.PubSub (ferricstore v0.4.1)

ETS-based Pub/Sub registry for FerricStore.

Provides a fire-and-forget, at-most-once messaging layer implemented entirely on the BEAM — no Raft consensus, no Bitcask persistence. Subscribers register their connection pid and receive messages as plain BEAM messages.

Architecture

Three ETS tables back the registry:

  • :ferricstore_pubsub - {channel, pid} entries for exact channel subscriptions. Uses a :bag so multiple pids can subscribe to the same channel while duplicate subscriptions from the same pid remain collapsed.

  • :ferricstore_pubsub_channel_cache - {channel, [pid]} entries derived from :ferricstore_pubsub. PUBLISH reads this table so the hot path avoids copying and reducing {channel, pid} tuples on every exact publish.

  • :ferricstore_pubsub_patterns - {pattern, pid, matcher} entries for glob-pattern subscriptions (PSUBSCRIBE). Also a :bag.
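
The :bag behaviour these tables rely on can be seen with a throwaway table. This is a minimal sketch using plain :ets; the table name :pubsub_bag_sketch and the spawned placeholder process are assumptions for illustration, not part of FerricStore.

```elixir
# Hypothetical table name; the real registry owns its own named tables.
table = :ets.new(:pubsub_bag_sketch, [:bag, :public])

sub_a = self()
sub_b = spawn(fn -> Process.sleep(:infinity) end)

# Two different pids under the same channel key are both kept.
:ets.insert(table, {"news", sub_a})
:ets.insert(table, {"news", sub_b})

# Re-inserting an identical object is a no-op in a :bag
# (a :duplicate_bag would store it a second time).
:ets.insert(table, {"news", sub_a})

subscribers = for {_channel, pid} <- :ets.lookup(table, "news"), do: pid
IO.inspect(length(subscribers), label: "subscribers on news")

# Stop the placeholder subscriber.
Process.exit(sub_b, :kill)
```

The lookup returns two pids, which is why a repeated subscribe from the same connection does not inflate subscriber counts.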

The tables are owned by a GenServer (Ferricstore.PubSub), so they live for the lifetime of the application and are cleaned up on shutdown.

Subscriber pids are monitored once by the owner process. If a connection dies before running its normal cleanup path, the monitor removes its channel and pattern entries so publish counts and PUBSUB introspection do not retain stale subscribers.
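
The monitor-once idea can be sketched with a small GenServer. Everything here (the MonitorSketch module, its table name, and its subscribe/2 and subscribers/1 functions) is hypothetical and covers only exact channels; it illustrates the technique and is not FerricStore's implementation.

```elixir
defmodule MonitorSketch do
  # Hypothetical module: shows only the monitor-once and
  # clean-up-on-:DOWN idea described above.
  use GenServer

  @table :monitor_sketch_subs

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  def subscribe(channel, pid), do: GenServer.call(__MODULE__, {:subscribe, channel, pid})

  def subscribers(channel), do: for({_c, pid} <- :ets.lookup(@table, channel), do: pid)

  @impl true
  def init(:ok) do
    :ets.new(@table, [:bag, :named_table, :protected, read_concurrency: true])
    # State maps each monitored pid to its single monitor reference.
    {:ok, %{}}
  end

  @impl true
  def handle_call({:subscribe, channel, pid}, _from, monitors) do
    :ets.insert(@table, {channel, pid})
    # Monitor each pid only once, however many channels it joins.
    monitors = Map.put_new_lazy(monitors, pid, fn -> Process.monitor(pid) end)
    {:reply, :ok, monitors}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, monitors) do
    # The subscriber died without unsubscribing: drop every entry for it.
    :ets.match_delete(@table, {:_, pid})
    {:noreply, Map.delete(monitors, pid)}
  end
end

{:ok, _owner} = MonitorSketch.start_link()
sub = spawn(fn -> Process.sleep(:infinity) end)
:ok = MonitorSketch.subscribe("news", sub)
:ok = MonitorSketch.subscribe("sports", sub)

# Simulate a connection crash that skips the normal cleanup path.
Process.exit(sub, :kill)
```

The :DOWN message reaches the owner asynchronously, so the entries disappear shortly after the subscriber exits rather than at the same instant.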

Message protocol

When a message is published to a channel, each matching subscriber pid receives one of:

  • {:pubsub_message, channel, message} — for exact channel subscriptions
  • {:pubsub_pmessage, pattern, channel, message} — for pattern subscriptions

The connection process is responsible for encoding these into RESP3 push frames.
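
As a rough illustration of that encoding step, the sketch below turns both message shapes into RESP3 push frames. It assumes the Redis convention of a push type (">") carrying "message" or "pmessage" followed by bulk strings; the PushFrameSketch module is hypothetical and FerricStore's connection code may differ.

```elixir
defmodule PushFrameSketch do
  # Hypothetical helper, not part of FerricStore: encodes the two registry
  # message shapes as RESP3 push frames (">" type, then bulk strings).

  def encode({:pubsub_message, channel, message}),
    do: push(["message", channel, message])

  def encode({:pubsub_pmessage, pattern, channel, message}),
    do: push(["pmessage", pattern, channel, message])

  # A push frame is ">" plus the element count, then each element.
  defp push(parts) do
    [">", Integer.to_string(length(parts)), "\r\n", Enum.map(parts, &bulk/1)]
  end

  # A bulk string is "$" plus the byte length, then the bytes.
  defp bulk(bin) when is_binary(bin) do
    ["$", Integer.to_string(byte_size(bin)), "\r\n", bin, "\r\n"]
  end
end

frame = PushFrameSketch.encode({:pubsub_message, "news", "hi"})
IO.inspect(IO.iodata_to_binary(frame))
# prints ">3\r\n$7\r\nmessage\r\n$4\r\nnews\r\n$2\r\nhi\r\n"
```

Returning iodata rather than a concatenated binary lets the connection process hand the frame to the socket without an extra copy.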

Summary

Functions

channels(pattern \\ nil)

Lists active channels (channels with at least one subscriber).

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

cleanup(pid)

Removes all subscriptions (channels and patterns) for the given pid.

numpat()

Returns the total number of active pattern subscriptions.

numsub(channel_list)

Returns subscriber counts for the given channels.

psubscribe(pattern, pid)

Subscribes pid to all channels matching pattern (glob syntax).

psubscribe_many(patterns, pid)

Subscribes pid to all given glob patterns with one monitor operation.

publish(channel, message)

Publishes message to all subscribers of channel.

punsubscribe(pattern, pid)

Unsubscribes pid from the given glob pattern.

punsubscribe_many(patterns, pid)

Unsubscribes pid from all given glob patterns and checks the monitor once.

start_link(opts \\ [])

Starts the PubSub registry GenServer.

subscribe(channel, pid)

Subscribes pid to the given channel.

subscribe_many(channels, pid)

Subscribes pid to all given channels with one monitor operation.

unsubscribe(channel, pid)

Unsubscribes pid from the given channel.

unsubscribe_many(channels, pid)

Unsubscribes pid from all given channels and checks the monitor once.

Types

channel()

@type channel() :: binary()

pattern()

@type pattern() :: binary()

Functions

channels(pattern \\ nil)

@spec channels(pattern() | nil) :: [channel()]

Lists active channels (channels with at least one subscriber).

When pattern is nil, returns all channels. When a glob pattern is given, returns only channels whose name matches.

Parameters

  • pattern - Optional glob pattern to filter channels (default: nil).

Returns

A list of channel name binaries.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

cleanup(pid)

@spec cleanup(pid()) :: :ok

Removes all subscriptions (channels and patterns) for the given pid.

Called during connection cleanup when a client disconnects to prevent stale entries in the ETS tables.

Parameters

  • pid - The process id to clean up.

Returns

:ok

numpat()

@spec numpat() :: non_neg_integer()

Returns the total number of active pattern subscriptions.

Returns

A non-negative integer.

numsub(channel_list)

@spec numsub([channel()]) :: [channel() | non_neg_integer()]

Returns subscriber counts for the given channels.

Returns a flat list of [channel, count, channel, count, ...] suitable for RESP encoding.

Parameters

  • channel_list - List of channel names.

Returns

A flat list alternating channel names and their subscriber counts.
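
When the counts are needed inside Elixir rather than on the wire, the flat list can be regrouped into a map. The sketch below works on a made-up sample value shaped like the documented return; it does not call the registry.

```elixir
# Sample value shaped like the documented return; channels and counts are made up.
flat = ["news", 2, "sports", 0]

counts =
  flat
  |> Enum.chunk_every(2)
  |> Map.new(fn [channel, count] -> {channel, count} end)

IO.inspect(counts)
# prints %{"news" => 2, "sports" => 0}
```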

psubscribe(pattern, pid)

@spec psubscribe(pattern(), pid()) :: :ok

Subscribes pid to all channels matching pattern (glob syntax).

The raw glob pattern is stored and evaluated with Ferricstore.GlobMatcher at publish time, so PubSub uses the same Redis pattern semantics as SCAN.

Parameters

  • pattern - A glob pattern (e.g. "news.*", "user:?").
  • pid - The subscriber process id.

Returns

:ok

psubscribe_many(patterns, pid)

@spec psubscribe_many([pattern()], pid()) :: :ok

Subscribes pid to all given glob patterns with one monitor operation.

publish(channel, message)

@spec publish(channel(), binary()) :: non_neg_integer()

Publishes message to all subscribers of channel.

Looks up exact channel subscribers and pattern subscribers whose glob pattern matches the channel name. Sends a BEAM message to each matching pid.

Parameters

  • channel - The channel to publish to (binary).
  • message - The message payload (binary).

Returns

The number of subscriber pids the message was sent to (a non-negative integer). Because delivery is fire-and-forget, this counts sends, not confirmed receipt.
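
The exact-channel half of this fan-out can be sketched with plain :ets and send/2. The FanOutSketch module and its table are hypothetical; pattern subscribers and the channel cache are deliberately left out, so this shows the shape of the technique and not the library's code.

```elixir
defmodule FanOutSketch do
  # Hypothetical stand-in for the exact-channel half of publish/2.
  # Pattern subscribers and the channel cache are left out.

  def new_table, do: :ets.new(:fan_out_sketch, [:bag, :public])

  def subscribe(table, channel, pid) do
    :ets.insert(table, {channel, pid})
    :ok
  end

  def publish(table, channel, message) do
    table
    |> :ets.lookup(channel)
    |> Enum.reduce(0, fn {^channel, pid}, sent ->
      # send/2 does not block and does not fail for a dead local pid,
      # which is what makes delivery at-most-once.
      send(pid, {:pubsub_message, channel, message})
      sent + 1
    end)
  end
end

table = FanOutSketch.new_table()
:ok = FanOutSketch.subscribe(table, "news", self())

delivered = FanOutSketch.publish(table, "news", "hello")
nobody = FanOutSketch.publish(table, "sports", "ignored")

received =
  receive do
    {:pubsub_message, "news", payload} -> payload
  after
    100 -> :timeout
  end

IO.inspect({delivered, nobody, received})
# prints {1, 0, "hello"}
```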

punsubscribe(pattern, pid)

@spec punsubscribe(pattern(), pid()) :: :ok

Unsubscribes pid from the given glob pattern.

Removes all entries matching {pattern, pid, _} from the patterns table.

Parameters

  • pattern - The glob pattern (binary).
  • pid - The subscriber process id.

Returns

:ok

punsubscribe_many(patterns, pid)

@spec punsubscribe_many([pattern()], pid()) :: :ok

Unsubscribes pid from all given glob patterns and checks the monitor once.

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

Starts the PubSub registry GenServer.

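Creates the ETS tables that back the registry, including :ferricstore_pubsub and :ferricstore_pubsub_patterns (see Architecture for the full list). Should be added to the application supervision tree before the Ranch listener, so the tables exist before any connection process subscribes or publishes.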
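
A supervision tree that respects this ordering might look like the fragment below. MyApp.Application and MyApp.Supervisor are hypothetical names, and the listener entry is left as a comment because its child spec is not described here; only Ferricstore.PubSub comes from this page.

```elixir
defmodule MyApp.Application do
  # Hypothetical application module; only Ferricstore.PubSub is taken
  # from the documentation above.
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Started first so the ETS tables exist before any connection
      # process can subscribe or publish.
      Ferricstore.PubSub
      # ... the Ranch listener child spec would follow here ...
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

Supervisors start children in list order, so placing the registry ahead of the listener is enough to guarantee the ordering.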

subscribe(channel, pid)

@spec subscribe(channel(), pid()) :: :ok

Subscribes pid to the given channel.

The subscription is idempotent — calling it twice with the same pid and channel keeps a single registry entry.

Parameters

  • channel - The channel name (binary).
  • pid - The subscriber process id.

Returns

:ok

subscribe_many(channels, pid)

@spec subscribe_many([channel()], pid()) :: :ok

Subscribes pid to all given channels with one monitor operation.

Duplicate {channel, pid} entries are still collapsed by the ETS :bag table, matching subscribe/2 semantics.

unsubscribe(channel, pid)

@spec unsubscribe(channel(), pid()) :: :ok

Unsubscribes pid from the given channel.

Removes the {channel, pid} entry from the ETS table. If the pid was not subscribed, this is a no-op.

Parameters

  • channel - The channel name (binary).
  • pid - The subscriber process id.

Returns

:ok

unsubscribe_many(channels, pid)

@spec unsubscribe_many([channel()], pid()) :: :ok

Unsubscribes pid from all given channels and checks the monitor once.