Workflow / task orchestration diagram builder on top of Yog.
Choreo.Workflow models automated task orchestration where nodes are
process steps and edges are execution dependencies. It supports:
- Tasks — automated steps with timeout and retry config
- Decisions — conditional branching
- Fork / Join — parallel execution paths
- Compensations — Saga-pattern rollback handlers
- Events — triggers, timers, signals
- Swimlanes — group tasks by team, service, or domain
When to use
Use Choreo.Workflow when designing distributed business processes,
Saga transactions, CI/CD pipelines, or approval flows. It identifies
the critical path, finds parallelizable tasks, and verifies that every
failure scenario has a compensation route.
Quick Start
workflow =
  Choreo.Workflow.new()
  |> Choreo.Workflow.add_start(:order_received)
  |> Choreo.Workflow.add_task(:charge_card, timeout_ms: 5000, retry: 3)
  |> Choreo.Workflow.add_task(:reserve_inventory, timeout_ms: 3000)
  |> Choreo.Workflow.add_decision(:sufficient_stock)
  |> Choreo.Workflow.add_task(:pack_items, timeout_ms: 10_000)
  |> Choreo.Workflow.add_task(:ship_order, timeout_ms: 5000)
  |> Choreo.Workflow.add_compensation(:refund_payment, for: :charge_card)
  |> Choreo.Workflow.add_end(:done)
  |> Choreo.Workflow.connect(:order_received, :charge_card)
  |> Choreo.Workflow.connect(:charge_card, :reserve_inventory)
  |> Choreo.Workflow.connect(:reserve_inventory, :sufficient_stock)
  |> Choreo.Workflow.connect(:sufficient_stock, :pack_items, condition: "yes")
  |> Choreo.Workflow.connect(:sufficient_stock, :refund_payment, condition: "no", edge_type: :compensation)
  |> Choreo.Workflow.connect(:pack_items, :ship_order)
  |> Choreo.Workflow.connect(:ship_order, :done)

dot = Choreo.Workflow.to_dot(workflow)
Analysis
# Longest-latency path through the workflow
{:ok, path, latency} = Choreo.Workflow.Analysis.critical_path(workflow)
# Tasks that can run in parallel
Choreo.Workflow.Analysis.parallelizable_tasks(workflow)
# Tasks with missing compensations
Choreo.Workflow.Analysis.missing_compensations(workflow)
# Validation
Choreo.Workflow.Analysis.validate(workflow)
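To make the idea of a critical path concrete, the following is a standalone sketch in plain Elixir. It does not call Choreo and is not a description of how Choreo.Workflow.Analysis.critical_path is implemented; the CriticalPathSketch module, the node names, and the latency figures are all hypothetical. It assumes an acyclic graph with non-negative latencies and finds the longest-latency path with a memoized depth-first search.

```elixir
defmodule CriticalPathSketch do
  # Hypothetical helper, not part of Choreo.
  # latencies: %{node => latency_ms}, edges: [{from, to}], start: entry node.
  # Returns {path, total_latency_ms} for the slowest path starting at `start`.
  def longest_path(latencies, edges, start) do
    successors = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    {result, _memo} = visit(start, latencies, successors, %{})
    result
  end

  defp visit(node, latencies, successors, memo) do
    case memo do
      %{^node => cached} ->
        # Already computed for this node; reuse it.
        {cached, memo}

      _ ->
        {best_path, best_cost, memo} =
          successors
          |> Map.get(node, [])
          |> Enum.reduce({[], 0, memo}, fn next, {path, cost, memo} ->
            {{next_path, next_cost}, memo} = visit(next, latencies, successors, memo)

            if next_cost >= cost do
              {next_path, next_cost, memo}
            else
              {path, cost, memo}
            end
          end)

        result = {[node | best_path], Map.get(latencies, node, 0) + best_cost}
        {result, Map.put(memo, node, result)}
    end
  end
end

# Illustrative data: :fetch fans out to two branches that meet at :publish.
latencies = %{fetch: 100, resize: 300, thumbnail: 50, publish: 20}

edges = [
  {:fetch, :resize},
  {:fetch, :thumbnail},
  {:resize, :publish},
  {:thumbnail, :publish}
]

{path, total_ms} = CriticalPathSketch.longest_path(latencies, edges, :fetch)
# path is [:fetch, :resize, :publish] and total_ms is 420
```

The slower branch (:resize at 300 ms) determines the total, which is why shortening :thumbnail would not reduce the end-to-end latency in this toy graph.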
Summary
Functions
- add_compensation: Adds a compensation / rollback node (Saga pattern).
- add_decision: Adds a decision / gateway node for conditional branching.
- add_end: Adds an end node (terminal).
- add_event: Adds an event node (trigger, timer, signal).
- add_fork: Adds a fork node that splits execution into parallel paths.
- add_join: Adds a join node that merges parallel paths.
- add_start: Adds a start node (entry point).
- add_swimlane: Adds a swimlane grouping.
- add_task: Adds an automated task node.
- compensations: Returns all compensation node IDs.
- connect: Connects two workflow nodes with an execution dependency.
- edges: Returns all edges as {from, to, weight} tuples.
- edges_with_meta: Returns all edges with their metadata as {from, to, weight, meta} tuples.
- ends: Returns all end node IDs.
- new: Creates a new empty workflow graph.
- nodes: Returns all node IDs in the workflow.
- starts: Returns all start node IDs.
- tasks: Returns all task node IDs.
- theme: Returns a theme for Choreo.Workflow.
- to_dot: Renders the workflow to DOT format.
- to_graph: Returns the raw Yog.Graph struct underpinning the workflow.
- to_simple_graph: Collapses parallel edges into a simple Graph for algorithm analysis.
Types
@type t() :: %Choreo.Workflow{
  clusters: %{required(String.t()) => map()},
  edge_meta: %{optional(Yog.Multi.Graph.edge_id()) => map()},
  graph: Yog.Multi.Graph.t()
}
Functions
@spec add_compensation(t(), Yog.node_id(), keyword()) :: t()
Adds a compensation / rollback node (Saga pattern).
Options
- :for
- :label (String.t/0)
- :handler
- :description (String.t/0)
- :swimlane
- :shape (atom/0)
- :fillcolor (String.t/0)
- :fontcolor (String.t/0)
- :style (String.t/0)
- :penwidth
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_compensation(workflow, :rollback, for: :process)
iex> Choreo.Workflow.compensations(workflow)
[:rollback]
iex> Map.get(workflow.graph.nodes, :rollback).target_task
:process
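As a rough sketch of how compensations fit into a Saga-style flow, the snippet below gives one task a rollback handler and leaves another without one, then asks the analysis module which tasks are uncovered. It uses only functions and options that appear elsewhere on this page; the node names are illustrative, and the return shape of missing_compensations is not documented here, so the result is only inspected.

```elixir
# Sketch only: node names are illustrative.
workflow =
  Choreo.Workflow.new()
  |> Choreo.Workflow.add_task(:charge_card, timeout_ms: 5000)
  |> Choreo.Workflow.add_task(:reserve_inventory, timeout_ms: 3000)
  # Only :charge_card gets a rollback handler; :reserve_inventory has none.
  |> Choreo.Workflow.add_compensation(:refund_payment, for: :charge_card)
  |> Choreo.Workflow.connect(:charge_card, :reserve_inventory)
  |> Choreo.Workflow.connect(:reserve_inventory, :refund_payment, edge_type: :compensation)

# Return shape is an unknown here, so print it rather than pattern match on it.
workflow
|> Choreo.Workflow.Analysis.missing_compensations()
|> IO.inspect(label: "missing compensations")
```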
Adds a decision / gateway node for conditional branching.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_decision(workflow, :check)
iex> Map.get(workflow.graph.nodes, :check).node_type
:decision
@spec add_end(t(), Yog.node_id(), keyword()) :: t()
Adds an end node (terminal).
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_end(workflow, :finish, label: "End")
iex> Choreo.Workflow.ends(workflow)
[:finish]
iex> Map.get(workflow.graph.nodes, :finish).node_type
:end
Adds an event node (trigger, timer, signal).
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_event(workflow, :timer)
iex> Map.get(workflow.graph.nodes, :timer).node_type
:event
@spec add_fork(t(), Yog.node_id(), keyword()) :: t()
Adds a fork node that splits execution into parallel paths.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_fork(workflow, :split)
iex> Map.get(workflow.graph.nodes, :split).node_type
:fork
Adds a join node that merges parallel paths.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_join(workflow, :merge)
iex> Map.get(workflow.graph.nodes, :merge).node_type
:join
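The fork and join examples above create the nodes in isolation. A minimal sketch of how they might be wired together into two parallel branches follows; the node names are illustrative, and the return value of parallelizable_tasks is not documented on this page, so it is only printed.

```elixir
# Sketch only: node names are illustrative.
workflow =
  Choreo.Workflow.new()
  |> Choreo.Workflow.add_start(:begin)
  |> Choreo.Workflow.add_fork(:split)
  |> Choreo.Workflow.add_task(:send_email, timeout_ms: 2000)
  |> Choreo.Workflow.add_task(:send_sms, timeout_ms: 1000)
  |> Choreo.Workflow.add_join(:merge)
  |> Choreo.Workflow.add_end(:finish)
  |> Choreo.Workflow.connect(:begin, :split)
  # Both branches leave the fork and meet again at the join.
  |> Choreo.Workflow.connect(:split, :send_email)
  |> Choreo.Workflow.connect(:split, :send_sms)
  |> Choreo.Workflow.connect(:send_email, :merge)
  |> Choreo.Workflow.connect(:send_sms, :merge)
  |> Choreo.Workflow.connect(:merge, :finish)

workflow
|> Choreo.Workflow.Analysis.parallelizable_tasks()
|> IO.inspect(label: "parallelizable")
```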
@spec add_start(t(), Yog.node_id(), keyword()) :: t()
Adds a start node (entry point).
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_start(workflow, :begin, label: "Start")
iex> Choreo.Workflow.starts(workflow)
[:begin]
iex> Map.get(workflow.graph.nodes, :begin).node_type
:start
Adds a swimlane grouping.
Swimlanes are rendered as subgraph clusters. Nodes can be assigned to a
swimlane via the :swimlane option in node builders.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_swimlane("backend", label: "Backend Services")
...> |> Choreo.Workflow.add_task(:api, swimlane: "backend")
iex> Map.get(workflow.graph.nodes, :api)[:cluster]
"cluster_backend"
Adds an automated task node.
Options
- :timeout_ms (integer/0)
- :retry (integer/0)
- :retry_backoff_ms (integer/0)
- :handler
- :label (String.t/0)
- :description (String.t/0)
- :swimlane
- :shape (atom/0)
- :fillcolor (String.t/0)
- :fontcolor (String.t/0)
- :style (String.t/0)
- :penwidth
- :image (String.t/0)
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_task(workflow, :process, timeout_ms: 5000, retry: 3)
iex> Choreo.Workflow.tasks(workflow)
[:process]
iex> Map.get(workflow.graph.nodes, :process).timeout_ms
5000
iex> Map.get(workflow.graph.nodes, :process).retry
3
@spec compensations(t()) :: [Yog.node_id()]
Returns all compensation node IDs.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_compensation(:rollback, for: :task_a)
...> |> Choreo.Workflow.add_compensation(:undo, for: :task_b)
iex> Enum.sort(Choreo.Workflow.compensations(workflow))
[:rollback, :undo]
Connects two workflow nodes with an execution dependency.
Multiple connections are allowed per (from, to) pair (parallel edges).
Options
- :condition (String.t/0)
- :edge_type
- :weight
- :label (String.t/0)
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.edges(workflow)
iex> [{:a, :b, _, meta}] = Choreo.Workflow.edges_with_meta(workflow)
iex> meta.edge_type
:sequence
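Because parallel edges are allowed, the same pair of nodes can be connected more than once, for example with a different condition on each edge. The sketch below illustrates this with the documented :condition and :label options; the node names and option values are illustrative, and which keys end up in each edge's metadata map is an assumption left unverified, so the metadata is printed rather than matched.

```elixir
# Sketch only: node names and condition strings are illustrative.
workflow =
  Choreo.Workflow.new()
  |> Choreo.Workflow.add_decision(:check)
  |> Choreo.Workflow.add_task(:retry_later)
  # Two distinct edges between the same (from, to) pair.
  |> Choreo.Workflow.connect(:check, :retry_later, condition: "timeout", label: "on timeout")
  |> Choreo.Workflow.connect(:check, :retry_later, condition: "rate_limited", label: "on rate limit")

for {from, to, _weight, meta} <- Choreo.Workflow.edges_with_meta(workflow) do
  IO.inspect({from, to, meta}, label: "edge")
end
```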
@spec edges(t()) :: [{Yog.node_id(), Yog.node_id(), number()}]
Returns all edges as {from, to, weight} tuples.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.edges(workflow)
[{:a, :b, 1}]
@spec edges_with_meta(t()) :: [{Yog.node_id(), Yog.node_id(), number(), map()}]
Returns all edges with their metadata as {from, to, weight, meta} tuples.
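One possible use of the metadata tuples is to pick out rollback edges. The sketch below assumes that an :edge_type passed to connect is stored under meta.edge_type, in the same way the default :sequence value is in the connect example; that is an inference from this page, not a documented guarantee. Node names are illustrative.

```elixir
# Sketch only: assumes the :edge_type option is reflected in meta.edge_type.
workflow =
  Choreo.Workflow.new()
  |> Choreo.Workflow.add_task(:charge_card)
  |> Choreo.Workflow.add_task(:ship_order)
  |> Choreo.Workflow.add_compensation(:refund_payment, for: :charge_card)
  |> Choreo.Workflow.connect(:charge_card, :ship_order)
  |> Choreo.Workflow.connect(:ship_order, :refund_payment, edge_type: :compensation)

rollback_edges =
  workflow
  |> Choreo.Workflow.edges_with_meta()
  # Map.get/2 avoids a KeyError if an edge carries no :edge_type key.
  |> Enum.filter(fn {_from, _to, _weight, meta} -> Map.get(meta, :edge_type) == :compensation end)
  |> Enum.map(fn {from, to, _weight, _meta} -> {from, to} end)

IO.inspect(rollback_edges, label: "rollback edges")
```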
@spec ends(t()) :: [Yog.node_id()]
Returns all end node IDs.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_end(:a)
...> |> Choreo.Workflow.add_end(:b)
iex> Enum.sort(Choreo.Workflow.ends(workflow))
[:a, :b]
@spec new() :: t()
Creates a new empty workflow graph.
Workflow graphs are always directed.
Examples
iex> workflow = Choreo.Workflow.new()
iex> Choreo.Workflow.nodes(workflow)
[]
iex> Choreo.Workflow.starts(workflow)
[]
iex> Choreo.Workflow.ends(workflow)
[]
@spec nodes(t()) :: [Yog.node_id()]
Returns all node IDs in the workflow.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
iex> Enum.sort(Choreo.Workflow.nodes(workflow))
[:a, :b]
@spec starts(t()) :: [Yog.node_id()]
Returns all start node IDs.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_start(:b)
iex> Enum.sort(Choreo.Workflow.starts(workflow))
[:a, :b]
@spec tasks(t()) :: [Yog.node_id()]
Returns all task node IDs.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_task(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_start(:s)
iex> Enum.sort(Choreo.Workflow.tasks(workflow))
[:a, :b]
@spec theme(atom(), keyword()) :: Choreo.Theme.t()
Returns a theme for Choreo.Workflow.
Examples
iex> theme = Choreo.Workflow.theme(:default, graph_rankdir: :lr)
iex> theme.graph_rankdir
:lr
Renders the workflow to DOT format.
Options
- :theme - one of :default, :dark, or a Choreo.Theme struct
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:start)
...> |> Choreo.Workflow.add_task(:process)
...> |> Choreo.Workflow.add_end(:end)
...> |> Choreo.Workflow.connect(:start, :process)
...> |> Choreo.Workflow.connect(:process, :end)
iex> dot = Choreo.Workflow.to_dot(workflow)
iex> String.contains?(dot, "digraph")
true
iex> String.contains?(dot, "process")
true
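The :theme option can presumably be combined with theme/2 to adjust the rendering. The following sketch renders the same workflow with the built-in :dark theme and with a customized default theme, then writes the DOT text to a temporary file; the file name is arbitrary, and turning the file into an image requires the separate Graphviz dot tool.

```elixir
# Sketch only: node names and the output file name are illustrative.
workflow =
  Choreo.Workflow.new()
  |> Choreo.Workflow.add_start(:start)
  |> Choreo.Workflow.add_task(:process)
  |> Choreo.Workflow.add_end(:end)
  |> Choreo.Workflow.connect(:start, :process)
  |> Choreo.Workflow.connect(:process, :end)

# Built-in theme selected by name.
dark_dot = Choreo.Workflow.to_dot(workflow, theme: :dark)

# Customized theme struct, using the graph_rankdir override from the theme/2 example.
left_to_right = Choreo.Workflow.theme(:default, graph_rankdir: :lr)
lr_dot = Choreo.Workflow.to_dot(workflow, theme: left_to_right)

path = Path.join(System.tmp_dir!(), "workflow.dot")
File.write!(path, lr_dot)
# Then, outside Elixir: dot -Tsvg workflow.dot -o workflow.svg
```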
Returns the raw Yog.Graph struct underpinning the workflow.
Examples
iex> workflow = Choreo.Workflow.new()
iex> graph = Choreo.Workflow.to_graph(workflow)
iex> graph.kind
:directed
@spec to_simple_graph(t(), keyword()) :: Yog.Graph.t()
Collapses parallel edges into a simple Graph for algorithm analysis.
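A hedged usage sketch: build a workflow with two parallel edges between the same nodes, then collapse it. The accepted options are not listed on this page, so an empty keyword list is passed explicitly to match the two-argument spec; how the parallel edges are merged (for example, which weight survives) is likewise not documented here, so the result is only inspected.

```elixir
# Sketch only: node names are illustrative; options are left empty because
# the supported keys are not documented on this page.
workflow =
  Choreo.Workflow.new()
  |> Choreo.Workflow.add_decision(:check)
  |> Choreo.Workflow.add_task(:retry_later)
  |> Choreo.Workflow.connect(:check, :retry_later, condition: "timeout")
  |> Choreo.Workflow.connect(:check, :retry_later, condition: "rate_limited")

simple = Choreo.Workflow.to_simple_graph(workflow, [])
IO.inspect(simple, label: "simple graph")
```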