Choreo.Workflow (Choreo v0.6.0)

Workflow / task orchestration diagram builder on top of Yog.

Choreo.Workflow models automated task orchestration where nodes are process steps and edges are execution dependencies. It supports:

  • Tasks — automated steps with timeout and retry config
  • Decisions — conditional branching
  • Fork / Join — parallel execution paths
  • Compensations — Saga-pattern rollback handlers
  • Events — triggers, timers, signals
  • Swimlanes — group tasks by team, service, or domain

When to use

Use Choreo.Workflow when designing distributed business processes, Saga transactions, CI/CD pipelines, or approval flows. It identifies the critical path, finds parallelizable tasks, and verifies that every failure scenario has a compensation route.

Quick Start

workflow =
  Choreo.Workflow.new()
  |> Choreo.Workflow.add_start(:order_received)
  |> Choreo.Workflow.add_task(:charge_card, timeout_ms: 5000, retry: 3)
  |> Choreo.Workflow.add_task(:reserve_inventory, timeout_ms: 3000)
  |> Choreo.Workflow.add_decision(:sufficient_stock)
  |> Choreo.Workflow.add_task(:pack_items, timeout_ms: 10_000)
  |> Choreo.Workflow.add_task(:ship_order, timeout_ms: 5000)
  |> Choreo.Workflow.add_compensation(:refund_payment, for: :charge_card)
  |> Choreo.Workflow.add_end(:done)
  |> Choreo.Workflow.connect(:order_received, :charge_card)
  |> Choreo.Workflow.connect(:charge_card, :reserve_inventory)
  |> Choreo.Workflow.connect(:reserve_inventory, :sufficient_stock)
  |> Choreo.Workflow.connect(:sufficient_stock, :pack_items, condition: "yes")
  |> Choreo.Workflow.connect(:sufficient_stock, :refund_payment, condition: "no", edge_type: :compensation)
  |> Choreo.Workflow.connect(:pack_items, :ship_order)
  |> Choreo.Workflow.connect(:ship_order, :done)

dot = Choreo.Workflow.to_dot(workflow)

Diagram

digraph G {
  graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];

  done [label="done", penwidth="2.0", fillcolor="#ef4444", shape="doublecircle"];
  order_received [label="order_received", penwidth="2.0", fillcolor="#10b981", shape="circle"];
  charge_card [label="charge_card (5000ms) retry: 3", fillcolor="#3b82f6", shape="box3d"];
  reserve_inventory [label="reserve_inventory (3000ms)", fillcolor="#3b82f6", shape="box3d"];
  sufficient_stock [label="sufficient_stock", fillcolor="#8b5cf6", shape="diamond"];
  pack_items [label="pack_items (10000ms)", fillcolor="#3b82f6", shape="box3d"];
  ship_order [label="ship_order (5000ms)", fillcolor="#3b82f6", shape="box3d"];
  refund_payment [label="refund_payment", color="#ef4444", style="filled,dashed", fillcolor="#f87171", shape="note"];

  order_received -> charge_card [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"];
  charge_card -> reserve_inventory [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"];
  reserve_inventory -> sufficient_stock [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"];
  sufficient_stock -> pack_items [style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b", label="yes"];
  sufficient_stock -> refund_payment [style="dashed", penwidth="1.5", fontcolor="#ef4444", color="#ef4444", label="no"];
  pack_items -> ship_order [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"];
  ship_order -> done [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"];
}

Analysis

# Longest-latency path through the workflow
{:ok, path, latency} = Choreo.Workflow.Analysis.critical_path(workflow)

# Tasks that can run in parallel
Choreo.Workflow.Analysis.parallelizable_tasks(workflow)

# Tasks with missing compensations
Choreo.Workflow.Analysis.missing_compensations(workflow)

# Validation
Choreo.Workflow.Analysis.validate(workflow)
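
To make "longest-latency path" concrete, the following is a minimal, self-contained sketch of a longest-path search over a list of {from, to, weight} edges. It is not Choreo's implementation: the module name CriticalPathSketch is hypothetical, the graph is assumed to be acyclic, and the weights are assumed to be the target task's timeout_ms (with 0 for nodes that have no timeout).

```elixir
defmodule CriticalPathSketch do
  # Hypothetical helper, not part of Choreo.
  # edges: list of {from, to, weight} tuples describing an acyclic graph.
  # Returns {:ok, path, latency} for the heaviest path, or :error if
  # `finish` cannot be reached from `start`.
  def longest_path(edges, start, finish) do
    adjacency = Enum.group_by(edges, fn {from, _to, _weight} -> from end)
    {result, _memo} = best_from(start, finish, adjacency, %{})

    case result do
      {latency, path} -> {:ok, path, latency}
      nil -> :error
    end
  end

  # Reaching the finish node costs nothing further.
  defp best_from(finish, finish, _adjacency, memo), do: {{0, [finish]}, memo}

  defp best_from(current, finish, adjacency, memo) do
    case Map.fetch(memo, current) do
      {:ok, cached} ->
        {cached, memo}

      :error ->
        {best, memo} =
          adjacency
          |> Map.get(current, [])
          |> Enum.reduce({nil, memo}, fn {_from, to, weight}, {best, memo} ->
            {sub, memo} = best_from(to, finish, adjacency, memo)
            {pick(best, extend(sub, current, weight)), memo}
          end)

        # Cache the result (including nil for dead ends) so each node is solved once.
        {best, Map.put(memo, current, best)}
    end
  end

  defp extend(nil, _current, _weight), do: nil
  defp extend({latency, path}, current, weight), do: {latency + weight, [current | path]}

  defp pick(nil, candidate), do: candidate
  defp pick(best, nil), do: best
  defp pick({a, _} = best, {b, _} = candidate), do: if(b > a, do: candidate, else: best)
end

# Edges of the Quick Start workflow, weighted by the target's timeout_ms (assumed).
edges = [
  {:order_received, :charge_card, 5000},
  {:charge_card, :reserve_inventory, 3000},
  {:reserve_inventory, :sufficient_stock, 0},
  {:sufficient_stock, :pack_items, 10_000},
  {:sufficient_stock, :refund_payment, 0},
  {:pack_items, :ship_order, 5000},
  {:ship_order, :done, 0}
]

{:ok, path, latency} = CriticalPathSketch.longest_path(edges, :order_received, :done)
IO.inspect({path, latency})
```

Under these assumed weights the heaviest route runs through pack_items and ship_order, for a total of 23000 ms. The refund_payment branch is a dead end with respect to :done, so it never contributes to the result.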

Summary

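Functions

add_compensation(workflow, id, opts \\ [])
Adds a compensation / rollback node (Saga pattern).

add_decision(workflow, id, opts \\ [])
Adds a decision / gateway node for conditional branching.

add_end(workflow, id, opts \\ [])
Adds an end node (terminal).

add_event(workflow, id, opts \\ [])
Adds an event node (trigger, timer, signal).

add_fork(workflow, id, opts \\ [])
Adds a fork node that splits execution into parallel paths.

add_join(workflow, id, opts \\ [])
Adds a join node that merges parallel paths.

add_start(workflow, id, opts \\ [])
Adds a start node (entry point).

add_swimlane(workflow, name, opts \\ [])
Adds a swimlane grouping.

add_task(workflow, id, opts \\ [])
Adds an automated task node.

compensations(workflow)
Returns all compensation node IDs.

connect(workflow, from, to, opts \\ [])
Connects two workflow nodes with an execution dependency.

edges(workflow)
Returns all edges as {from, to, weight} tuples.

ends(workflow)
Returns all end node IDs.

new()
Creates a new empty workflow graph.

nodes(workflow)
Returns all node IDs in the workflow.

starts(workflow)
Returns all start node IDs.

tasks(workflow)
Returns all task node IDs.

to_dot(workflow, opts \\ [])
Renders the workflow to DOT format.

to_graph(workflow)
Returns the raw Yog.Graph struct underpinning the workflow.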

Types

t()

@type t() :: %Choreo.Workflow{
  clusters: %{required(String.t()) => map()},
  edge_meta: %{optional({Yog.node_id(), Yog.node_id()}) => map()},
  graph: Yog.graph()
}

Functions

add_compensation(workflow, id, opts \\ [])

@spec add_compensation(t(), Yog.node_id(), keyword()) :: t()

Adds a compensation / rollback node (Saga pattern).

Options

  • :for — the task id this compensation rolls back
  • :label — display label
  • :handler — handler name / reference
  • :description — tooltip text

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_compensation(workflow, :rollback, for: :process)
iex> Choreo.Workflow.compensations(workflow)
[:rollback]
iex> Yog.node(workflow.graph, :rollback).target_task
:process

Diagram

digraph G {
  graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  rollback [label="rollback", color="#ef4444", style="filled,dashed", fillcolor="#f87171", shape="note"];
}
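
Because each compensation names its target through :for, checking Saga coverage reduces to a set difference between task IDs and compensated task IDs. The sketch below illustrates that idea on plain lists. The module CompensationCoverageSketch is hypothetical, and whether Choreo.Workflow.Analysis.missing_compensations/1 applies the same rule is an assumption.

```elixir
defmodule CompensationCoverageSketch do
  # Hypothetical helper, not part of Choreo.
  # tasks: list of task ids.
  # compensations: list of {compensation_id, target_task_id} pairs,
  # mirroring the :for option of add_compensation.
  def uncovered(tasks, compensations) do
    covered = MapSet.new(compensations, fn {_id, target} -> target end)
    Enum.reject(tasks, &MapSet.member?(covered, &1))
  end
end

tasks = [:charge_card, :reserve_inventory, :pack_items, :ship_order]
compensations = [{:refund_payment, :charge_card}]

IO.inspect(CompensationCoverageSketch.uncovered(tasks, compensations))
# => [:reserve_inventory, :pack_items, :ship_order]
```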

add_decision(workflow, id, opts \\ [])

@spec add_decision(t(), Yog.node_id(), keyword()) :: t()

Adds a decision / gateway node for conditional branching.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_decision(workflow, :check)
iex> Yog.node(workflow.graph, :check).node_type
:decision

Diagram

digraph G {
  graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  check [label="check", fillcolor="#8b5cf6", shape="diamond"];
}

add_end(workflow, id, opts \\ [])

@spec add_end(t(), Yog.node_id(), keyword()) :: t()

Adds an end node (terminal).

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_end(workflow, :finish, label: "End")
iex> Choreo.Workflow.ends(workflow)
[:finish]
iex> Yog.node(workflow.graph, :finish).node_type
:end

Diagram

digraph G {
  graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  finish [label="End", penwidth="2.0", fillcolor="#ef4444", shape="doublecircle"];
}

add_event(workflow, id, opts \\ [])

@spec add_event(t(), Yog.node_id(), keyword()) :: t()

Adds an event node (trigger, timer, signal).

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_event(workflow, :timer)
iex> Yog.node(workflow.graph, :timer).node_type
:event

add_fork(workflow, id, opts \\ [])

@spec add_fork(t(), Yog.node_id(), keyword()) :: t()

Adds a fork node that splits execution into parallel paths.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_fork(workflow, :split)
iex> Yog.node(workflow.graph, :split).node_type
:fork

add_join(workflow, id, opts \\ [])

@spec add_join(t(), Yog.node_id(), keyword()) :: t()

Adds a join node that merges parallel paths.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_join(workflow, :merge)
iex> Yog.node(workflow.graph, :merge).node_type
:join
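
Fork and join nodes bracket branches that may run concurrently. One way to reason about which tasks can overlap is a reachability check: two tasks are independent when neither can be reached from the other. The sketch below works on plain {from, to} pairs. The module ParallelSketch and the node names are hypothetical, and this is not necessarily how Choreo.Workflow.Analysis.parallelizable_tasks/1 is implemented.

```elixir
defmodule ParallelSketch do
  # Hypothetical helper, not part of Choreo.
  # edges: list of {from, to} pairs.

  # Returns the set of nodes reachable from `start` (including `start`).
  def reachable(edges, start) do
    adjacency = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    walk([start], adjacency, MapSet.new())
  end

  # Two distinct nodes are independent when neither reaches the other.
  def independent?(edges, a, b) do
    a != b and
      not MapSet.member?(reachable(edges, a), b) and
      not MapSet.member?(reachable(edges, b), a)
  end

  defp walk([], _adjacency, seen), do: seen

  defp walk([current | rest], adjacency, seen) do
    if MapSet.member?(seen, current) do
      walk(rest, adjacency, seen)
    else
      walk(Map.get(adjacency, current, []) ++ rest, adjacency, MapSet.put(seen, current))
    end
  end
end

# A fork (:split) fans out to two tasks that meet again at a join (:merge).
edges = [
  {:split, :charge_card},
  {:split, :reserve_inventory},
  {:charge_card, :merge},
  {:reserve_inventory, :merge},
  {:merge, :ship_order}
]

IO.inspect(ParallelSketch.independent?(edges, :charge_card, :reserve_inventory))
# => true
IO.inspect(ParallelSketch.independent?(edges, :charge_card, :ship_order))
# => false
```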

add_start(workflow, id, opts \\ [])

@spec add_start(t(), Yog.node_id(), keyword()) :: t()

Adds a start node (entry point).

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_start(workflow, :begin, label: "Start")
iex> Choreo.Workflow.starts(workflow)
[:begin]
iex> Yog.node(workflow.graph, :begin).node_type
:start

Diagram

digraph G {
  graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  begin [label="Start", penwidth="2.0", fillcolor="#10b981", shape="circle"];
}

add_swimlane(workflow, name, opts \\ [])

@spec add_swimlane(t(), String.t() | atom(), keyword()) :: t()

Adds a swimlane grouping.

Swimlanes are rendered as subgraph clusters. Nodes can be assigned to a swimlane via the :swimlane option in node builders.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_swimlane("backend", label: "Backend Services")
...>   |> Choreo.Workflow.add_task(:api, swimlane: "backend")
iex> Yog.node(workflow.graph, :api)[:cluster]
"cluster_backend"

add_task(workflow, id, opts \\ [])

@spec add_task(t(), Yog.node_id(), keyword()) :: t()

Adds an automated task node.

Options

  • :timeout_ms — maximum time allowed for the task (default: 5000)
  • :retry — number of retry attempts on failure (default: 0)
  • :retry_backoff_ms — backoff between retries in milliseconds
  • :label — display label
  • :handler — handler name / reference
  • :description — tooltip text
  • :swimlane — swimlane group name

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_task(workflow, :process, timeout_ms: 5000, retry: 3)
iex> Choreo.Workflow.tasks(workflow)
[:process]
iex> Yog.node(workflow.graph, :process).timeout_ms
5000
iex> Yog.node(workflow.graph, :process).retry
3

Diagram

digraph G {
  graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  process [label="process (5000ms) retry: 3", fillcolor="#3b82f6", shape="box3d"];
}

compensations(workflow)

@spec compensations(t()) :: [Yog.node_id()]

Returns all compensation node IDs.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_compensation(:rollback, for: :task_a)
...>   |> Choreo.Workflow.add_compensation(:undo, for: :task_b)
iex> Enum.sort(Choreo.Workflow.compensations(workflow))
[:rollback, :undo]

connect(workflow, from, to, opts \\ [])

@spec connect(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()

Connects two workflow nodes with an execution dependency.

Options

  • :condition — branch condition label (shown on decision edges)
  • :edge_type — one of :sequence (default), :compensation, :retry, :failure, :timeout
  • :weight — edge weight for path calculations (defaults to target task timeout_ms)
  • :label — override edge label

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:b)
...>   |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.edges(workflow)
[{:a, :b, 1}]
iex> workflow.edge_meta[{:a, :b}].edge_type
:sequence

Diagram

digraph G {
  graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  a [label="a", penwidth="2.0", fillcolor="#10b981", shape="circle"];
  b [label="b", fillcolor="#3b82f6", shape="box3d"];
  a -> b [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"];
}

edges(workflow)

@spec edges(t()) :: [{Yog.node_id(), Yog.node_id(), number()}]

Returns all edges as {from, to, weight} tuples.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:b)
...>   |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.edges(workflow)
[{:a, :b, 1}]

ends(workflow)

@spec ends(t()) :: [Yog.node_id()]

Returns all end node IDs.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_end(:a)
...>   |> Choreo.Workflow.add_end(:b)
iex> Enum.sort(Choreo.Workflow.ends(workflow))
[:a, :b]

new()

@spec new() :: t()

Creates a new empty workflow graph.

Workflow graphs are always directed.

Examples

iex> workflow = Choreo.Workflow.new()
iex> Choreo.Workflow.nodes(workflow)
[]
iex> Choreo.Workflow.starts(workflow)
[]
iex> Choreo.Workflow.ends(workflow)
[]

nodes(workflow)

@spec nodes(t()) :: [Yog.node_id()]

Returns all node IDs in the workflow.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:b)
iex> Enum.sort(Choreo.Workflow.nodes(workflow))
[:a, :b]

starts(workflow)

@spec starts(t()) :: [Yog.node_id()]

Returns all start node IDs.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_start(:b)
iex> Enum.sort(Choreo.Workflow.starts(workflow))
[:a, :b]

tasks(workflow)

@spec tasks(t()) :: [Yog.node_id()]

Returns all task node IDs.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_task(:a)
...>   |> Choreo.Workflow.add_task(:b)
...>   |> Choreo.Workflow.add_start(:s)
iex> Enum.sort(Choreo.Workflow.tasks(workflow))
[:a, :b]

to_dot(workflow, opts \\ [])

@spec to_dot(
  t(),
  keyword()
) :: String.t()

Renders the workflow to DOT format.

Options

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:start)
...>   |> Choreo.Workflow.add_task(:process)
...>   |> Choreo.Workflow.add_end(:end)
...>   |> Choreo.Workflow.connect(:start, :process)
...>   |> Choreo.Workflow.connect(:process, :end)
iex> dot = Choreo.Workflow.to_dot(workflow)
iex> String.contains?(dot, "digraph")
true
iex> String.contains?(dot, "process")
true
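
Since to_dot/2 returns a plain string, one possible next step is to write it to disk and render it with the Graphviz dot command. The sketch below uses a literal DOT string as a stand-in for the to_dot output so that it runs without Choreo. The file name workflow.dot is arbitrary, and a local Graphviz install is assumed only for the rendering step.

```elixir
# Stand-in for the string returned by Choreo.Workflow.to_dot(workflow).
dot = "digraph G { start -> process; process -> finish; }"

# Write the DOT source to a temporary file.
path = Path.join(System.tmp_dir!(), "workflow.dot")
File.write!(path, dot)

# Render to SVG only if the Graphviz `dot` executable is on the PATH.
case System.find_executable("dot") do
  nil ->
    IO.puts("Graphviz not found; DOT source written to #{path}")

  executable ->
    svg = Path.rootname(path) <> ".svg"
    {_output, status} = System.cmd(executable, ["-Tsvg", path, "-o", svg])
    IO.puts("dot exited with status #{status}; output at #{svg}")
end
```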

to_graph(workflow)

@spec to_graph(t()) :: Yog.graph()

Returns the raw Yog.Graph struct underpinning the workflow.

Examples

iex> workflow = Choreo.Workflow.new()
iex> graph = Choreo.Workflow.to_graph(workflow)
iex> graph.kind
:directed