Baton.Events (Baton v0.1.0)


PubSub broadcasting for workflow step lifecycle events.

Workers broadcast on two topic levels so LiveViews can subscribe at whatever granularity they need:

  • "workflow:all" — every event from every workflow (for the index view)
  • "workflow:{workflow_id}" — events for one specific workflow (for the detail view)
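
The two-level scheme can be illustrated without any Baton or Phoenix dependency. The sketch below is not Baton's implementation: it uses the standard library's Registry as a stand-in for the real PubSub server, and the module name TopicSketch and its subscribe/1 and broadcast/1 functions are hypothetical. Only the two topic strings are taken from the list above.

```elixir
defmodule TopicSketch do
  # Hypothetical stand-in for Baton.Events, using the standard library's
  # Registry in place of a real PubSub server.
  @registry TopicSketch.Registry

  def start_link, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Topic names copied from the list above.
  def topic_all, do: "workflow:all"
  def topic_workflow(workflow_id), do: "workflow:#{workflow_id}"

  # Subscribing registers the calling process under the topic.
  def subscribe(topic) do
    {:ok, _owner} = Registry.register(@registry, topic, nil)
    :ok
  end

  # Every event goes to the global topic and to the workflow's own topic.
  def broadcast(%{workflow_id: workflow_id} = payload) do
    event = {:workflow_step_updated, payload}

    for topic <- [topic_all(), topic_workflow(workflow_id)] do
      Registry.dispatch(@registry, topic, fn subscribers ->
        for {pid, _value} <- subscribers, do: send(pid, event)
      end)
    end

    :ok
  end
end
```

A process that subscribes only to TopicSketch.topic_workflow("wf-1") receives events for that workflow and nothing else, while a process subscribed to TopicSketch.topic_all() receives every event. A process subscribed to both topics would receive each matching event twice, which is one reason to pick a single granularity per view.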

Event shapes

# Step changed state
{:workflow_step_updated, %{
  workflow_id: "uuid",
  workflow_label: "my-etl",
  step_name: "fetch_token",
  worker: "MyApp.Workers.FetchToken",
  state: "completed",         # or "executing", "cancelled", "discarded"
  job_id: 123,
  attempt: 1,
  has_result: true,
  error: nil,                 # or error string on failure
  timestamp: ~U[...]
}}

# Whole workflow reached terminal state
{:workflow_finished, %{
  workflow_id: "uuid",
  workflow_label: "my-etl",
  outcome: :completed,        # or :failed
  failed_steps: ["step_b"],
  timestamp: ~U[...]
}}
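
Because both events are tagged tuples carrying a map, a subscriber can pattern-match on them directly, for example inside a handle_info/2 callback. The sketch below shows only the matching; the module EventSketch and its describe/1 function are hypothetical and not part of Baton, and it assumes the payload keys shown above.

```elixir
defmodule EventSketch do
  # Hypothetical helper: turns an event tuple into a one-line summary.

  # A step update that carries an error string.
  def describe({:workflow_step_updated, %{step_name: step, state: state, error: error}})
      when is_binary(error) do
    "step #{step} is #{state}: #{error}"
  end

  # Any other step update (error is nil or absent).
  def describe({:workflow_step_updated, %{step_name: step, state: state}}) do
    "step #{step} is #{state}"
  end

  # The whole workflow finished with every step succeeding.
  def describe({:workflow_finished, %{workflow_id: id, outcome: :completed}}) do
    "workflow #{id} completed"
  end

  # The whole workflow finished with at least one failed step.
  def describe({:workflow_finished, %{workflow_id: id, outcome: :failed, failed_steps: steps}}) do
    "workflow #{id} failed at: #{Enum.join(steps, ", ")}"
  end
end
```

Matching on only the keys a clause needs keeps the handler working if the payload gains extra fields later.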

Functions

broadcast_step_cancelled(job, reason)

Broadcast that a step was cancelled (cascaded from a failed dependency).

broadcast_step_completed(job, has_result \\ false)

Broadcast that a step completed successfully.

broadcast_step_discarded(job, error)

Broadcast that a step was permanently discarded.

broadcast_step_retrying(job, error)

Broadcast that a step failed and will be retried.

broadcast_step_snoozed(job)

Broadcast that a step was skipped because a dependency is not yet satisfied (the job is snoozing).

broadcast_step_started(job)

Broadcast that a step has started executing.

broadcast_workflow_finished(workflow_id, label, outcome, failed_steps, opts \\ [])

@spec broadcast_workflow_finished(
  String.t(),
  String.t() | nil,
  :completed | :failed,
  [String.t()],
  keyword()
) :: :ok

Broadcast that a whole workflow has reached a terminal state.

Sent once by Baton.Completion when the last outstanding step settles. outcome is :completed (every step succeeded) or :failed (at least one step was cancelled or discarded); failed_steps lists the offending step names. Also emits [:baton, :workflow, :finished] telemetry.
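
The rule for outcome and failed_steps can be written as a small pure function. This is a sketch of the rule as described, not the code in Baton.Completion: the module OutcomeSketch, the summarize/1 function, and the %{name: ..., state: ...} input shape are all assumptions made for illustration.

```elixir
defmodule OutcomeSketch do
  # Hypothetical: states that count as a failure, per the description above.
  @failed_states ["cancelled", "discarded"]

  # Takes a list of %{name: ..., state: ...} maps (an assumed shape) and
  # returns {outcome, failed_steps}.
  def summarize(steps) do
    failed_steps =
      for %{name: name, state: state} <- steps, state in @failed_states, do: name

    outcome = if failed_steps == [], do: :completed, else: :failed
    {outcome, failed_steps}
  end
end
```

With one completed step and one discarded step named "step_b", summarize/1 returns {:failed, ["step_b"]}; these are the values that would then be passed as the outcome and failed_steps arguments.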

subscribe_all()

Subscribe to events for all workflows. Use in the index LiveView.

subscribe_workflow(workflow_id)

Subscribe to events for one specific workflow. Use in the detail LiveView.

topic_all()

topic_workflow(workflow_id)

unsubscribe_workflow(workflow_id)

Unsubscribe from a specific workflow's topic.