lily/pub_sub

A pubsub for server-pushed, ephemeral messages that are not stored or sequenced the way messages within store.Store are. (The spelling "pubsub" is used only in these module docs; the module itself is named pub_sub to reflect the PubSub type name.)

Very similar to Phoenix PubSub, this one is built around a transport.Push frame that carries no sequence number and is never replayed on resync.

Use a pubsub when you need to broadcast information that isn’t part of the authoritative store, like presence events. It also allows communication between separate stores (or other server actions). That said, everything that must be synchronised across clients should still go through lily/server: pubsub messages are ephemeral, so a client that misses one will not receive it again on resync.

import lily/pub_sub
import lily/server
import lily/transport

pub fn main() {
  // `store`, `my_serialiser`, and `ws` are assumed to be defined elsewhere.
  let assert Ok(srv) = server.start(store:, serialiser: my_serialiser)
  let assert Ok(bus) = pub_sub.new()

  // Per-connection wiring (typically in your WebSocket handler):
  let client_id = server.generate_client_id()
  let send = fn(bytes) { ws.send(bytes) }
  server.connect(srv, client_id:, send:)
  pub_sub.register(bus, client_id:, send:)

  // Subscribe from anywhere — a message hook, an HTTP handler, etc.:
  pub_sub.subscribe(bus, client_id:, topic: "room:general")

  // Broadcast from anywhere — a background job, a webhook, etc.:
  pub_sub.broadcast(
    bus,
    topic: "room:general",
    message: NewChatMessage("hi"),
    serialiser: my_serialiser,
  )
}

PubSub instances and servers are independent — an app can use one, the other, or both. On Erlang a pubsub is backed by an OTP actor; on JS, by closure-scoped state. Both targets expose the same API.

Types

A handle to a running pubsub instance.

The wrapper around PubSubHandle exists so the public API can have one uniform shape across targets.

Compare with client.Runtime, whose wrapper is load-bearing for a different reason: the JS FFI handle has no type parameters, so the wrapper is the only place to attach the (model, message) phantom type parameters. Here both targets already carry the message parameter; the wrapper is purely about a uniform, encapsulated surface.

pub opaque type PubSub(message)

Values

pub fn broadcast(
  pub_sub: PubSub(message),
  topic topic: String,
  message message: message,
  serialiser serialiser: transport.Serialiser(model, message),
) -> Nil

Broadcast message to every client subscribed to topic. The message is encoded as a transport.Push frame before being handed to each subscriber’s send callback.

pub fn broadcast_from(
  pub_sub: PubSub(message),
  from from_id: String,
  topic topic: String,
  message message: message,
  serialiser serialiser: transport.Serialiser(model, message),
) -> Nil

Broadcast message to every client subscribed to topic except the given from client. Useful in message hooks where the originating client already knows what it sent and doesn’t need the echo.
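
As a rough sketch of the message-hook case, the helper below relays a chat message to everyone in a room except its sender. ChatMessage, relay_chat, and the "room:general" topic are hypothetical names chosen for illustration; only pub_sub.broadcast_from and its labelled arguments come from the signature above.

```gleam
import lily/pub_sub.{type PubSub}
import lily/transport

// Hypothetical application message type, used only for this sketch.
pub type ChatMessage {
  NewChatMessage(String)
}

// Relay a chat message to every subscriber of the room except the client
// that sent it, since that client already has the message locally.
pub fn relay_chat(
  bus: PubSub(ChatMessage),
  sender_id: String,
  text: String,
  serialiser: transport.Serialiser(model, ChatMessage),
) -> Nil {
  pub_sub.broadcast_from(
    bus,
    from: sender_id,
    topic: "room:general",
    message: NewChatMessage(text),
    serialiser: serialiser,
  )
}
```

If the sender should also receive the message (for example, to confirm delivery), pub_sub.broadcast with the same topic, message, and serialiser would be the call to use instead.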

pub fn new() -> Result(PubSub(message), Nil)

Start a new pubsub instance. Returns Error(Nil) only when the Erlang actor fails to start (rare). Always returns Ok on JavaScript.
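
Where crashing on a startup failure is not acceptable, the Result can be handled explicitly instead of with let assert. A minimal sketch, assuming a hypothetical ChatMessage type and start_bus helper; result.replace_error comes from the Gleam standard library, and the error text is our own wording rather than anything lily produces.

```gleam
import gleam/result
import lily/pub_sub.{type PubSub}

// Hypothetical application message type, used only for this sketch.
pub type ChatMessage {
  NewChatMessage(String)
}

// Start a pubsub and replace the uninformative Error(Nil) with a
// description the caller can log or return.
pub fn start_bus() -> Result(PubSub(ChatMessage), String) {
  pub_sub.new()
  |> result.replace_error("pub_sub: the Erlang actor failed to start")
}
```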

pub fn register(
  pub_sub: PubSub(message),
  client_id client_id: String,
  send send: fn(BitArray) -> Nil,
) -> Nil

Register a client’s send callback. Call alongside server.connect when a client connects.

pub fn subscribe(
  pub_sub: PubSub(message),
  client_id client_id: String,
  topic topic: String,
) -> Nil

Subscribe a registered client to topic. Subsequent broadcasts on that topic are delivered to this client.

pub fn unregister(
  pub_sub: PubSub(message),
  client_id client_id: String,
) -> Nil

Remove a client and automatically unsubscribe it from every topic it was in. Call alongside server.disconnect when a client disconnects.
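
The register and unregister calls can be paired in connection open and close handlers, roughly as sketched below. on_open, on_close, and the default "room:general" topic are hypothetical; the matching server.connect and server.disconnect calls are left out because their signatures are not documented on this page.

```gleam
import lily/pub_sub.{type PubSub}

// Called when a connection opens. `send` is whatever writes bytes to the
// client's socket; the caller supplies it.
pub fn on_open(
  bus: PubSub(message),
  client_id: String,
  send: fn(BitArray) -> Nil,
) -> Nil {
  // Register first: subscribe is documented as taking a registered client.
  pub_sub.register(bus, client_id:, send:)
  pub_sub.subscribe(bus, client_id:, topic: "room:general")
}

// Called when the connection closes. No per-topic unsubscribe calls are
// needed, because unregister removes the client from every topic.
pub fn on_close(bus: PubSub(message), client_id: String) -> Nil {
  pub_sub.unregister(bus, client_id:)
}
```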

pub fn unsubscribe(
  pub_sub: PubSub(message),
  client_id client_id: String,
  topic topic: String,
) -> Nil

Unsubscribe a client from topic. No-op if the client was not subscribed.
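
Because unsubscribing from a topic the client never joined is a no-op, moving a client between topics needs no prior membership check. A small sketch, with switch_room and its from/to labels being hypothetical names:

```gleam
import lily/pub_sub.{type PubSub}

// Move a registered client from one room topic to another. This is safe to
// call even when the client was never subscribed to `old_topic`.
pub fn switch_room(
  bus: PubSub(message),
  client_id: String,
  from old_topic: String,
  to new_topic: String,
) -> Nil {
  pub_sub.unsubscribe(bus, client_id:, topic: old_topic)
  pub_sub.subscribe(bus, client_id:, topic: new_topic)
}
```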
