Cyclium.Episodes (Cyclium v0.1.8)

Context for episode CRUD operations and lifecycle management.

Functions

cancel(episode_id, reason \\ "manual")

Execute the cancellation sequence for an episode (spec §4.9).

  1. Journal :episode_failed step with error_class "canceled"
  2. Set status to :canceled
  3. Cancel pending outputs (:proposed → :canceled)
  4. Publish "episode.canceled" Bus event
  5. Emit [:cyclium, :episode, :canceled] telemetry
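The state changes in steps 1 to 3 can be sketched with a small in-memory model. This is an illustration only, not Cyclium's implementation: the `CancelSketch` module, the map fields, the `:delivered` status, and storing the reason on the journaled step are all assumptions, and the Bus event and telemetry steps are left out.

```elixir
defmodule CancelSketch do
  # Hypothetical in-memory model; field names are assumptions for illustration.
  def cancel(episode, reason \\ "manual") do
    # Step 1: journal an :episode_failed step with error_class "canceled".
    step = %{kind: :episode_failed, error_class: "canceled", reason: reason}

    # Step 3: only pending (:proposed) outputs move to :canceled.
    outputs =
      Enum.map(episode.outputs, fn
        %{status: :proposed} = out -> %{out | status: :canceled}
        out -> out
      end)

    # Step 2: the episode itself becomes :canceled.
    %{episode | steps: episode.steps ++ [step], status: :canceled, outputs: outputs}
  end
end

episode = %{
  status: :running,
  steps: [],
  outputs: [%{id: 1, status: :proposed}, %{id: 2, status: :delivered}]
}

canceled = CancelSketch.cancel(episode)
IO.inspect(canceled.status)
```

Outputs that are no longer pending keep their status in this model, which is how the ":proposed → :canceled" note reads.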

cancel_related(actor_id, expectation_id, reason \\ "cascade_cancel")

Cancel running and blocked episodes for the given actor and expectation.

Returns {:ok, count} with the number of episodes canceled.

claim_for_recovery(episode_id)

Attempt to claim a stale episode for recovery via optimistic update.

Sets phase to "recovering" only if the episode is still :running. Returns {:ok, episode} if claimed, {:error, :already_claimed} if another node got there first.
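The optimistic claim can be pictured as a compare-and-set. The sketch below uses an `Agent` as a hypothetical stand-in for the episodes table; `ClaimSketch`, the `"executing"` phase, and the extra check that the phase is not already `"recovering"` are assumptions made so that a second claimant loses, not details confirmed by the docs above.

```elixir
defmodule ClaimSketch do
  # Hypothetical stand-in for the episodes table: an Agent holding id => episode.
  def start(episodes), do: Agent.start_link(fn -> episodes end)

  # Compare-and-set: the phase changes only if the episode is still :running
  # and (assumption) has not already been moved to "recovering".
  def claim(store, id) do
    Agent.get_and_update(store, fn episodes ->
      case Map.fetch(episodes, id) do
        {:ok, %{status: :running, phase: phase} = ep} when phase != "recovering" ->
          claimed = %{ep | phase: "recovering"}
          {{:ok, claimed}, Map.put(episodes, id, claimed)}

        _ ->
          {{:error, :already_claimed}, episodes}
      end
    end)
  end
end

{:ok, store} = ClaimSketch.start(%{"ep-1" => %{status: :running, phase: "executing"}})

first = ClaimSketch.claim(store, "ep-1")
second = ClaimSketch.claim(store, "ep-1")
IO.inspect({first, second})
```

Because the check and the write happen in one atomic step, only one of two competing callers can see `{:ok, episode}`.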

count_by_actors(actor_ids, opts \\ [])

Count episodes for the given actor ID(s). Accepts the same filter options as list_by_actors/2.

count_steps(episode_id)

create(attrs)

force_fire(actor_id, expectation_id, opts \\ [])

Force-fires an episode for an actor/expectation pair.

Options

  • :mode — :live (default) or :dry_run
  • :trigger_payload — map of trigger data
  • :overrides — dry run overrides map with optional keys:
    • "tool_overrides" — %{"capability.action" => mock_result}
    • "synthesis_override" — mock synthesis result

Returns {:ok, episode} or {:error, reason}.
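As a rough illustration of how these options fit together, the snippet below builds a dry-run option list. Only the option keys come from the list above; the payload, the mock result shapes, and the `"order_id"` key are hypothetical, and the actual call is shown as a comment because it needs a running Cyclium application.

```elixir
# Hypothetical values throughout; only the option keys come from the docs above.
overrides = %{
  # Keyed by "capability.action"; the mock result shape is an assumption.
  "tool_overrides" => %{"capability.action" => %{"ok" => true}},
  "synthesis_override" => %{"summary" => "mocked"}
}

opts = [
  mode: :dry_run,
  trigger_payload: %{"order_id" => "A-1"},
  overrides: overrides
]

# With Cyclium loaded, the call would look like:
#   Cyclium.Episodes.force_fire(actor_id, expectation_id, opts)
IO.inspect(Keyword.fetch!(opts, :mode))
```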

get(id)

get!(id)

get_log(episode_id)

last_schedule_fire(actor_id, expectation_id)

Return the started_at of the most recent schedule-triggered episode for the given actor and expectation. Returns nil if none found.

Schedule timers call this on actor init to compute the correct delay after a restart, so that windows missed while the actor was down are accounted for.
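One way a timer might turn that timestamp into a delay is sketched below. It assumes a fixed-interval schedule and assumes that a missing or overdue fire should run immediately; `ScheduleDelaySketch`, `interval_ms`, and both policies are hypothetical, since the docs do not describe how the delay is derived.

```elixir
defmodule ScheduleDelaySketch do
  # Returns the milliseconds to wait before the next fire.
  # `interval_ms` is a hypothetical fixed interval; real schedules may differ.

  # No previous schedule fire found (nil): fire right away (assumption).
  def next_delay_ms(nil, _interval_ms, _now), do: 0

  def next_delay_ms(%DateTime{} = last_fire, interval_ms, %DateTime{} = now) do
    elapsed = DateTime.diff(now, last_fire, :millisecond)
    # A window missed during downtime yields a delay of 0 rather than a negative value.
    max(interval_ms - elapsed, 0)
  end
end

now = ~U[2024-01-01 12:00:00Z]
hour_ms = 3_600_000

recent = ScheduleDelaySketch.next_delay_ms(~U[2024-01-01 11:50:00Z], hour_ms, now)
missed = ScheduleDelaySketch.next_delay_ms(~U[2024-01-01 10:00:00Z], hour_ms, now)
never = ScheduleDelaySketch.next_delay_ms(nil, hour_ms, now)
IO.inspect({recent, missed, never})
```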

list_by_actors(actor_ids, opts \\ [])

List episodes for the given actor ID(s).

Options

  • :statuses — list of status atoms to filter by (default: all)
  • :limit — max rows to return (default: 50)
  • :offset — rows to skip (default: 0)
  • :order — :asc or :desc by started_at (default: :desc)
  • :exclude_archived — when true, excludes episodes with a non-nil archived_at (default: false)
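The effect of these options and their defaults can be modelled over a plain list. This is a sketch of the documented semantics only: `ListOptsSketch` and the episode map fields are assumptions, and the real function presumably runs a database query instead.

```elixir
defmodule ListOptsSketch do
  # Defaults as documented above; `statuses: nil` stands for "all statuses".
  @defaults [statuses: nil, limit: 50, offset: 0, order: :desc, exclude_archived: false]

  def list(episodes, actor_ids, opts \\ []) do
    opts = Keyword.merge(@defaults, opts)
    # Accept a single actor ID or a list of IDs.
    ids = List.wrap(actor_ids)

    episodes
    |> Enum.filter(fn ep -> ep.actor_id in ids end)
    |> Enum.filter(fn ep -> opts[:statuses] == nil or ep.status in opts[:statuses] end)
    |> Enum.reject(fn ep -> opts[:exclude_archived] and ep.archived_at != nil end)
    |> Enum.sort_by(fn ep -> ep.started_at end, {opts[:order], DateTime})
    |> Enum.drop(opts[:offset])
    |> Enum.take(opts[:limit])
  end
end

episodes = [
  %{id: 1, actor_id: "a", status: :canceled, started_at: ~U[2024-01-01 10:00:00Z], archived_at: nil},
  %{id: 2, actor_id: "a", status: :running, started_at: ~U[2024-01-01 11:00:00Z], archived_at: nil},
  %{id: 3, actor_id: "b", status: :running, started_at: ~U[2024-01-01 12:00:00Z],
    archived_at: ~U[2024-01-02 00:00:00Z]}
]

newest_first = ListOptsSketch.list(episodes, "a")

running_only =
  ListOptsSketch.list(episodes, ["a", "b"], statuses: [:running], exclude_archived: true)

IO.inspect({Enum.map(newest_first, & &1.id), Enum.map(running_only, & &1.id)})
```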

list_by_actors_and_subject(actor_ids, subject_key, subject_value, opts \\ [])

List episodes for the given actor(s) where the trigger payload or workflow input contains the given subject key/value pair.

Queries two JSON paths to cover both trigger types:

  • Event-triggered: trigger_ref.payload.<key>
  • Workflow-triggered: trigger_ref.input.<key>

Detects the repo adapter at runtime and uses the appropriate JSON text extraction — Postgres #>> or SQL Server JSON_VALUE.
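The two-path lookup can be illustrated on decoded maps. The sketch below is an in-memory analogue, not the SQL the library generates: `SubjectMatchSketch`, the string-keyed episode maps, and the `"order_id"` subject key are assumptions.

```elixir
defmodule SubjectMatchSketch do
  # Checks both documented paths:
  #   trigger_ref.payload.<key>  (event-triggered)
  #   trigger_ref.input.<key>    (workflow-triggered)
  def matches?(episode, key, value) do
    get_in(episode, ["trigger_ref", "payload", key]) == value or
      get_in(episode, ["trigger_ref", "input", key]) == value
  end
end

event_ep = %{"trigger_ref" => %{"payload" => %{"order_id" => "A-1"}}}
workflow_ep = %{"trigger_ref" => %{"input" => %{"order_id" => "A-1"}}}

matches = Enum.filter([event_ep, workflow_ep], &SubjectMatchSketch.matches?(&1, "order_id", "A-1"))
IO.inspect(length(matches))
```

Since the database operators extract JSON values as text, comparing against string values, as done here, is the closest in-memory match.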

Options

Same as list_by_actors/2.

list_by_status(statuses)

list_for_conversation(conversation_id, opts \\ [])

List episodes for a conversation, ordered by start time.

list_stale_running(stale_after_ms, opts \\ [])

List :running episodes with no recent step journal activity.

An episode is considered stale if:

  • Its most recent EpisodeStep.created_at is older than stale_after_ms, OR
  • It has no steps and its started_at is older than the threshold
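The two staleness conditions reduce to one rule: take the latest step time, or the start time when there are no steps, and compare its age to the threshold. A minimal sketch, assuming a hypothetical episode map with `step_times` and `started_at` fields and an explicit `now` argument:

```elixir
defmodule StaleSketch do
  # An episode is stale when its latest step (or, with no steps, its start time)
  # is more than `stale_after_ms` milliseconds before `now`.
  def stale?(episode, stale_after_ms, now) do
    last_activity =
      case episode.step_times do
        [] -> episode.started_at
        times -> Enum.max(times, DateTime)
      end

    DateTime.diff(now, last_activity, :millisecond) > stale_after_ms
  end
end

now = ~U[2024-01-01 12:00:00Z]

active = %{
  started_at: ~U[2024-01-01 11:00:00Z],
  step_times: [~U[2024-01-01 11:58:00Z], ~U[2024-01-01 11:59:30Z]]
}

no_steps = %{started_at: ~U[2024-01-01 11:00:00Z], step_times: []}

IO.inspect({StaleSketch.stale?(active, 60_000, now), StaleSketch.stale?(no_steps, 60_000, now)})
```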

Options

  • :source_stack — restrict the scan to episodes created by this stack. Pre-migration rows with NULL source_stack are included (treated as "match any stack") to avoid orphaning legacy episodes. When omitted, the scan is unscoped.
  • :source_env — restrict the scan to episodes created by this env, matched by strict equality (so an env-tagged node won't recover the default node's work and vice-versa). Pass nil to scope to the unset/default env (includes legacy NULL rows). When omitted, env is not filtered.
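The difference between omitting an option and passing nil is easy to miss, so here is a sketch of the scoping rules as a predicate. `ScopeSketch`, the episode map fields, and the stack and env names are hypothetical; the real scan presumably applies these rules in its query.

```elixir
defmodule ScopeSketch do
  # Keyword.fetch/2 distinguishes "option omitted" (:error) from "option given",
  # including an explicit nil ({:ok, nil}).
  def in_scope?(episode, opts) do
    stack_ok?(episode, Keyword.fetch(opts, :source_stack)) and
      env_ok?(episode, Keyword.fetch(opts, :source_env))
  end

  # Omitted: unscoped. Given: match the stack, or a legacy row with a NULL stack.
  defp stack_ok?(_episode, :error), do: true
  defp stack_ok?(episode, {:ok, stack}), do: episode.source_stack in [stack, nil]

  # Omitted: no env filter. Given (including nil): strict equality.
  defp env_ok?(_episode, :error), do: true
  defp env_ok?(episode, {:ok, env}), do: episode.source_env == env
end

legacy = %{source_stack: nil, source_env: nil}
tagged = %{source_stack: "billing", source_env: "staging"}

results = {
  ScopeSketch.in_scope?(legacy, source_stack: "orders"),
  ScopeSketch.in_scope?(tagged, source_stack: "orders"),
  ScopeSketch.in_scope?(tagged, source_env: nil),
  ScopeSketch.in_scope?(tagged, [])
}

IO.inspect(results)
```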

Used by Cyclium.Recovery.sweep/1 to find orphaned episodes after deploys.

list_steps(episode_id, opts \\ [])

next_step_no(episode_id)

@spec next_step_no(binary()) :: pos_integer()

Returns the next step number for an episode's journal (current max + 1, or 1 when there are no steps yet).

Shared by the episode runner and the output router so step numbering stays consistent across both. Steps for a single episode are journaled serially by one process, so the read-then-increment is race-free in practice.
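The "current max + 1, or 1 when empty" rule is small enough to show directly. This sketch works on a plain list of existing step numbers rather than the journal table, and `StepNoSketch` is a hypothetical name:

```elixir
defmodule StepNoSketch do
  @spec next_step_no([pos_integer()]) :: pos_integer()
  def next_step_no(step_nos) do
    # The fallback of 0 makes an empty journal start at step 1.
    Enum.max(step_nos, &>=/2, fn -> 0 end) + 1
  end
end

IO.inspect({StepNoSketch.next_step_no([]), StepNoSketch.next_step_no([1, 2, 3])})
```

As the paragraph above notes, a read followed by an increment like this is safe only because a single process journals the steps for any one episode.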

resolve_approval(episode_id, plan_hash, approved? \\ true)

Resolve an approval on a blocked interactive episode. Verifies the plan_hash matches the approval_requested step.
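The hash check can be sketched as a guard in front of the approval decision. Everything here is an assumption for illustration: the `ApprovalSketch` module, the return tuples, and the `:plan_hash_mismatch` error atom are not taken from the docs, which do not state what the function returns.

```elixir
defmodule ApprovalSketch do
  # `requested_hash` stands for the plan_hash recorded on the approval_requested step.
  def resolve(requested_hash, given_hash, approved? \\ true)

  # Matching hashes: the decision is accepted.
  def resolve(hash, hash, true), do: {:ok, :approved}
  def resolve(hash, hash, false), do: {:ok, :rejected}

  # Any other combination is treated as a mismatch (hypothetical error atom).
  def resolve(_requested, _given, _approved?), do: {:error, :plan_hash_mismatch}
end

approved = ApprovalSketch.resolve("abc123", "abc123")
rejected = ApprovalSketch.resolve("abc123", "abc123", false)
mismatch = ApprovalSketch.resolve("abc123", "zzz999")
IO.inspect({approved, rejected, mismatch})
```

Verifying the hash first means an approval cannot be applied to a plan other than the one the approver was shown.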

update_status(episode_id, status)

update_status(episode_id, status, extra_fields)