Workflow / task orchestration diagram builder on top of Yog.
Choreo.Workflow models automated task orchestration where nodes are
process steps and edges are execution dependencies. It supports:
- Tasks — automated steps with timeout and retry config
- Decisions — conditional branching
- Fork / Join — parallel execution paths
- Compensations — Saga-pattern rollback handlers
- Events — triggers, timers, signals
- Swimlanes — group tasks by team, service, or domain
When to use
Use Choreo.Workflow when designing distributed business processes,
Saga transactions, CI/CD pipelines, or approval flows. It identifies
the critical path, finds parallelizable tasks, and verifies that every
failure scenario has a compensation route.
Further reading
Quick Start
workflow =
Choreo.Workflow.new()
|> Choreo.Workflow.add_start(:order_received)
|> Choreo.Workflow.add_task(:charge_card, timeout_ms: 5000, retry: 3)
|> Choreo.Workflow.add_task(:reserve_inventory, timeout_ms: 3000)
|> Choreo.Workflow.add_decision(:sufficient_stock)
|> Choreo.Workflow.add_task(:pack_items, timeout_ms: 10_000)
|> Choreo.Workflow.add_task(:ship_order, timeout_ms: 5000)
|> Choreo.Workflow.add_compensation(:refund_payment, for: :charge_card)
|> Choreo.Workflow.add_end(:done)
|> Choreo.Workflow.connect(:order_received, :charge_card)
|> Choreo.Workflow.connect(:charge_card, :reserve_inventory)
|> Choreo.Workflow.connect(:reserve_inventory, :sufficient_stock)
|> Choreo.Workflow.connect(:sufficient_stock, :pack_items, condition: "yes")
|> Choreo.Workflow.connect(:sufficient_stock, :refund_payment, condition: "no", edge_type: :compensation)
|> Choreo.Workflow.connect(:pack_items, :ship_order)
|> Choreo.Workflow.connect(:ship_order, :done)
dot = Choreo.Workflow.to_dot(workflow)Diagram
Analysis
# Longest-latency path through the workflow
{:ok, path, latency} = Choreo.Workflow.Analysis.critical_path(workflow)
# Tasks that can run in parallel
Choreo.Workflow.Analysis.parallelizable_tasks(workflow)
# Tasks with missing compensations
Choreo.Workflow.Analysis.missing_compensations(workflow)
# Validation
Choreo.Workflow.Analysis.validate(workflow)
Summary
Functions
Adds a compensation / rollback node (Saga pattern).
Adds a decision / gateway node for conditional branching.
Adds an end node (terminal).
Adds an event node (trigger, timer, signal).
Adds a fork node that splits execution into parallel paths.
Adds a join node that merges parallel paths.
Adds a start node (entry point).
Adds a swimlane grouping.
Adds an automated task node.
Returns all compensation node IDs.
Connects two workflow nodes with an execution dependency.
Returns all edges as {from, to, weight} tuples.
Returns all edges with their metadata as {from, to, weight, meta} tuples.
Returns all end node IDs.
Creates a new empty workflow graph.
Returns all node IDs in the workflow.
Returns all start node IDs.
Returns all task node IDs.
Renders the workflow to DOT format.
Returns the raw Yog.Graph struct underpinning the workflow.
Collapses parallel edges into a simple Graph for algorithm analysis.
Types
@type t() :: %Choreo.Workflow{ clusters: %{required(String.t()) => map()}, edge_meta: %{optional(Yog.Multi.Graph.edge_id()) => map()}, graph: Yog.Multi.Graph.t() }
Functions
@spec add_compensation(t(), Yog.node_id(), keyword()) :: t()
Adds a compensation / rollback node (Saga pattern).
Options
:for:label(String.t/0):handler:description(String.t/0):swimlane(String.t/0):shape(atom/0):fillcolor(String.t/0):fontcolor(String.t/0):style(String.t/0):penwidth
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_compensation(workflow, :rollback, for: :process)
iex> Choreo.Workflow.compensations(workflow)
[:rollback]
iex> Map.get(workflow.graph.nodes, :rollback).target_task
:processDiagram
Adds a decision / gateway node for conditional branching.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_decision(workflow, :check)
iex> Map.get(workflow.graph.nodes, :check).node_type
:decisionDiagram
@spec add_end(t(), Yog.node_id(), keyword()) :: t()
Adds an end node (terminal).
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_end(workflow, :finish, label: "End")
iex> Choreo.Workflow.ends(workflow)
[:finish]
iex> Map.get(workflow.graph.nodes, :finish).node_type
:endDiagram
Adds an event node (trigger, timer, signal).
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_event(workflow, :timer)
iex> Map.get(workflow.graph.nodes, :timer).node_type
:event
@spec add_fork(t(), Yog.node_id(), keyword()) :: t()
Adds a fork node that splits execution into parallel paths.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_fork(workflow, :split)
iex> Map.get(workflow.graph.nodes, :split).node_type
:fork
Adds a join node that merges parallel paths.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_join(workflow, :merge)
iex> Map.get(workflow.graph.nodes, :merge).node_type
:join
@spec add_start(t(), Yog.node_id(), keyword()) :: t()
Adds a start node (entry point).
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_start(workflow, :begin, label: "Start")
iex> Choreo.Workflow.starts(workflow)
[:begin]
iex> Map.get(workflow.graph.nodes, :begin).node_type
:startDiagram
Adds a swimlane grouping.
Swimlanes are rendered as subgraph clusters. Nodes can be assigned to a
swimlane via the :swimlane option in node builders.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_swimlane("backend", label: "Backend Services")
...> |> Choreo.Workflow.add_task(:api, swimlane: "backend")
iex> Map.get(workflow.graph.nodes, :api)[:cluster]
"cluster_backend"
Adds an automated task node.
Options
:timeout_ms(integer/0):retry(integer/0):retry_backoff_ms(integer/0):handler:label(String.t/0):description(String.t/0):swimlane(String.t/0):shape(atom/0):fillcolor(String.t/0):fontcolor(String.t/0):style(String.t/0):penwidth:image(String.t/0)
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_task(workflow, :process, timeout_ms: 5000, retry: 3)
iex> Choreo.Workflow.tasks(workflow)
[:process]
iex> Map.get(workflow.graph.nodes, :process).timeout_ms
5000
iex> Map.get(workflow.graph.nodes, :process).retry
3Diagram
@spec compensations(t()) :: [Yog.node_id()]
Returns all compensation node IDs.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_compensation(:rollback, for: :task_a)
...> |> Choreo.Workflow.add_compensation(:undo, for: :task_b)
iex> Enum.sort(Choreo.Workflow.compensations(workflow))
[:rollback, :undo]
Connects two workflow nodes with an execution dependency.
Multiple connections are allowed per (from, to) pair (parallel edges).
Options
:condition(String.t/0):edge_type:weight:label(String.t/0)
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.edges(workflow)
iex> [{:a, :b, _, meta}] = Choreo.Workflow.edges_with_meta(workflow)
iex> meta.edge_type
:sequenceDiagram
@spec edges(t()) :: [{Yog.node_id(), Yog.node_id(), number()}]
Returns all edges as {from, to, weight} tuples.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.edges(workflow)
[{:a, :b, 1}]
@spec edges_with_meta(t()) :: [{Yog.node_id(), Yog.node_id(), number(), map()}]
Returns all edges with their metadata as {from, to, weight, meta} tuples.
@spec ends(t()) :: [Yog.node_id()]
Returns all end node IDs.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_end(:a)
...> |> Choreo.Workflow.add_end(:b)
iex> Enum.sort(Choreo.Workflow.ends(workflow))
[:a, :b]
@spec new() :: t()
Creates a new empty workflow graph.
Workflow graphs are always directed.
Examples
iex> workflow = Choreo.Workflow.new()
iex> Choreo.Workflow.nodes(workflow)
[]
iex> Choreo.Workflow.starts(workflow)
[]
iex> Choreo.Workflow.ends(workflow)
[]
@spec nodes(t()) :: [Yog.node_id()]
Returns all node IDs in the workflow.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
iex> Enum.sort(Choreo.Workflow.nodes(workflow))
[:a, :b]
@spec starts(t()) :: [Yog.node_id()]
Returns all start node IDs.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_start(:b)
iex> Enum.sort(Choreo.Workflow.starts(workflow))
[:a, :b]
@spec tasks(t()) :: [Yog.node_id()]
Returns all task node IDs.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_task(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_start(:s)
iex> Enum.sort(Choreo.Workflow.tasks(workflow))
[:a, :b]
Renders the workflow to DOT format.
Options
:theme—:default,:dark, or aChoreo.Themestruct
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:start)
...> |> Choreo.Workflow.add_task(:process)
...> |> Choreo.Workflow.add_end(:end)
...> |> Choreo.Workflow.connect(:start, :process)
...> |> Choreo.Workflow.connect(:process, :end)
iex> dot = Choreo.Workflow.to_dot(workflow)
iex> String.contains?(dot, "digraph")
true
iex> String.contains?(dot, "process")
true
Returns the raw Yog.Graph struct underpinning the workflow.
Examples
iex> workflow = Choreo.Workflow.new()
iex> graph = Choreo.Workflow.to_graph(workflow)
iex> graph.kind
:directed
@spec to_simple_graph( t(), keyword() ) :: Yog.Graph.t()
Collapses parallel edges into a simple Graph for algorithm analysis.