CouncilEx.Recorder behaviour (CouncilEx v0.1.0)


Behaviour for run lifecycle event recorders.

A recorder is a single-writer process spawned per run, before the CouncilEx.RunServer starts, that subscribes to the run's PubSub topic and reacts to lifecycle events with side effects:

  • persist to a database (e.g. CouncilEx.Recorder.Ecto)
  • ship to Kafka / NATS / S3
  • emit OpenTelemetry spans
  • append to an offline eval harness

Lifecycle

The runtime guarantees:

  1. init/2 is called with the assigned run_id and user args. The recorder process is fully subscribed to the run topic before init/2 returns, so no :run_started event is ever missed.
  2. handle_event/2 is called for each PubSub event broadcast by the RunServer (see CouncilEx.Events for the catalog and payload shapes), in broadcast order.
  3. handle_finalize/2 is called exactly once with the terminal outcome:
    • {:ok, %CouncilEx.Result{}} on :run_completed
    • :cancelled when the run was terminated via CouncilEx.cancel/2 (or :terminate_run/2)
    • {:error, term()} for any other failure path

After handle_finalize/2 returns, the recorder process exits normally.

Recorders are spawned unsupervised (GenServer.start/3 semantics). A recorder crash does not kill the run — the run continues without a recorder. If the RunServer crashes without broadcasting a terminal event, the recorder is notified via Process.monitor/1 and handle_finalize/2 is invoked with {:error, {:runserver_crashed, reason}}.
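The callback order described above can be exercised without starting a run. The following is a minimal sketch, not part of CouncilEx: RecorderContractSketch.drive/5 and CountingRecorder are hypothetical names, and the event tuples in the usage note are placeholders whose payloads do not reflect the real shapes in CouncilEx.Events. In a project that depends on CouncilEx, the recorder module would also declare @behaviour CouncilEx.Recorder.

```elixir
defmodule RecorderContractSketch do
  # Hypothetical test helper, not part of CouncilEx: calls a recorder's
  # callbacks in the documented order (init, events in order, finalize once).
  def drive(recorder, run_id, args, events, outcome) do
    case recorder.init(run_id, args) do
      {:ok, state} ->
        # handle_event/2 runs once per event, in the order given.
        final_state =
          Enum.reduce(events, state, fn event, acc ->
            {:ok, next} = recorder.handle_event(event, acc)
            next
          end)

        # handle_finalize/2 runs exactly once with the terminal outcome.
        recorder.handle_finalize(outcome, final_state)

      {:stop, reason} ->
        # Recorder startup was aborted; a real run would proceed without it.
        {:recorder_not_started, reason}
    end
  end
end

defmodule CountingRecorder do
  # Hypothetical recorder that remembers the tag of every event it sees.
  def init(run_id, _args), do: {:ok, %{run_id: run_id, seen: []}}

  def handle_event(event, state) do
    # Only the tag (first tuple element) is read; payload shape is not assumed.
    {:ok, %{state | seen: [elem(event, 0) | state.seen]}}
  end

  def handle_finalize(outcome, state) do
    # Report the tags in arrival order to the calling process.
    send(self(), {:finalized, state.run_id, Enum.reverse(state.seen), outcome})
    :ok
  end
end
```

Calling RecorderContractSketch.drive(CountingRecorder, "run-1", %{}, [{:run_started, %{}}, {:run_completed, %{}}], :cancelled) returns :ok and leaves a {:finalized, "run-1", [:run_started, :run_completed], :cancelled} message in the caller's mailbox.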

Wiring

CouncilEx.start(council, input,
  recorder: {MyApp.EctoRecorder, %{repo: MyApp.Repo}}
)

See also CouncilEx.Recorder.Ecto for the default Ecto-backed implementation.

Summary

Types

outcome()
Terminal outcome passed to handle_finalize/2.

state()

Callbacks

handle_event(event, state)
Handle a single run lifecycle event.

handle_finalize(outcome, state)
Handle the terminal outcome.

init(run_id, args)
Initialise recorder state.

Types

outcome()

@type outcome() :: {:ok, CouncilEx.Result.t()} | {:error, term()} | :cancelled

Terminal outcome passed to handle_finalize/2.

state()

@type state() :: term()

Callbacks

handle_event(event, state)

@callback handle_event(event :: tuple(), state()) :: {:ok, state()}

Handle a single run lifecycle event.

event is the raw tuple as broadcast by the RunServer — see CouncilEx.Events for the catalog. Implementations may pattern match on event tags (:run_started, :round_completed, :member_completed, etc.) and persist a subset.

Must return {:ok, state}. The recorder is unsupervised; a crash exits the recorder process without affecting the run.
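One way to persist only a subset of events is to filter on the event tag. The sketch below is illustrative: SubsetRecorderSketch, the :tags key in args, and the in-memory kept list are assumptions made for the example, and args is assumed to be a map. It reads only the first tuple element, so it does not depend on the payload shapes documented in CouncilEx.Events.

```elixir
defmodule SubsetRecorderSketch do
  # Hypothetical recorder; in a project that depends on CouncilEx it would
  # declare `@behaviour CouncilEx.Recorder`.
  @default_tags [:run_started, :round_completed, :member_completed]

  def init(run_id, args) do
    # `:tags` is an assumed option name for this sketch; args must be a map.
    tags = MapSet.new(Map.get(args, :tags, @default_tags))
    {:ok, %{run_id: run_id, tags: tags, kept: []}}
  end

  def handle_event(event, state) when is_tuple(event) and tuple_size(event) > 0 do
    if MapSet.member?(state.tags, elem(event, 0)) do
      # Keep the event (newest first); a real recorder would write it out here.
      {:ok, %{state | kept: [event | state.kept]}}
    else
      # Ignore tags outside the configured subset, but still return {:ok, state}.
      {:ok, state}
    end
  end

  def handle_finalize(_outcome, _state), do: :ok
end
```

Returning {:ok, state} for ignored events matters: the callback has no other valid return value, so an unmatched event must not fall through to a crash.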

handle_finalize(outcome, state)

@callback handle_finalize(outcome :: outcome(), state()) :: :ok

Handle the terminal outcome.

Called exactly once after the run reaches a terminal state. The recorder process exits after this returns.
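A handle_finalize/2 implementation typically branches on the three outcome() shapes, plus the {:error, {:runserver_crashed, reason}} case described under Lifecycle. The sketch below assumes a hypothetical state map carrying a :sink function; FinalizeSketch, classify/1, and the status atoms are illustrative names rather than CouncilEx API.

```elixir
defmodule FinalizeSketch do
  # Hypothetical module; `state.sink` is an assumed 2-arity function that
  # stands in for whatever storage the recorder writes to.
  def handle_finalize(outcome, state) do
    state.sink.(state.run_id, classify(outcome))
    :ok
  end

  # Maps each documented outcome shape to an illustrative status atom.
  def classify({:ok, _result}), do: :completed
  def classify(:cancelled), do: :cancelled
  # The crash clause must come before the generic error clause to match first.
  def classify({:error, {:runserver_crashed, _reason}}), do: :crashed
  def classify({:error, _reason}), do: :failed
end
```

Because the recorder process exits once this callback returns, any buffered writes would need to be flushed inside it.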

init(run_id, args)

@callback init(run_id :: String.t(), args :: term()) :: {:ok, state()} | {:stop, term()}

Initialise recorder state.

Called once, in the recorder process, before it begins consuming events. Receives the run_id chosen by the runtime (or supplied via the :run_id option to CouncilEx.start/3) and the user-supplied args from the {module, args} tuple passed in :recorder.

Return {:ok, state} to begin consuming events. Return {:stop, reason} to abort recorder startup; the run will still proceed without a recorder.
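As an illustration of the two return values, an init/2 might validate its args and abort startup when they are unusable. This is a sketch under assumptions: InitSketch and the :invalid_recorder_args reason are hypothetical, and the %{repo: ...} shape simply mirrors the Wiring example above.

```elixir
defmodule InitSketch do
  # Hypothetical recorder init; accepts args shaped like %{repo: SomeModule}.
  def init(run_id, %{repo: repo}) when is_atom(repo) and not is_nil(repo) do
    {:ok, %{run_id: run_id, repo: repo}}
  end

  # Anything else aborts recorder startup; the run proceeds without a recorder.
  def init(_run_id, other), do: {:stop, {:invalid_recorder_args, other}}
end
```

With this shape, InitSketch.init("run-1", %{repo: MyApp.Repo}) yields {:ok, state}, while InitSketch.init("run-1", %{}) yields a {:stop, reason} tuple.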