Choreo.Workflow (Choreo v0.7.0)

Copy Markdown View Source

Workflow / task orchestration diagram builder on top of Yog.

Choreo.Workflow models automated task orchestration where nodes are process steps and edges are execution dependencies. It supports:

  • Tasks — automated steps with timeout and retry config
  • Decisions — conditional branching
  • Fork / Join — parallel execution paths
  • Compensations — Saga-pattern rollback handlers
  • Events — triggers, timers, signals
  • Swimlanes — group tasks by team, service, or domain

When to use

Use Choreo.Workflow when designing distributed business processes, Saga transactions, CI/CD pipelines, or approval flows. It identifies the critical path, finds parallelizable tasks, and verifies that every failure scenario has a compensation route.

Further reading

Quick Start

workflow =
  Choreo.Workflow.new()
  |> Choreo.Workflow.add_start(:order_received)
  |> Choreo.Workflow.add_task(:charge_card, timeout_ms: 5000, retry: 3)
  |> Choreo.Workflow.add_task(:reserve_inventory, timeout_ms: 3000)
  |> Choreo.Workflow.add_decision(:sufficient_stock)
  |> Choreo.Workflow.add_task(:pack_items, timeout_ms: 10_000)
  |> Choreo.Workflow.add_task(:ship_order, timeout_ms: 5000)
  |> Choreo.Workflow.add_compensation(:refund_payment, for: :charge_card)
  |> Choreo.Workflow.add_end(:done)
  |> Choreo.Workflow.connect(:order_received, :charge_card)
  |> Choreo.Workflow.connect(:charge_card, :reserve_inventory)
  |> Choreo.Workflow.connect(:reserve_inventory, :sufficient_stock)
  |> Choreo.Workflow.connect(:sufficient_stock, :pack_items, condition: "yes")
  |> Choreo.Workflow.connect(:sufficient_stock, :refund_payment, condition: "no", edge_type: :compensation)
  |> Choreo.Workflow.connect(:pack_items, :ship_order)
  |> Choreo.Workflow.connect(:ship_order, :done)

dot = Choreo.Workflow.to_dot(workflow)

Diagram

digraph G { graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2]; node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"]; edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0]; done [label="done", penwidth="2.0", fillcolor="#ef4444", shape="doublecircle"]; order_received [label="order_received", penwidth="2.0", fillcolor="#10b981", shape="circle"]; charge_card [label="charge_card (5000ms) retry: 3", fillcolor="#3b82f6", shape="box3d"]; reserve_inventory [label="reserve_inventory (3000ms)", fillcolor="#3b82f6", shape="box3d"]; sufficient_stock [label="sufficient_stock", fillcolor="#8b5cf6", shape="diamond"]; pack_items [label="pack_items (10000ms)", fillcolor="#3b82f6", shape="box3d"]; ship_order [label="ship_order (5000ms)", fillcolor="#3b82f6", shape="box3d"]; refund_payment [label="refund_payment", color="#ef4444", style="filled,dashed", fillcolor="#f87171", shape="note"]; order_received -> charge_card [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"]; charge_card -> reserve_inventory [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"]; reserve_inventory -> sufficient_stock [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"]; sufficient_stock -> pack_items [style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b", label="yes"]; sufficient_stock -> refund_payment [style="dashed", penwidth="1.5", fontcolor="#ef4444", color="#ef4444", label="no"]; pack_items -> ship_order [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"]; ship_order -> done [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"]; }

Analysis

# Longest-latency path through the workflow
{:ok, path, latency} = Choreo.Workflow.Analysis.critical_path(workflow)

# Tasks that can run in parallel
Choreo.Workflow.Analysis.parallelizable_tasks(workflow)

# Tasks with missing compensations
Choreo.Workflow.Analysis.missing_compensations(workflow)

# Validation
Choreo.Workflow.Analysis.validate(workflow)

Summary

Functions

Adds a compensation / rollback node (Saga pattern).

Adds a decision / gateway node for conditional branching.

Adds an end node (terminal).

Adds an event node (trigger, timer, signal).

Adds a fork node that splits execution into parallel paths.

Adds a join node that merges parallel paths.

Adds a start node (entry point).

Adds a swimlane grouping.

Adds an automated task node.

Returns all compensation node IDs.

Connects two workflow nodes with an execution dependency.

Returns all edges as {from, to, weight} tuples.

Returns all edges with their metadata as {from, to, weight, meta} tuples.

Returns all end node IDs.

Creates a new empty workflow graph.

Returns all node IDs in the workflow.

Returns all start node IDs.

Returns all task node IDs.

Renders the workflow to DOT format.

Returns the raw Yog.Graph struct underpinning the workflow.

Collapses parallel edges into a simple Graph for algorithm analysis.

Types

t()

@type t() :: %Choreo.Workflow{
  clusters: %{required(String.t()) => map()},
  edge_meta: %{optional(Yog.Multi.Graph.edge_id()) => map()},
  graph: Yog.Multi.Graph.t()
}

Functions

add_compensation(workflow, id, opts \\ [])

@spec add_compensation(t(), Yog.node_id(), keyword()) :: t()

Adds a compensation / rollback node (Saga pattern).

Options

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_compensation(workflow, :rollback, for: :process)
iex> Choreo.Workflow.compensations(workflow)
[:rollback]
iex> Map.get(workflow.graph.nodes, :rollback).target_task
:process

Diagram

digraph G { graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2]; node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"]; edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0]; rollback [label="rollback", color="#ef4444", style="filled,dashed", fillcolor="#f87171", shape="note"]; }

add_decision(workflow, id, opts \\ [])

Adds a decision / gateway node for conditional branching.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_decision(workflow, :check)
iex> Map.get(workflow.graph.nodes, :check).node_type
:decision

Diagram

digraph G { graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2]; node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"]; edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0]; check [label="check", fillcolor="#8b5cf6", shape="diamond"]; }

add_end(workflow, id, opts \\ [])

@spec add_end(t(), Yog.node_id(), keyword()) :: t()

Adds an end node (terminal).

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_end(workflow, :finish, label: "End")
iex> Choreo.Workflow.ends(workflow)
[:finish]
iex> Map.get(workflow.graph.nodes, :finish).node_type
:end

Diagram

digraph G { graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2]; node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"]; edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0]; finish [label="End", penwidth="2.0", fillcolor="#ef4444", shape="doublecircle"]; }

add_event(workflow, id, opts \\ [])

Adds an event node (trigger, timer, signal).

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_event(workflow, :timer)
iex> Map.get(workflow.graph.nodes, :timer).node_type
:event

add_fork(workflow, id, opts \\ [])

@spec add_fork(t(), Yog.node_id(), keyword()) :: t()

Adds a fork node that splits execution into parallel paths.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_fork(workflow, :split)
iex> Map.get(workflow.graph.nodes, :split).node_type
:fork

add_join(workflow, id, opts \\ [])

Adds a join node that merges parallel paths.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_join(workflow, :merge)
iex> Map.get(workflow.graph.nodes, :merge).node_type
:join

add_start(workflow, id, opts \\ [])

@spec add_start(t(), Yog.node_id(), keyword()) :: t()

Adds a start node (entry point).

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_start(workflow, :begin, label: "Start")
iex> Choreo.Workflow.starts(workflow)
[:begin]
iex> Map.get(workflow.graph.nodes, :begin).node_type
:start

Diagram

digraph G { graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2]; node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"]; edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0]; begin [label="Start", penwidth="2.0", fillcolor="#10b981", shape="circle"]; }

add_swimlane(workflow, name, opts \\ [])

@spec add_swimlane(t(), String.t() | atom(), keyword()) :: t()

Adds a swimlane grouping.

Swimlanes are rendered as subgraph clusters. Nodes can be assigned to a swimlane via the :swimlane option in node builders.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_swimlane("backend", label: "Backend Services")
...>   |> Choreo.Workflow.add_task(:api, swimlane: "backend")
iex> Map.get(workflow.graph.nodes, :api)[:cluster]
"cluster_backend"

add_task(workflow, id, opts \\ [])

Adds an automated task node.

Options

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = Choreo.Workflow.add_task(workflow, :process, timeout_ms: 5000, retry: 3)
iex> Choreo.Workflow.tasks(workflow)
[:process]
iex> Map.get(workflow.graph.nodes, :process).timeout_ms
5000
iex> Map.get(workflow.graph.nodes, :process).retry
3

Diagram

digraph G { graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2]; node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"]; edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0]; process [label="process (5000ms) retry: 3", fillcolor="#3b82f6", shape="box3d"]; }

compensations(workflow)

@spec compensations(t()) :: [Yog.node_id()]

Returns all compensation node IDs.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_compensation(:rollback, for: :task_a)
...>   |> Choreo.Workflow.add_compensation(:undo, for: :task_b)
iex> Enum.sort(Choreo.Workflow.compensations(workflow))
[:rollback, :undo]

connect(workflow, from, to, opts \\ [])

Connects two workflow nodes with an execution dependency.

Multiple connections are allowed per (from, to) pair (parallel edges).

Options

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:b)
...>   |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.edges(workflow)
iex> [{:a, :b, _, meta}] = Choreo.Workflow.edges_with_meta(workflow)
iex> meta.edge_type
:sequence

Diagram

digraph G { graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2]; node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"]; edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0]; a [label="a", penwidth="2.0", fillcolor="#10b981", shape="circle"]; b [label="b", fillcolor="#3b82f6", shape="box3d"]; a -> b [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"]; }

edges(workflow)

@spec edges(t()) :: [{Yog.node_id(), Yog.node_id(), number()}]

Returns all edges as {from, to, weight} tuples.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:b)
...>   |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.edges(workflow)
[{:a, :b, 1}]

edges_with_meta(workflow)

@spec edges_with_meta(t()) :: [{Yog.node_id(), Yog.node_id(), number(), map()}]

Returns all edges with their metadata as {from, to, weight, meta} tuples.

ends(workflow)

@spec ends(t()) :: [Yog.node_id()]

Returns all end node IDs.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_end(:a)
...>   |> Choreo.Workflow.add_end(:b)
iex> Enum.sort(Choreo.Workflow.ends(workflow))
[:a, :b]

new()

@spec new() :: t()

Creates a new empty workflow graph.

Workflow graphs are always directed.

Examples

iex> workflow = Choreo.Workflow.new()
iex> Choreo.Workflow.nodes(workflow)
[]
iex> Choreo.Workflow.starts(workflow)
[]
iex> Choreo.Workflow.ends(workflow)
[]

nodes(workflow)

@spec nodes(t()) :: [Yog.node_id()]

Returns all node IDs in the workflow.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:b)
iex> Enum.sort(Choreo.Workflow.nodes(workflow))
[:a, :b]

starts(workflow)

@spec starts(t()) :: [Yog.node_id()]

Returns all start node IDs.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_start(:b)
iex> Enum.sort(Choreo.Workflow.starts(workflow))
[:a, :b]

tasks(workflow)

@spec tasks(t()) :: [Yog.node_id()]

Returns all task node IDs.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_task(:a)
...>   |> Choreo.Workflow.add_task(:b)
...>   |> Choreo.Workflow.add_start(:s)
iex> Enum.sort(Choreo.Workflow.tasks(workflow))
[:a, :b]

to_dot(workflow, opts \\ [])

@spec to_dot(
  t(),
  keyword()
) :: String.t()

Renders the workflow to DOT format.

Options

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:start)
...>   |> Choreo.Workflow.add_task(:process)
...>   |> Choreo.Workflow.add_end(:end)
...>   |> Choreo.Workflow.connect(:start, :process)
...>   |> Choreo.Workflow.connect(:process, :end)
iex> dot = Choreo.Workflow.to_dot(workflow)
iex> String.contains?(dot, "digraph")
true
iex> String.contains?(dot, "process")
true

to_graph(workflow)

@spec to_graph(t()) :: Yog.graph()

Returns the raw Yog.Graph struct underpinning the workflow.

Examples

iex> workflow = Choreo.Workflow.new()
iex> graph = Choreo.Workflow.to_graph(workflow)
iex> graph.kind
:directed

to_simple_graph(workflow, opts \\ [])

@spec to_simple_graph(
  t(),
  keyword()
) :: Yog.Graph.t()

Collapses parallel edges into a simple Graph for algorithm analysis.