eparch/event_manager
A type-safe, OTP-compatible event manager implementation that leverages Erlang’s gen_event behaviour through the Gleam FFI.
Overview
An event manager is a process that hosts any number of independent
handlers. Handlers are attached and detached at runtime. When you call
notify or sync_notify, every currently-registered handler receives the
event.
Example
import eparch/event_manager
import gleam/erlang/process

type MyEvent {
  LogLine(String)
  Flush(process.Subject(Nil))
}

pub fn main() {
  case event_manager.start_link(event_manager.new_start_options()) {
    Ok(manager) -> {
      // The handler state is a running count of LogLine events.
      let handler =
        event_manager.new_handler(0, fn(event, count) {
          case event {
            LogLine(_) -> event_manager.Continue(count + 1)
            Flush(reply) -> {
              process.send(reply, Nil)
              event_manager.Continue(count)
            }
          }
        })
      let _ = event_manager.add_handler(manager, handler)
      event_manager.notify(manager, LogLine("hello"))
      event_manager.sync_notify(manager, LogLine("world"))
    }
    Error(_) -> Nil
  }
}
Types
Errors that can occur when adding a handler.
pub type AddError(request, reply) {
HandlerAlreadyExists(handler_ref: HandlerRef(request, reply))
InitFailed(reason: String)
}
Constructors
- HandlerAlreadyExists(handler_ref: HandlerRef(request, reply))
  A handler with the same identity is already registered. The field carries the HandlerRef that caused the collision.
- InitFailed(reason: String)
  The handler’s initialisation failed. reason is a human-readable string produced from the raw Erlang error term.
The result of a non-blocking check_response call.
pub type CheckResponse(reply) {
CheckGotReply(reply: reply)
CheckNoReply
CheckCrashed(reason: StopReason)
}
Constructors
- CheckGotReply(reply: reply)
  A reply for the given RequestId was found in the mailbox.
- CheckNoReply
  The message was not a reply for this RequestId.
- CheckCrashed(reason: StopReason)
  The manager (or handler) exited before replying.
The result of waiting for, or checking for, a response from a
RequestIdCollection.
pub type CollectionResponse(reply, label) {
GotReply(
reply: reply,
label: label,
remaining: RequestIdCollection(label, reply),
)
RequestFailed(
reason: StopReason,
label: label,
remaining: RequestIdCollection(label, reply),
)
CollectionTimeout(remaining: RequestIdCollection(label, reply))
NoReply(remaining: RequestIdCollection(label, reply))
NoRequests
}
Constructors
- GotReply(reply: reply, label: label, remaining: RequestIdCollection(label, reply))
  A successful reply was received for one of the pending requests.
- RequestFailed(reason: StopReason, label: label, remaining: RequestIdCollection(label, reply))
  One of the requests returned an error (e.g. the handler crashed).
- CollectionTimeout(remaining: RequestIdCollection(label, reply))
  receive_response_collection / wait_response_collection only: the timeout elapsed before any pending reply arrived. The collection is unchanged.
- NoReply(remaining: RequestIdCollection(label, reply))
  check_response_collection only: the supplied message was not a reply for any request in this collection. The collection is unchanged.
- NoRequests
  The collection had no pending requests.
The result of a handler processing an event.
Return Continue(new_state) to keep the handler alive with updated state,
or Remove to unregister the handler from the manager.
pub type EventStep(state) {
Continue(state: state)
Remove
}
Constructors
- Continue(state: state)
  Keep the handler alive and update its state.
- Remove
  Remove this handler from the event manager.
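As a minimal sketch of how the two constructors combine, the callback below counts events and unregisters its handler once a limit is reached. The names take and one_shot_handler are hypothetical and not part of this module; only new_handler, Continue, and Remove come from the API documented here.

```gleam
import eparch/event_manager.{type EventStep, Continue, Remove}

/// Builds an event callback that keeps the handler registered for the first
/// `limit` events and returns Remove on the event that reaches the limit.
/// (Hypothetical helper, for illustration only.)
pub fn take(limit: Int) -> fn(event, Int) -> EventStep(Int) {
  fn(_event, seen) {
    case seen + 1 >= limit {
      // The limit is reached: ask the manager to unregister this handler.
      True -> Remove
      // Otherwise stay registered and remember one more event.
      False -> Continue(seen + 1)
    }
  }
}

/// A handler that processes three events and then removes itself.
pub fn one_shot_handler() {
  event_manager.new_handler(0, take(3))
}
```

Keeping the callback as a plain function makes it possible to exercise it without starting a manager.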
A builder for configuring a handler before registering it with a manager.
Create one with new_handler/2 and optionally extend it with
on_terminate/2 and on_format_status/2.
pub opaque type Handler(state, event, request, reply)
An opaque reference to a specific registered handler instance.
Values of this type are produced by add_handler, add_supervised_handler,
swap_handler, and swap_supervised_handler, and are listed by which_handlers.
Pass them to remove_handler to unregister a specific handler, or compare
them with values returned by which_handlers.
The phantom request/reply parameters track the call protocol of the
handler, so send_request cannot be called with a mismatched request type.
Handlers without a call protocol carry Nil/Nil.
pub type HandlerRef(request, reply)
An opaque reference to a running event manager process.
Values of this type are produced by start_link and start (directly)
and by start_monitor (as the manager field of the returned
MonitoredManager). Pass them to notify, sync_notify, add_handler,
etc.
pub type Manager(event)
Data returned when a manager is started with start_monitor.
pub type MonitoredManager(event) {
MonitoredManager(
manager: Manager(event),
monitor: process.Monitor,
)
}
Constructors
- MonitoredManager(manager: Manager(event), monitor: process.Monitor)
Errors that receive_response and wait_response can return.
pub type ReceiveError {
ReceiveTimeout
RequestCrashed(reason: StopReason)
}
Constructors
- ReceiveTimeout
  No reply arrived within the timeout.
- RequestCrashed(reason: StopReason)
  The manager (or handler) exited before replying.
Errors that can occur when removing a handler.
pub type RemoveError(request, reply) {
HandlerNotFound(handler_ref: HandlerRef(request, reply))
RemoveFailed(reason: String)
}
Constructors
- HandlerNotFound(handler_ref: HandlerRef(request, reply))
  No handler with the given ref is currently registered. The field carries the HandlerRef the caller supplied.
- RemoveFailed(reason: String)
  Removal failed for another reason.
An opaque handle for a pending async call issued by send_request.
The phantom type reply tracks the expected response type at compile time.
pub type RequestId(reply)
A collection of in-flight request IDs, each associated with a label.
Used with send_request_to_collection, request_ids_add, and
receive_response_collection to manage multiple concurrent requests.
Requires Erlang/OTP 25.0 or later.
pub type RequestIdCollection(label, reply)
Whether receive_response_collection (and friends) removes the matched
request from the returned collection after delivering the reply.
pub type ResponseHandling {
Delete
Keep
}
Constructors
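- Delete
  Remove the matched request from the returned collection.
- Keep
  Retain the matched request in the returned collection.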
A name under which an event manager can be registered.
Mirrors gen_event:emgr_name/0:
- Local(name) registers locally with erlang:register/2 and gives back a process.Name(event) you can turn into a Subject(event).
- Global(name) registers across nodes via the global module.
- Via(module, name) dispatches registration through any module implementing the via registry behaviour (e.g. gproc, syn).
pub type ServerName(event) {
Local(name: process.Name(event))
Global(name: atom.Atom)
Via(module: atom.Atom, name: dynamic.Dynamic)
}
Constructors
- Local(name: process.Name(event))
- Global(name: atom.Atom)
- Via(module: atom.Atom, name: dynamic.Dynamic)
Errors that can occur when starting an event manager.
pub type StartError {
AlreadyStarted(pid: process.Pid)
StartFailed(reason: String)
}
Constructors
- AlreadyStarted(pid: process.Pid)
  A manager with the requested registered name is already running. The field carries the Pid of the already-running manager so callers can reuse it or diagnose the conflict.
- StartFailed(reason: String)
  Startup failed for another reason. reason is a human-readable string produced from the raw Erlang error term.
Options for start_link, start, and start_monitor. Build a value with
new_start_options() and extend it using the with_* setter functions.
pub opaque type StartOptions(event)
Termination reason classified by the FFI.
Exit wraps a recognised process.ExitReason. RawReason is a fallback
for shapes the FFI cannot classify (e.g. {shutdown, _}, {noproc, _},
arbitrary user terms) so callers can still inspect the original term.
pub type StopReason {
Exit(reason: process.ExitReason)
RawReason(term: dynamic.Dynamic)
}
Constructors
- Exit(reason: process.ExitReason)
- RawReason(term: dynamic.Dynamic)
Errors that can occur when swapping handlers.
Note: when the supplied old HandlerRef is not currently registered, the
swap still succeeds and the new handler is installed. The new handler’s
on_swap_in callback receives a SwapTerm wrapping the Erlang term
{error, module_not_found} so it can detect this case if needed.
pub type SwapError {
NewHandlerInitFailed(reason: String)
}
Constructors
- NewHandlerInitFailed(reason: String)
  The new handler’s init failed (e.g. its on_swap_in raised) or gen_event:swap_handler/3 returned an error term. reason is a human-readable string produced from the raw Erlang error term.
An opaque carrier for state handed from a swapped-out handler to its replacement.
Produced inside on_swap_out via swap_term_from and consumed inside
on_swap_in via swap_term_decode. Because the producer and consumer
of a swap have independent state types, the carrier is intentionally
untyped on the Gleam side — decode it with the gleam/dynamic/decode
module.
pub type SwapTerm
Values
pub fn add_handler(
manager: Manager(event),
handler: Handler(state, event, request, reply),
) -> Result(HandlerRef(request, reply), AddError(request, reply))
Register an unsupervised handler with the event manager.
Returns Ok(HandlerRef) on success. The returned ref uniquely identifies
this handler instance and can be used with remove_handler.
If the handler crashes, the manager removes it silently without notifying
the caller. For crash notifications use add_supervised_handler.
Example
case event_manager.add_handler(manager, my_handler) {
  // Keep handler_ref: remove_handler needs it later.
  Ok(handler_ref) -> Ok(handler_ref)
  Error(_) -> Error(Nil)
}
pub fn add_supervised_handler(
manager: Manager(event),
handler: Handler(state, event, request, reply),
) -> Result(HandlerRef(request, reply), AddError(request, reply))
Register a supervised handler with the event manager.
Like add_handler, but links the handler to the calling process. If the
handler is removed for any reason other than a normal remove_handler call
(e.g. it crashes or returns Remove), the calling process receives an
Erlang message shaped like:
{gen_event_EXIT, HandlerRef, Reason}
Receive it via gleam/erlang/process selectors — process.select_other is
the catch-all hook you can use to observe raw Erlang messages.
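A minimal sketch of that catch-all approach, assuming gleam_erlang 1.x (process.select_other and process.selector_receive). The helper name next_raw_message is hypothetical. Note that select_other matches any message without a typed selector, so the returned term is not guaranteed to be the gen_event_EXIT tuple and still has to be inspected or decoded.

```gleam
import eparch/event_manager
import gleam/dynamic
import gleam/erlang/process

/// Waits up to `timeout` milliseconds for any raw Erlang message and returns
/// it undecoded. (Hypothetical helper, for illustration only.)
pub fn next_raw_message(timeout: Int) -> Result(dynamic.Dynamic, Nil) {
  process.new_selector()
  |> process.select_other(fn(raw) { raw })
  |> process.selector_receive(timeout)
}

/// Registers a supervised handler, then waits for the next raw message,
/// which may be the exit notification described above.
pub fn add_and_watch(
  manager: event_manager.Manager(event),
  handler: event_manager.Handler(state, event, request, reply),
  timeout: Int,
) -> Result(dynamic.Dynamic, Nil) {
  case event_manager.add_supervised_handler(manager, handler) {
    Ok(_handler_ref) -> next_raw_message(timeout)
    Error(_) -> Error(Nil)
  }
}
```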
pub fn check_response(
message: dynamic.Dynamic,
request_id: RequestId(reply),
) -> CheckResponse(reply)
Non-blocking check: inspect message to see if it is a reply for
request_id.
Useful inside a custom process.Selector receive loop. Pass any message
you receive; if it does not belong to this request CheckNoReply is
returned and the message is left for other handlers.
Example
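case event_manager.check_response(raw_msg, req) {
  // Our reply arrived: use the value.
  event_manager.CheckGotReply(value) -> Ok(value)
  // Not ours: let other code handle the message.
  event_manager.CheckNoReply -> Error(Nil)
  // The manager or handler exited before replying.
  event_manager.CheckCrashed(_reason) -> Error(Nil)
}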
pub fn check_response_collection(
message: dynamic.Dynamic,
collection: RequestIdCollection(label, reply),
handling: ResponseHandling,
) -> CollectionResponse(reply, label)
Non-blocking check: inspect message to see whether it is a reply for any
request in collection.
Returns GotReply(...) / RequestFailed(...) if the message belongs to
the collection, NoReply(remaining) if it is unrelated, or NoRequests
if the collection is empty. Never blocks.
Example
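case event_manager.check_response_collection(raw_msg, coll, event_manager.Delete) {
  // Handle the reply, then continue with the remaining requests.
  event_manager.GotReply(value, label, rest) -> Ok(#(value, label, rest))
  // Not ours: pass the message on.
  event_manager.NoReply(_) -> Error(Nil)
  // RequestFailed, CollectionTimeout, or NoRequests.
  _ -> Error(Nil)
}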
Requires Erlang/OTP 25.0 or later.
pub fn manager_pid(manager: Manager(event)) -> process.Pid
Return the Pid of the event manager process.
Useful for monitoring the manager with process.monitor when you did not
start it via start_monitor.
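For instance, a manager started with start_link or start could be monitored after the fact as sketched below. This assumes gleam_erlang 1.x, where process.monitor takes a Pid; the function name monitor_manager is hypothetical.

```gleam
import eparch/event_manager
import gleam/erlang/process

/// Sets up a monitor on an already-running manager.
/// (Hypothetical helper, for illustration only.)
pub fn monitor_manager(
  manager: event_manager.Manager(event),
) -> process.Monitor {
  manager
  |> event_manager.manager_pid
  |> process.monitor
}
```

The returned monitor can be passed to process.select_specific_monitor in the same way as the monitor field of a MonitoredManager. Unlike start_monitor, there is a window between starting the manager and installing the monitor.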
pub fn new_handler(
initial_state: state,
handler: fn(event, state) -> EventStep(state),
) -> Handler(state, event, Nil, Nil)
Handler Builder
Create a handler with an initial state and an event callback.
The handler callback (named on_event in the examples) is called for every
event delivered to this handler via notify or sync_notify. It receives the
event and the current state, and must return either Continue(new_state) or
Remove.
Example
let handler =
  event_manager.new_handler(0, fn(event, count) {
    case event {
      Increment -> event_manager.Continue(count + 1)
      Reset -> event_manager.Continue(0)
    }
  })
pub fn new_start_options() -> StartOptions(event)
Build a fresh StartOptions with defaults: no registered name, Infinity
for both timeout and hibernate_after, no debug flags, no spawn options.
pub fn notify(manager: Manager(event), event: event) -> Nil
Asynchronously broadcast an event to all registered handlers.
Returns immediately without waiting for handlers to finish processing.
Use sync_notify if you need a synchronization point.
pub fn on_format_status(
handler: Handler(state, event, request, reply),
formatter: fn(state) -> String,
) -> Handler(state, event, request, reply)
Provide a function to format this handler’s state for OTP status reports.
When set, the returned string is used in place of the raw state in
sys:get_status/1 output and SASL crash reports. Useful for hiding
secrets, summarising large data structures, or presenting a domain-friendly
view. Since OTP 25.0.
Example
event_manager.new_handler(connection, on_event)
|> event_manager.on_format_status(fn(connection) {
  "Conn(id=" <> connection.id <> ")"
})
pub fn on_swap_in(
handler: Handler(state, event, request, reply),
hydrate: fn(SwapTerm) -> state,
) -> Handler(state, event, request, reply)
Derive this handler’s initial state from the value produced by the
previous handler’s on_swap_out during a swap_handler operation.
When this callback is unset, the handler’s init_state is used as-is and
any value produced by the previous handler is discarded.
Example
event_manager.new_handler(0, on_event)
|> event_manager.on_swap_in(fn(term) {
  case event_manager.swap_term_decode(term, decode.int) {
    Ok(n) -> n
    Error(_) -> 0
  }
})
pub fn on_swap_out(
handler: Handler(state, event, request, reply),
extract: fn(state) -> SwapTerm,
) -> Handler(state, event, request, reply)
Provide a value to hand to the next handler when this one is swapped out
via swap_handler or swap_supervised_handler.
The function receives the current handler state and returns a SwapTerm
(built with swap_term_from) that the next handler’s on_swap_in can
decode. When unset, the next handler’s on_swap_in (if any) is invoked
with a placeholder value.
This callback is invoked instead of on_terminate when the removal
happens through a swap. If you need both cleanup and state transfer in
the swap path, run the cleanup inline within on_swap_out.
Example
event_manager.new_handler(0, on_event)
|> event_manager.on_swap_out(fn(count) { event_manager.swap_term_from(count) })
pub fn on_terminate(
handler: Handler(state, event, request, reply),
cleanup: fn(state) -> Nil,
) -> Handler(state, event, request, reply)
Attach a cleanup function called when the handler is removed or the manager stops.
Example
event_manager.new_handler(connection, on_event)
|> event_manager.on_terminate(fn(connection) { db.close(connection) })
pub fn receive_response(
request_id: RequestId(reply),
timeout: Int,
) -> Result(reply, ReceiveError)
Wait up to timeout milliseconds for the reply to a RequestId.
Returns Ok(reply) on success, Error(ReceiveTimeout) if no reply
arrives in time, or Error(RequestCrashed(reason)) if the manager exited.
pub fn receive_response_collection(
collection: RequestIdCollection(label, reply),
timeout: Int,
handling: ResponseHandling,
) -> CollectionResponse(reply, label)
Wait up to timeout milliseconds for any pending reply in a collection.
Pass Delete to remove the matched request from the returned collection,
or Keep to retain it. Call this in a loop to drain all responses one by
one. Returns CollectionTimeout(remaining) when the timer expires before
any reply arrives, and NoRequests when the collection is empty.
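Per the OTP documentation for gen_event:receive_response/3, a timeout abandons the pending requests, so replies that arrive later are ignored. Use wait_response_collection if you want to keep waiting on the same collection after a timeout.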
Example
let assert event_manager.GotReply(value, label, collection) =
  event_manager.receive_response_collection(
    collection,
    1000,
    event_manager.Delete,
  )
Requires Erlang/OTP 25.0 or later.
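The drain loop mentioned above might look like the following sketch. The names drain_all and drain_loop are hypothetical. The policy of skipping failed requests and stopping at the first timeout is an assumption made for illustration, not something this module prescribes.

```gleam
import eparch/event_manager.{
  type RequestIdCollection, CollectionTimeout, Delete, GotReply, NoReply,
  NoRequests, RequestFailed,
}
import gleam/list

/// Collects #(label, reply) pairs in arrival order until the collection is
/// empty or one wait times out. (Hypothetical helper, for illustration only.)
pub fn drain_all(
  collection: RequestIdCollection(label, reply),
  timeout: Int,
) -> List(#(label, reply)) {
  drain_loop(collection, timeout, [])
}

fn drain_loop(collection, timeout, acc) {
  case event_manager.receive_response_collection(collection, timeout, Delete) {
    // Delete drops the answered request, so `remaining` shrinks on each pass.
    GotReply(reply, label, remaining) ->
      drain_loop(remaining, timeout, [#(label, reply), ..acc])
    // Assumed policy: skip failed requests and keep draining.
    RequestFailed(_reason, _label, remaining) ->
      drain_loop(remaining, timeout, acc)
    // Nothing left, or nothing arrived in time: return what was collected.
    CollectionTimeout(_) | NoReply(_) | NoRequests -> list.reverse(acc)
  }
}
```

Passing Keep instead of Delete would leave answered requests in the collection, so this loop relies on Delete to make progress.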
pub fn remove_handler(
manager: Manager(event),
handler_ref: HandlerRef(request, reply),
) -> Result(Nil, RemoveError(request, reply))
Remove a specific handler from the event manager.
The handler’s on_terminate callback is called before removal.
pub fn request_ids_add(
request_id request_id: RequestId(reply),
label label: label,
to collection: RequestIdCollection(label, reply),
) -> RequestIdCollection(label, reply)
Add a RequestId to a collection under a label.
The label is returned alongside the reply in
receive_response_collection, letting you identify which request the
response belongs to.
Requires Erlang/OTP 25.0 or later.
pub fn request_ids_new() -> RequestIdCollection(label, reply)
Create a new, empty request-id collection.
Used with send_request_to_collection to batch multiple async requests
and then drain them through receive_response_collection (or similar).
Requires Erlang/OTP 25.0 or later.
pub fn request_ids_size(
collection: RequestIdCollection(label, reply),
) -> Int
Return the number of pending request IDs in a collection.
Requires Erlang/OTP 25.0 or later.
pub fn request_ids_to_list(
collection: RequestIdCollection(label, reply),
) -> List(#(RequestId(reply), label))
Convert a collection to a list of #(RequestId, label) pairs.
Requires Erlang/OTP 25.0 or later.
pub fn send_request(
manager: Manager(event),
handler_ref: HandlerRef(request, reply),
request: request,
) -> RequestId(reply)
Asynchronously call a specific handler and return a RequestId.
Unlike sync_notify, this targets one handler by its HandlerRef and
expects a reply. The handler must have been registered with
with_call_handler set; otherwise receive_response will return
Error(RequestCrashed(_)).
Use receive_response or wait_response to collect the reply later, or
check_response to poll non-blockingly.
Example
let req: event_manager.RequestId(Int) =
  event_manager.send_request(mgr, handler_ref, GetCount)
// ... do other work ...
let assert Ok(count) = event_manager.receive_response(req, 1000)
pub fn send_request_to_collection(
manager: Manager(event),
handler_ref: HandlerRef(request, reply),
request: request,
label: label,
to collection: RequestIdCollection(label, reply),
) -> RequestIdCollection(label, reply)
Send an asynchronous call to a specific handler and immediately add the
resulting RequestId to a collection under the given label.
Equivalent to calling send_request and request_ids_add in one step.
Useful for issuing several requests in a loop before waiting for any of
them.
Example
let collection: event_manager.RequestIdCollection(String, Int) =
  event_manager.request_ids_new()
let collection =
  event_manager.send_request_to_collection(mgr, h1, GetCount, "h1", collection)
let collection =
  event_manager.send_request_to_collection(mgr, h2, GetCount, "h2", collection)
// ... drain via receive_response_collection ...
Requires Erlang/OTP 25.0 or later.
pub fn start(
options: StartOptions(event),
) -> Result(Manager(event), StartError)
Start an event manager process without linking it to the caller.
Maps to gen_event:start/1,2. Useful when the parent does not want
link-based crash propagation, e.g. when the parent will install its own
monitor or hand the manager to a custom supervisor.
pub fn start_link(
options: StartOptions(event),
) -> Result(Manager(event), StartError)
Start an event manager process linked to the caller.
Maps to gen_event:start_link/1,2. Use with_name on the options to
register the manager under a Local, Global, or Via name; otherwise
the manager is started anonymously.
Example
case event_manager.start_link(event_manager.new_start_options()) {
  Ok(manager) -> {
    // ... use manager ...
    event_manager.stop(manager)
  }
  Error(_) -> Nil
}
Example: registered under a local name
let name = process.new_name("my_event_manager")
let options =
  event_manager.new_start_options()
  |> event_manager.with_name(event_manager.Local(name))
let assert Ok(manager) = event_manager.start_link(options)
pub fn start_monitor(
options: StartOptions(event),
) -> Result(MonitoredManager(event), StartError)
Start an event manager linked to the caller and atomically return a monitor for it.
Equivalent to calling start_link(options) followed by
process.monitor(manager_pid), but without the race window between the
two calls. The returned MonitoredManager carries both the Manager and
a process.Monitor you can select on. Since OTP 23.0.
Example
case event_manager.start_monitor(event_manager.new_start_options()) {
  Ok(monitored) -> {
    let selector =
      process.new_selector()
      |> process.select_specific_monitor(monitored.monitor, fn(down) { down })
    // ... use monitored.manager, wait on `selector` for a Down message ...
  }
  Error(_) -> Nil
}
pub fn stop(manager: Manager(event)) -> Nil
Stop the event manager, terminating it with reason normal.
All registered handlers have their on_terminate callback invoked before
the manager shuts down.
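One way to observe this, sketched under the assumption that on_terminate runs inside the manager process and can send to a Subject owned by the caller. The function name stop_runs_cleanup is hypothetical.

```gleam
import eparch/event_manager
import gleam/erlang/process

/// Starts a manager, delivers one event, stops the manager, and returns the
/// count that the handler's cleanup callback reported.
/// (Hypothetical helper, for illustration only.)
pub fn stop_runs_cleanup() -> Result(Int, Nil) {
  let done = process.new_subject()
  case event_manager.start_link(event_manager.new_start_options()) {
    Error(_) -> Error(Nil)
    Ok(manager) -> {
      let handler =
        event_manager.new_handler(0, fn(_event, count) {
          event_manager.Continue(count + 1)
        })
        // The cleanup callback reports the final state to the caller.
        |> event_manager.on_terminate(fn(count) { process.send(done, count) })
      let _ = event_manager.add_handler(manager, handler)
      event_manager.sync_notify(manager, Nil)
      event_manager.stop(manager)
      process.receive(done, 1000)
    }
  }
}
```

If add_handler fails, no cleanup message is sent and the function returns Error(Nil) after the one-second wait.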
pub fn swap_handler(
manager: Manager(event),
old_handler_ref: HandlerRef(old_request, old_reply),
new_handler: Handler(state, event, request, reply),
) -> Result(HandlerRef(request, reply), SwapError)
Atomically swap an installed handler for a new one.
Maps to gen_event:swap_handler/3. The old handler’s on_swap_out
callback runs (if set), its result is threaded into the new handler’s
on_swap_in (if set) to produce the new handler’s initial state, and the
new handler is installed — all observed by the manager as a single
transaction so no notify can slip between the remove and the add.
On success, returns the new handler’s HandlerRef. The old HandlerRef
becomes invalid.
The new handler is unsupervised. Use swap_supervised_handler to link
the new handler to the calling process.
Example
case event_manager.swap_handler(mgr, old_ref, new_handler) {
  // old_ref is now invalid: use new_ref from here on.
  Ok(new_ref) -> Ok(new_ref)
  Error(_) -> Error(Nil)
}
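Putting on_swap_out, on_swap_in, and swap_handler together, a state hand-over could be sketched as follows. The function names counting_handler, resumed_handler, and upgrade are hypothetical. The fallback to 0 when decoding fails is an assumption that also covers the case where the old handler supplied no usable value.

```gleam
import eparch/event_manager
import gleam/dynamic/decode

/// Old handler: counts events and hands its count over when swapped out.
pub fn counting_handler(start: Int) {
  event_manager.new_handler(start, fn(_event, count) {
    event_manager.Continue(count + 1)
  })
  |> event_manager.on_swap_out(fn(count) {
    event_manager.swap_term_from(count)
  })
}

/// New handler: resumes from the old count, or from 0 if decoding fails.
pub fn resumed_handler() {
  event_manager.new_handler(0, fn(_event, count) {
    event_manager.Continue(count + 1)
  })
  |> event_manager.on_swap_in(fn(term) {
    case event_manager.swap_term_decode(term, decode.int) {
      Ok(count) -> count
      Error(_) -> 0
    }
  })
}

/// Replaces the handler behind `old_ref` and returns the new HandlerRef.
pub fn upgrade(manager, old_ref) {
  event_manager.swap_handler(manager, old_ref, resumed_handler())
}
```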
pub fn swap_supervised_handler(
manager: Manager(event),
old_handler_ref: HandlerRef(old_request, old_reply),
new_handler: Handler(state, event, request, reply),
) -> Result(HandlerRef(request, reply), SwapError)
Atomically swap an installed handler for a new supervised handler.
Like swap_handler, but maps to gen_event:swap_sup_handler/3: the new
handler is linked to the calling process. If the new handler is later
removed for any reason other than a normal remove_handler call, the
caller receives a {gen_event_EXIT, HandlerRef, Reason} message.
pub fn swap_term_decode(
term: SwapTerm,
decoder: decode.Decoder(a),
) -> Result(a, List(decode.DecodeError))
Decode the value carried by a SwapTerm using a decode.Decoder,
typically inside on_swap_in.
Example
event_manager.new_handler(0, on_event)
|> event_manager.on_swap_in(fn(term) {
  case event_manager.swap_term_decode(term, decode.int) {
    Ok(n) -> n
    Error(_) -> 0
  }
})
pub fn swap_term_from(value: anything) -> SwapTerm
Wrap any value as a SwapTerm to hand to the next handler from
on_swap_out.
pub fn sync_notify(manager: Manager(event), event: event) -> Nil
Synchronously broadcast an event to all registered handlers.
Blocks until every currently registered handler has processed the event. Use this when you need to know that all handlers have seen the event before continuing.
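The sketch below uses sync_notify as a synchronisation point after two asynchronous notify calls. It relies on Erlang's ordering guarantee for messages sent from one process to another, so the two Tick events are handled before the Report event. The Event type and the count_ticks function are hypothetical.

```gleam
import eparch/event_manager
import gleam/erlang/process

pub type Event {
  Tick
  Report(process.Subject(Int))
}

/// Sends two Tick events asynchronously, then a synchronous Report, and
/// returns the count the handler reported.
/// (Hypothetical helper, for illustration only.)
pub fn count_ticks() -> Result(Int, Nil) {
  case event_manager.start_link(event_manager.new_start_options()) {
    Error(_) -> Error(Nil)
    Ok(manager) -> {
      let handler =
        event_manager.new_handler(0, fn(event, count) {
          case event {
            Tick -> event_manager.Continue(count + 1)
            Report(reply) -> {
              process.send(reply, count)
              event_manager.Continue(count)
            }
          }
        })
      let _ = event_manager.add_handler(manager, handler)
      event_manager.notify(manager, Tick)
      event_manager.notify(manager, Tick)
      let reply = process.new_subject()
      // Returns only after the handler has processed Report, and therefore
      // after both Tick events sent earlier from this process.
      event_manager.sync_notify(manager, Report(reply))
      let result = process.receive(reply, 1000)
      event_manager.stop(manager)
      result
    }
  }
}
```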
pub fn wait_response(
request_id: RequestId(reply),
timeout: Int,
) -> Result(reply, ReceiveError)
Wait up to timeout milliseconds for the reply to a RequestId.
Like receive_response, but maps to gen_event:wait_response/2. Per the OTP
documentation, the two differ in what happens on timeout: receive_response
abandons the request, so a reply that arrives later is ignored, whereas
wait_response does not, so you can call it again with the same RequestId to
keep waiting. Neither consumes unrelated mailbox messages.
pub fn wait_response_collection(
collection: RequestIdCollection(label, reply),
timeout: Int,
handling: ResponseHandling,
) -> CollectionResponse(reply, label)
Like receive_response_collection but maps to gen_event:wait_response/3.
Per the OTP documentation, a timeout does not abandon the pending requests,
so the same collection can be waited on again. Use it when you want to poll
a collection repeatedly with short timeouts.
Requires Erlang/OTP 25.0 or later.
pub fn which_handlers(
manager: Manager(event),
) -> List(HandlerRef(request, reply))
Return the list of HandlerRefs for all currently registered handlers.
pub fn with_call_handler(
handler: Handler(state, event, request, reply),
on_call: fn(request, state) -> #(reply, state),
) -> Handler(state, event, request, reply)
Attach a call handler to a handler, enabling async request/response via
send_request. The on_call function receives the request and the current
handler state, and must return a #(reply, new_state) tuple.
Without this, send_request calls to this handler will fail with
Error(RequestCrashed(_)).
Example
event_manager.new_handler(0, on_event)
|> event_manager.with_call_handler(fn(request, count) {
  case request { GetCount -> #(count, count) }
})
pub fn with_debug(
options: StartOptions(event),
flags: List(start_options.DebugFlag),
) -> StartOptions(event)
Set the sys debug flags for the manager.
pub fn with_hibernate_after(
options: StartOptions(event),
timeout: start_options.Timeout,
) -> StartOptions(event)
Set the idle hibernation timeout. Passed through to gen_event as the
{hibernate_after, _} option.
pub fn with_name(
options: StartOptions(event),
name: ServerName(event),
) -> StartOptions(event)
Register the manager under a name when it starts.
Pass Local(name) for the common local-atom registration (the only form
compatible with a process.Subject), Global(name) for cluster-wide
registration via the global module, or Via(module, term) for a
custom registry such as gproc or syn.
pub fn with_spawn_options(
options: StartOptions(event),
spawn_options: List(start_options.SpawnOption),
) -> StartOptions(event)
Set the erlang:spawn_opt/2 options forwarded to the manager process.
pub fn with_timeout(
options: StartOptions(event),
timeout: start_options.Timeout,
) -> StartOptions(event)
Set the initialisation timeout. Passed through to gen_event as the
{timeout, _} option.