lily/topic

Ephemeral and stateful topics for server-to-client fan-out, similar to the pub/sub patterns found in other libraries.

An ephemeral topic broadcasts Push frames with no sequence and no replay:

let assert Ok(typing) = topic.new(server, id: "typing")
// Broadcast from anywhere:
topic.broadcast(typing, UserIsTyping(client_id))

Pipe through with_store to make it stateful. The topic actor then reads its update logic from the server’s Wiring and sends TopicUpdate frames to every subscriber:

// topic.new returns a Result, so unwrap it before piping.
let assert Ok(chat) = topic.new(server, id: "chat")
let chat =
  chat
  |> topic.with_store
  |> topic.with_on_subscribe(fn(client_id) {
    [Chat(UserJoined(client_id))]
  })

For dynamic topics keyed by a parsed identifier (e.g. "room:42"), use topic.kind to register a factory that creates topic actors on first subscribe:

// `auth.may_join_room` is your own helper; Lily does not ship auth.
let assert Ok(_) =
  topic.kind(
    server,
    prefix: "room:",
    parse_id: int.parse,
    configure: fn(room_id, room) {
      // Named `room` so the parameter does not shadow the `topic` module.
      room
      |> topic.with_store
      |> topic.with_can_subscribe(fn(client_id, _topic_id) {
        auth.may_join_room(client_id, room_id)
      })
    },
  )

Types

Phantom kind marker for ephemeral topics (broadcast only, no store).

pub type Ephemeral

Phantom kind marker for stateful topics (store + sequence + snapshot).

pub type Stateful

Opaque handle to a running topic. The kind phantom parameter is Ephemeral after topic.new and Stateful after topic.with_store; this is enforced at compile time so topic.dispatch cannot be called on an ephemeral topic.

pub opaque type Topic(model, message, kind)
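The compile-time check described above relies on a phantom type parameter. The following is a minimal, self-contained sketch of that technique in general, not Lily's actual definitions: Handle, Broadcast, and Stored are hypothetical stand-ins, and the functions only mirror the shape of topic.new, topic.with_store, and topic.dispatch.

```gleam
import gleam/io

// Hypothetical stand-ins for illustration; these are not Lily's definitions.
pub type Broadcast

pub type Stored

// `kind` appears only in the type, never in the data: a phantom parameter.
pub opaque type Handle(kind) {
  Handle(id: String)
}

pub fn new(id: String) -> Handle(Broadcast) {
  Handle(id)
}

// Rebuilds the record so the phantom parameter can change to Stored.
pub fn with_store(handle: Handle(Broadcast)) -> Handle(Stored) {
  Handle(handle.id)
}

// Accepts only handles that have been through with_store.
pub fn dispatch(handle: Handle(Stored), message: String) -> String {
  handle.id <> " <- " <> message
}

pub fn main() {
  let stored = new("chat") |> with_store
  io.println(dispatch(stored, "hello"))
  // dispatch(new("typing"), "hello") is a type error:
  // Handle(Broadcast) does not unify with Handle(Stored).
}
```

Because the marker types have no constructors, they cost nothing at runtime; they exist only to let the type checker tell the two kinds of handle apart.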

Values

pub fn broadcast(
  topic: Topic(model, message, kind),
  message: message,
) -> Nil

Send a Push frame to every subscriber of this topic. Available on both ephemeral and stateful topics.

topic.broadcast(typing_topic, UserIsTyping(client_id))

pub fn broadcast_from(
  topic: Topic(model, message, kind),
  except client_id: String,
  message message: message,
) -> Nil

Like broadcast but skips the originating client.
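As a rough illustration of the "skip the originating client" behaviour, the recipient selection can be thought of as a filter over the subscriber list. This is a sketch under assumptions: recipients is a hypothetical helper, and how Lily actually stores subscribers is not documented here.

```gleam
import gleam/io
import gleam/list
import gleam/string

// Hypothetical helper: picks the recipients for a broadcast that skips one client.
pub fn recipients(subscribers: List(String), except: String) -> List(String) {
  list.filter(subscribers, fn(client_id) { client_id != except })
}

pub fn main() {
  recipients(["alice", "bob", "carol"], "bob")
  |> string.join(", ")
  |> io.println
}
```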

topic.broadcast_from(
  typing_topic,
  except: client_id,
  message: UserIsTyping(client_id),
)

pub fn dispatch(
  topic: Topic(model, message, Stateful),
  message: message,
) -> Nil

Apply a message to the topic’s store and emit TopicUpdate(id, seq, payload) to every subscriber. Only callable on stateful topics (created via with_store); ephemeral topics fail at compile time.
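To make the "apply, then emit with a sequence number" flow concrete, here is a small self-contained sketch. Everything in it is an assumption for illustration: Store, Message, Frame, and the update function are hypothetical, and starting the sequence at 0 and incrementing by one per dispatch is a guess, not documented Lily behaviour.

```gleam
import gleam/int
import gleam/io

// Hypothetical stand-ins; Lily's real store and frame types are not shown here.
pub type Store {
  Store(seq: Int, count: Int)
}

pub type Message {
  Increment
  Reset
}

pub type Frame {
  Update(id: String, seq: Int, payload: Message)
}

fn update(count: Int, message: Message) -> Int {
  case message {
    Increment -> count + 1
    Reset -> 0
  }
}

// Applies one message, bumps the sequence, and builds the frame to fan out.
pub fn dispatch(store: Store, id: String, message: Message) -> #(Store, Frame) {
  let next = Store(seq: store.seq + 1, count: update(store.count, message))
  #(next, Update(id: id, seq: next.seq, payload: message))
}

pub fn main() {
  let #(store, _) = dispatch(Store(seq: 0, count: 0), "chat", Increment)
  let #(store, frame) = dispatch(store, "chat", Increment)
  io.println(
    "seq "
    <> int.to_string(frame.seq)
    <> ", count "
    <> int.to_string(store.count),
  )
}
```

A monotonically increasing sequence like this is what lets a subscriber detect a missed update, which is the property ephemeral topics give up.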

topic.dispatch(chat_topic, Chat(NewChatMessage(body)))

pub fn kind(
  server: server.Server(model, message),
  prefix prefix: String,
  parse_id parse_id: fn(String) -> Result(parsed, Nil),
  configure configure: fn(
    parsed,
    Topic(model, message, Ephemeral),
  ) -> Topic(model, message, kind),
) -> Result(Nil, Nil)

Register a parametric topic kind. When a client subscribes to an id of the form prefix <> suffix and no fixed topic with that id exists, the server parses the suffix with parse_id, starts a topic actor on demand, and calls configure(parsed, topic) on it.

The pre-started topic is passed to configure. Call with_store, with_can_subscribe, etc. on it and return the result. Do not call topic.new inside configure; the topic actor is already started.
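The prefix-and-suffix resolution can be sketched in plain Gleam. This is only an illustration of the idea: parse_room_id is a hypothetical helper, and what Lily does when parse_id returns an error is not stated in this documentation.

```gleam
import gleam/int
import gleam/io

// Hypothetical helper, not part of Lily: resolves a topic id against a
// single "room:" prefix and parses the remainder as an integer.
pub fn parse_room_id(topic_id: String) -> Result(Int, Nil) {
  case topic_id {
    "room:" <> suffix -> int.parse(suffix)
    _ -> Error(Nil)
  }
}

pub fn main() {
  case parse_room_id("room:42") {
    Ok(room_id) -> io.println("room " <> int.to_string(room_id))
    Error(Nil) -> io.println("no matching kind")
  }
}
```

The parsed value (here an Int) is what would arrive as the first argument of configure, so the closure can capture it for per-room checks.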

let assert Ok(_) =
  topic.kind(
    server,
    prefix: "room:",
    parse_id: int.parse,
    configure: fn(_room_id, room) {
      room |> topic.with_store
    },
  )

pub fn new(
  server: server.Server(model, message),
  id id: String,
) -> Result(Topic(model, message, Ephemeral), Nil)

Register a topic on the given server. Returns an ephemeral handle (broadcast-only) by default; pipe through with_store to make it stateful.

let assert Ok(typing) = topic.new(server, id: "typing")

pub fn stop(topic: Topic(model, message, kind)) -> Nil

Stop the topic actor and remove it from the server registry. All subscribers receive an Acknowledge(Topic(id), seq) frame so that their client slices reset to their initial state. Subsequent subscribe attempts for this id either fail (fixed topic) or trigger lazy re-instantiation (parametric kind, if one is registered).

topic.stop(chat_topic)

pub fn subscribe(
  topic: Topic(model, message, kind),
  client_id: String,
) -> Nil

Add a subscriber. Server-initiated; the client-side counterpart is client.subscribe. Idempotent.

topic.subscribe(chat_topic, client_id)

pub fn unsubscribe(
  topic: Topic(model, message, kind),
  client_id: String,
) -> Nil

Remove a subscriber. Idempotent.
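Idempotence here means that repeating a subscribe or unsubscribe has no further effect. One way to picture it, assuming a set-backed registry (an assumption; Lily's internal representation is not documented here), is the sketch below, where subscribe and unsubscribe are hypothetical local functions rather than the topic API.

```gleam
import gleam/int
import gleam/io
import gleam/set

// Hypothetical subscriber registry, keyed by client id.
pub fn subscribe(
  subscribers: set.Set(String),
  client_id: String,
) -> set.Set(String) {
  set.insert(subscribers, client_id)
}

pub fn unsubscribe(
  subscribers: set.Set(String),
  client_id: String,
) -> set.Set(String) {
  set.delete(subscribers, client_id)
}

pub fn main() {
  let subscribers =
    set.new()
    |> subscribe("alice")
    // Subscribing twice leaves a single entry.
    |> subscribe("alice")
    // Removing a client that never subscribed is a no-op.
    |> unsubscribe("bob")
  io.println(int.to_string(set.size(subscribers)))
}
```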

topic.unsubscribe(chat_topic, client_id)

pub fn with_can_subscribe(
  topic: Topic(model, message, kind),
  predicate: fn(String, String) -> Bool,
) -> Topic(model, message, kind)

Set an authorisation predicate for client-initiated subscribes. Server-side topic.subscribe is unaffected (it’s trusted). On False, the server replies with Rejected(topic_id, "denied").
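The split between trusted server-side subscribes and gated client-initiated ones can be sketched as follows. All names are hypothetical: Origin, Reply, and handle_subscribe exist only for this illustration, and only the Rejected(topic_id, "denied") shape is taken from the description above.

```gleam
import gleam/io

// Hypothetical reply type; the source names only Rejected(topic_id, "denied").
pub type Reply {
  Accepted(topic_id: String)
  Rejected(topic_id: String, reason: String)
}

pub type Origin {
  FromClient
  FromServer
}

// Server-initiated subscribes bypass the predicate; client ones must pass it.
pub fn handle_subscribe(
  origin: Origin,
  predicate: fn(String, String) -> Bool,
  client_id: String,
  topic_id: String,
) -> Reply {
  case origin {
    FromServer -> Accepted(topic_id)
    FromClient ->
      case predicate(client_id, topic_id) {
        True -> Accepted(topic_id)
        False -> Rejected(topic_id, "denied")
      }
  }
}

pub fn main() {
  let deny_all = fn(_client_id: String, _topic_id: String) { False }
  case handle_subscribe(FromClient, deny_all, "alice", "chat") {
    Accepted(_) -> io.println("accepted")
    Rejected(_, reason) -> io.println("rejected: " <> reason)
  }
}
```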

topic.with_can_subscribe(chat_topic, fn(client_id, _topic_id) {
  auth.is_authenticated(client_id)
})

pub fn with_on_subscribe(
  topic: Topic(model, message, kind),
  hook: fn(String) -> List(message),
) -> Topic(model, message, kind)

Set a join hook. Returned messages are broadcast (ephemeral topics) or dispatched (stateful topics) immediately after the new subscriber receives its Snapshot, so the joiner sees them too.

topic.with_on_subscribe(chat_topic, fn(client_id) {
  [Chat(UserJoined(client_id))]
})

pub fn with_on_unsubscribe(
  topic: Topic(model, message, kind),
  hook: fn(String) -> List(message),
) -> Topic(model, message, kind)

Set a leave hook. Symmetric to with_on_subscribe; fires after the subscriber is removed. Common pattern: check the remaining subscriber count and call topic.stop when empty for kind-instantiated topics.

topic.with_on_unsubscribe(chat_topic, fn(_client_id) { [] })

pub fn with_store(
  topic: Topic(model, message, Ephemeral),
) -> Topic(model, message, Stateful)

Upgrade an ephemeral topic to stateful by attaching a store. The update logic and initial state are read from the store.Wiring that was passed to server.new, specifically the store.topic(id: topic.id, ...) entry.

let assert Ok(chat) = topic.new(server, id: "chat")
let chat = topic.with_store(chat)