Workflow / task orchestration diagram builder on top of Yog.
Choreo.Workflow models automated task orchestration where nodes are
process steps and edges are execution dependencies. It supports:
- Tasks — automated steps with timeout and retry config
- Decisions — conditional branching
- Fork / Join — parallel execution paths
- Compensations — Saga-pattern rollback handlers
- Events — triggers, timers, signals
- Swimlanes — group tasks by team, service, or domain
When to use
Use Choreo.Workflow when designing distributed business processes,
Saga transactions, CI/CD pipelines, or approval flows. It identifies
the critical path, finds parallelizable tasks, and verifies that every
failure scenario has a compensation route.
Further reading
Quick Start
workflow =
Choreo.Workflow.new()
|> Choreo.Workflow.add_start(:order_received)
|> Choreo.Workflow.add_task(:charge_card, timeout_ms: 5000, retry: 3)
|> Choreo.Workflow.add_task(:reserve_inventory, timeout_ms: 3000)
|> Choreo.Workflow.add_decision(:sufficient_stock)
|> Choreo.Workflow.add_task(:pack_items, timeout_ms: 10_000)
|> Choreo.Workflow.add_task(:ship_order, timeout_ms: 5000)
|> Choreo.Workflow.add_compensation(:refund_payment, for: :charge_card)
|> Choreo.Workflow.add_end(:done)
|> Choreo.Workflow.connect(:order_received, :charge_card)
|> Choreo.Workflow.connect(:charge_card, :reserve_inventory)
|> Choreo.Workflow.connect(:reserve_inventory, :sufficient_stock)
|> Choreo.Workflow.connect(:sufficient_stock, :pack_items, condition: "yes")
|> Choreo.Workflow.connect(:sufficient_stock, :refund_payment, condition: "no", edge_type: :compensation)
|> Choreo.Workflow.connect(:pack_items, :ship_order)
|> Choreo.Workflow.connect(:ship_order, :done)
dot = Choreo.Workflow.to_dot(workflow)Diagram
Analysis
# Longest-latency path through the workflow
{:ok, path, latency} = Choreo.Workflow.Analysis.critical_path(workflow)
# Tasks that can run in parallel
Choreo.Workflow.Analysis.parallelizable_tasks(workflow)
# Tasks with missing compensations
Choreo.Workflow.Analysis.missing_compensations(workflow)
# Validation
Choreo.Workflow.Analysis.validate(workflow)
Summary
Functions
Adds a compensation / rollback node (Saga pattern).
Adds a decision / gateway node for conditional branching.
Adds an end node (terminal).
Adds an event node (trigger, timer, signal).
Adds a fork node that splits execution into parallel paths.
Adds a join node that merges parallel paths.
Adds a start node (entry point).
Adds a swimlane grouping.
Adds an automated task node.
Returns all compensation node IDs.
Connects two workflow nodes with an execution dependency.
Returns all edges as {from, to, weight} tuples.
Returns all end node IDs.
Creates a new empty workflow graph.
Returns all node IDs in the workflow.
Returns all start node IDs.
Returns all task node IDs.
Renders the workflow to DOT format.
Returns the raw Yog.Graph struct underpinning the workflow.
Types
@type t() :: %Choreo.Workflow{ clusters: %{required(String.t()) => map()}, edge_meta: %{optional({Yog.node_id(), Yog.node_id()}) => map()}, graph: Yog.graph() }
Functions
@spec add_compensation(t(), Yog.node_id(), keyword()) :: t()
Adds a compensation / rollback node (Saga pattern).
Options
:for— the task id this compensation rolls back:label— display label:handler— handler name / reference:description— tooltip text
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_compensation(workflow, :rollback, for: :process)
iex> Choreo.Workflow.compensations(workflow)
[:rollback]
iex> Yog.node(workflow.graph, :rollback).target_task
:processDiagram
@spec add_decision(t(), Yog.node_id(), keyword()) :: t()
Adds a decision / gateway node for conditional branching.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_decision(workflow, :check)
iex> Yog.node(workflow.graph, :check).node_type
:decisionDiagram
@spec add_end(t(), Yog.node_id(), keyword()) :: t()
Adds an end node (terminal).
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_end(workflow, :finish, label: "End")
iex> Choreo.Workflow.ends(workflow)
[:finish]
iex> Yog.node(workflow.graph, :finish).node_type
:endDiagram
@spec add_event(t(), Yog.node_id(), keyword()) :: t()
Adds an event node (trigger, timer, signal).
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_event(workflow, :timer)
iex> Yog.node(workflow.graph, :timer).node_type
:event
@spec add_fork(t(), Yog.node_id(), keyword()) :: t()
Adds a fork node that splits execution into parallel paths.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_fork(workflow, :split)
iex> Yog.node(workflow.graph, :split).node_type
:fork
@spec add_join(t(), Yog.node_id(), keyword()) :: t()
Adds a join node that merges parallel paths.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_join(workflow, :merge)
iex> Yog.node(workflow.graph, :merge).node_type
:join
@spec add_start(t(), Yog.node_id(), keyword()) :: t()
Adds a start node (entry point).
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_start(workflow, :begin, label: "Start")
iex> Choreo.Workflow.starts(workflow)
[:begin]
iex> Yog.node(workflow.graph, :begin).node_type
:startDiagram
Adds a swimlane grouping.
Swimlanes are rendered as subgraph clusters. Nodes can be assigned to a
swimlane via the :swimlane option in node builders.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_swimlane("backend", label: "Backend Services")
...> |> Choreo.Workflow.add_task(:api, swimlane: "backend")
iex> Yog.node(workflow.graph, :api)[:cluster]
"cluster_backend"
@spec add_task(t(), Yog.node_id(), keyword()) :: t()
Adds an automated task node.
Options
:timeout_ms— maximum time allowed for the task (default:5000):retry— number of retry attempts on failure (default:0):retry_backoff_ms— backoff between retries in milliseconds:label— display label:handler— handler name / reference:description— tooltip text:swimlane— swimlane group name
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_task(workflow, :process, timeout_ms: 5000, retry: 3)
iex> Choreo.Workflow.tasks(workflow)
[:process]
iex> Yog.node(workflow.graph, :process).timeout_ms
5000
iex> Yog.node(workflow.graph, :process).retry
3Diagram
@spec compensations(t()) :: [Yog.node_id()]
Returns all compensation node IDs.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_compensation(:rollback, for: :task_a)
...> |> Choreo.Workflow.add_compensation(:undo, for: :task_b)
iex> Enum.sort(Choreo.Workflow.compensations(workflow))
[:rollback, :undo]
@spec connect(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()
Connects two workflow nodes with an execution dependency.
Options
:condition— branch condition label (shown on decision edges):edge_type—:sequence(default),:compensation,:retry,:failure,:timeout:weight— edge weight for path calculations (defaults to target task timeout_ms):label— override edge label
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.edges(workflow)
[{:a, :b, 1}]
iex> workflow.edge_meta[{:a, :b}].edge_type
:sequenceDiagram
@spec edges(t()) :: [{Yog.node_id(), Yog.node_id(), number()}]
Returns all edges as {from, to, weight} tuples.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.edges(workflow)
[{:a, :b, 1}]
@spec ends(t()) :: [Yog.node_id()]
Returns all end node IDs.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_end(:a)
...> |> Choreo.Workflow.add_end(:b)
iex> Enum.sort(Choreo.Workflow.ends(workflow))
[:a, :b]
@spec new() :: t()
Creates a new empty workflow graph.
Workflow graphs are always directed.
Examples
iex> workflow = Choreo.Workflow.new()
iex> Choreo.Workflow.nodes(workflow)
[]
iex> Choreo.Workflow.starts(workflow)
[]
iex> Choreo.Workflow.ends(workflow)
[]
@spec nodes(t()) :: [Yog.node_id()]
Returns all node IDs in the workflow.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
iex> Enum.sort(Choreo.Workflow.nodes(workflow))
[:a, :b]
@spec starts(t()) :: [Yog.node_id()]
Returns all start node IDs.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_start(:b)
iex> Enum.sort(Choreo.Workflow.starts(workflow))
[:a, :b]
@spec tasks(t()) :: [Yog.node_id()]
Returns all task node IDs.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_task(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_start(:s)
iex> Enum.sort(Choreo.Workflow.tasks(workflow))
[:a, :b]
Renders the workflow to DOT format.
Options
:theme—:default,:dark, or aChoreo.Themestruct
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:start)
...> |> Choreo.Workflow.add_task(:process)
...> |> Choreo.Workflow.add_end(:end)
...> |> Choreo.Workflow.connect(:start, :process)
...> |> Choreo.Workflow.connect(:process, :end)
iex> dot = Choreo.Workflow.to_dot(workflow)
iex> String.contains?(dot, "digraph")
true
iex> String.contains?(dot, "process")
true
Returns the raw Yog.Graph struct underpinning the workflow.
Examples
iex> workflow = Choreo.Workflow.new()
iex> graph = Choreo.Workflow.to_graph(workflow)
iex> graph.kind
:directed