lily/topic
Ephemeral and stateful topics for server-to-client fan-out, similar to pub/sub patterns within other libraries.
An ephemeral topic broadcasts Push frames with no sequence and no replay:
let assert Ok(typing) = topic.new(server, id: "typing")
// Broadcast from anywhere:
topic.broadcast(typing, UserIsTyping(client_id))
Pipe through with_store to make it stateful, the topic actor reads its
update logic from the server’s Wiring and sends TopicUpdate frames to
every subscriber:
let assert Ok(chat) =
topic.new(server, id: "chat")
|> topic.with_store
|> topic.with_on_subscribe(fn(client_id) {
[Chat(UserJoined(client_id))]
})
For dynamic topics keyed by a parsed identifier (e.g. "room:42"), use
topic.kind to register a factory that creates topic actors on first
subscribe:
// `auth.may_join_room` is your own helper; Lily does not ship auth.
let assert Ok(_) =
topic.kind(
server,
prefix: "room:",
parse_id: int.parse,
configure: fn(room_id, topic) {
topic
|> topic.with_store
|> topic.with_can_subscribe(fn(client_id, _topic_id) {
auth.may_join_room(client_id, room_id)
})
},
)
Types
Phantom kind marker for ephemeral topics (broadcast only, no store).
pub type Ephemeral
Phantom kind marker for stateful topics (store + sequence + snapshot).
pub type Stateful
Values
pub fn broadcast(
topic: Topic(model, message, kind),
message: message,
) -> Nil
Send a Push frame to every subscriber of this topic. Available on both
ephemeral and stateful topics.
topic.broadcast(typing_topic, UserIsTyping(client_id))
pub fn broadcast_from(
topic: Topic(model, message, kind),
except client_id: String,
message message: message,
) -> Nil
Like broadcast but skips the originating client.
topic.broadcast_from(
typing_topic,
except: client_id,
message: UserIsTyping(client_id),
)
pub fn dispatch(
topic: Topic(model, message, Stateful),
message: message,
) -> Nil
Apply a message to the topic’s store and emit
TopicUpdate(id, seq, payload) to every subscriber. Only callable on
stateful topics (created via with_store); ephemeral topics fail at
compile time.
topic.dispatch(chat_topic, Chat(NewChatMessage(body)))
pub fn kind(
server: server.Server(model, message),
prefix prefix: String,
parse_id parse_id: fn(String) -> Result(parsed, Nil),
configure configure: fn(
parsed,
Topic(model, message, Ephemeral),
) -> Topic(model, message, kind),
) -> Result(Nil, Nil)
Register a parametric topic kind. When a client subscribes to
prefix <> suffix and no fixed topic with that id exists, the server
parses the suffix via parse_id and calls configure(parsed, topic) to
configure a pre-started Topic lazily.
The pre-started topic is passed to configure. Call with_store,
with_can_subscribe, etc. on it and return the result. Do not call
topic.new inside configure; the topic actor is already started.
let assert Ok(_) =
topic.kind(
server,
prefix: "room:",
parse_id: int.parse,
configure: fn(room_id, topic) {
topic |> topic.with_store(...)
},
)
pub fn new(
server: server.Server(model, message),
id id: String,
) -> Result(Topic(model, message, Ephemeral), Nil)
Register a topic on the given server. Returns an ephemeral handle
(broadcast-only) by default; pipe through with_store to make it
stateful.
let assert Ok(typing) = topic.new(server, id: "typing")
pub fn stop(topic: Topic(model, message, kind)) -> Nil
Stop the topic actor and remove it from the server registry. All
subscribers receive an Acknowledge(Topic(id), seq) frame so their client
slices reset to initial. Further subscribes to this id either error (fixed
topic) or trigger lazy reinstantiation (parametric kind, if registered).
topic.stop(chat_topic)
pub fn subscribe(
topic: Topic(model, message, kind),
client_id: String,
) -> Nil
Add a subscriber. Server-initiated; the client-side counterpart is
client.subscribe. Idempotent.
topic.subscribe(chat_topic, client_id)
pub fn unsubscribe(
topic: Topic(model, message, kind),
client_id: String,
) -> Nil
Remove a subscriber. Idempotent.
topic.unsubscribe(chat_topic, client_id)
pub fn with_can_subscribe(
topic: Topic(model, message, kind),
predicate: fn(String, String) -> Bool,
) -> Topic(model, message, kind)
Set an authorisation predicate for client-initiated subscribes.
Server-side topic.subscribe is unaffected (it’s trusted). On False,
the server replies with Rejected(topic_id, "denied").
topic.with_can_subscribe(chat_topic, fn(client_id, _topic_id) {
auth.is_authenticated(client_id)
})
pub fn with_on_subscribe(
topic: Topic(model, message, kind),
hook: fn(String) -> List(message),
) -> Topic(model, message, kind)
Set a join hook. Returned messages are broadcast (ephemeral topics) or
dispatched (stateful topics) immediately after the new subscriber receives
its Snapshot, so the joiner sees them too.
topic.with_on_subscribe(chat_topic, fn(client_id) {
[Chat(UserJoined(client_id))]
})
pub fn with_on_unsubscribe(
topic: Topic(model, message, kind),
hook: fn(String) -> List(message),
) -> Topic(model, message, kind)
Set a leave hook. Symmetric to with_on_subscribe; fires after the
subscriber is removed. Common pattern: check the remaining subscriber count
and call topic.stop when empty for kind-instantiated topics.
topic.with_on_unsubscribe(chat_topic, fn(_client_id) { [] })
pub fn with_store(
topic: Topic(model, message, Ephemeral),
) -> Topic(model, message, Stateful)
Upgrade an ephemeral topic to stateful by attaching a store. The update
logic and initial state are read from the store.Wiring that was passed
to server.new, specifically the store.topic(id: topic.id, ...) entry.
topic.new(server, id: "chat")
|> topic.with_store