Analysis functions for Choreo.Workflow orchestration diagrams.
Provides algorithms that answer practical questions about a workflow:
- What is the critical path? (longest latency chain)
- Which tasks can run in parallel?
- What breaks if a task fails? (failure scenarios)
- Which tasks lack compensations?
- Where are the bottlenecks? (high latency / high retry)
Functions
@spec bottlenecks(Choreo.Workflow.t(), keyword()) :: [Yog.node_id()]
Returns high-latency or high-retry task node IDs.
Options
- :latency_threshold: minimum :timeout_ms to qualify (default: 10_000)
- :retry_threshold: minimum :retry count to qualify (default: 2)
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_task(:fast, timeout_ms: 100)
...> |> Choreo.Workflow.add_task(:slow, timeout_ms: 20_000)
iex> Choreo.Workflow.Analysis.bottlenecks(workflow, latency_threshold: 10_000)
[:slow]
This analysis answers the question: "Which tasks are high-latency or high-retry?"
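The threshold rule can be pictured on plain data. The following is a minimal sketch, not Choreo's implementation: the module name BottleneckSketch and the task map shape are hypothetical, and it assumes both thresholds are inclusive minimums, as the option descriptions suggest.

```elixir
defmodule BottleneckSketch do
  # Hypothetical stand-in: `tasks` is a map of id => attribute map.
  # Assumption: a task qualifies when either value meets or exceeds its threshold.
  def bottlenecks(tasks, opts \\ []) do
    latency_threshold = Keyword.get(opts, :latency_threshold, 10_000)
    retry_threshold = Keyword.get(opts, :retry_threshold, 2)

    tasks
    |> Enum.filter(fn {_id, attrs} ->
      Map.get(attrs, :timeout_ms, 0) >= latency_threshold or
        Map.get(attrs, :retry, 0) >= retry_threshold
    end)
    |> Enum.map(fn {id, _attrs} -> id end)
    |> Enum.sort()
  end
end

tasks = %{
  fast: %{timeout_ms: 100},
  slow: %{timeout_ms: 20_000},
  flaky: %{timeout_ms: 500, retry: 3}
}

IO.inspect(BottleneckSketch.bottlenecks(tasks))
# prints [:flaky, :slow]
```

Raising :retry_threshold above 3 in this sketch would leave only :slow in the result.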
@spec compensable_tasks(Choreo.Workflow.t()) :: [Yog.node_id()]
Returns tasks that have at least one outgoing compensation edge.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_task(:process)
...> |> Choreo.Workflow.add_compensation(:rollback)
...> |> Choreo.Workflow.connect(:process, :rollback, edge_type: :compensation)
iex> Choreo.Workflow.Analysis.compensable_tasks(workflow)
[:process]
This analysis answers the question: "Which tasks have compensation handlers?"
@spec critical_path(Choreo.Workflow.t()) :: {:ok, [Yog.node_id()], number()} | :error
Finds the longest weighted path from any start to any end.
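Edge weights default to the target task's :timeout_ms. Returns
{:ok, path, total_weight}, where path is the list of node IDs on the longest path, or :error if the workflow is cyclic or has no path from a start node to an end node.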
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b, timeout_ms: 10)
...> |> Choreo.Workflow.add_task(:c, timeout_ms: 5)
...> |> Choreo.Workflow.add_end(:d)
...> |> Choreo.Workflow.connect(:a, :b)
...> |> Choreo.Workflow.connect(:b, :c)
...> |> Choreo.Workflow.connect(:c, :d)
iex> Choreo.Workflow.Analysis.critical_path(workflow)
{:ok, [:a, :b, :c, :d], 16}
This analysis answers the question: "What is the slowest end-to-end execution path?"
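The longest-weighted-path idea can be sketched on plain maps. This is an illustration under stated assumptions, not the library's algorithm: CriticalPathSketch, the adjacency-map shape, and the explicit weights map are all hypothetical, the graph is assumed acyclic (cycle detection is left out), and the recursion is not memoized, so it only suits small graphs.

```elixir
defmodule CriticalPathSketch do
  # edges: %{node => [successor]}, weights: %{node => cost of entering that node}.
  # Assumption: the graph is acyclic; a cyclic input would recurse forever.
  def critical_path(edges, weights, starts, ends) do
    end_set = MapSet.new(ends)

    starts
    |> Enum.map(&longest_from(&1, edges, weights, end_set))
    |> Enum.reject(&is_nil/1)
    |> case do
      [] ->
        :error

      candidates ->
        {total, path} = Enum.max_by(candidates, fn {total, _path} -> total end)
        {:ok, path, total}
    end
  end

  # Returns {weight, path} for the heaviest path from `node` to an end node,
  # or nil when no end node is reachable from `node`.
  defp longest_from(node, edges, weights, end_set) do
    via_successors =
      edges
      |> Map.get(node, [])
      |> Enum.map(fn next ->
        case longest_from(next, edges, weights, end_set) do
          nil -> nil
          {w, path} -> {w + Map.get(weights, next, 0), [node | path]}
        end
      end)
      |> Enum.reject(&is_nil/1)

    cond do
      via_successors != [] -> Enum.max_by(via_successors, fn {w, _path} -> w end)
      MapSet.member?(end_set, node) -> {0, [node]}
      true -> nil
    end
  end
end

edges = %{a: [:b, :c], b: [:d], c: [:d]}
weights = %{b: 10, c: 5, d: 1}

IO.inspect(CriticalPathSketch.critical_path(edges, weights, [:a], [:d]))
# prints {:ok, [:a, :b, :d], 11}
```

The branch through :b wins because entering :b costs 10 against 5 for :c; both branches then pay 1 to enter :d.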
@spec dead_ends(Choreo.Workflow.t()) :: [Yog.node_id()]
Returns nodes that cannot reach any end node.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_task(:dead)
...> |> Choreo.Workflow.add_end(:finish)
...> |> Choreo.Workflow.connect(:a, :b)
...> |> Choreo.Workflow.connect(:b, :finish)
iex> Choreo.Workflow.Analysis.dead_ends(workflow)
[:dead]
This analysis answers the question: "Which tasks can never reach an end node?"
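One way to compute this kind of result is to walk the graph backwards from the end nodes and report whatever was never visited. The sketch below assumes a plain edge list and a hypothetical DeadEndSketch module; it is not taken from Choreo's source.

```elixir
defmodule DeadEndSketch do
  # nodes: list of ids, edges: list of {from, to}, ends: list of end node ids.
  def dead_ends(nodes, edges, ends) do
    # Index predecessors so the graph can be walked backwards from the end nodes.
    preds = Enum.group_by(edges, fn {_from, to} -> to end, fn {from, _to} -> from end)
    can_finish = walk(ends, preds, MapSet.new(ends))
    Enum.reject(nodes, &MapSet.member?(can_finish, &1))
  end

  # Breadth-first walk over predecessor links; `seen` collects every node
  # that has some path to an end node.
  defp walk([], _preds, seen), do: seen

  defp walk([node | rest], preds, seen) do
    new =
      preds
      |> Map.get(node, [])
      |> Enum.reject(&MapSet.member?(seen, &1))
      |> Enum.uniq()

    walk(rest ++ new, preds, MapSet.union(seen, MapSet.new(new)))
  end
end

nodes = [:a, :b, :dead, :finish]
edges = [{:a, :b}, {:b, :finish}]

IO.inspect(DeadEndSketch.dead_ends(nodes, edges, [:finish]))
# prints [:dead]
```

Running the same walk forwards from the start nodes, over successor links instead of predecessors, gives the reachable set used by the orphan and reachability analyses.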
@spec heatmap(Choreo.Workflow.t(), keyword()) :: Choreo.Workflow.t()
Generates a heatmap of the workflow based on cumulative execution latency.
Nodes with higher total latency (including retries and backoffs) are given "hotter" colors.
Options
- :palette: color palette (:heat, :cool, :spectral)
@spec missing_compensations(Choreo.Workflow.t()) :: [Yog.node_id()]
Returns tasks that have retry configured but no compensation path.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:risky, retry: 3)
...> |> Choreo.Workflow.add_task(:safe, retry: 2)
...> |> Choreo.Workflow.add_compensation(:rollback, for: :safe)
...> |> Choreo.Workflow.add_end(:end)
...> |> Choreo.Workflow.connect(:a, :risky)
...> |> Choreo.Workflow.connect(:risky, :safe)
...> |> Choreo.Workflow.connect(:safe, :end)
...> |> Choreo.Workflow.connect(:safe, :rollback, edge_type: :compensation)
iex> Choreo.Workflow.Analysis.missing_compensations(workflow)
[:risky]
This analysis answers the question: "Which retry-configured tasks lack compensations?"
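The check can be read as two set operations: collect the tasks that own a compensation edge, then keep the retrying tasks that are not in that set. The sketch below makes several assumptions that the documentation does not confirm: "retry configured" is taken to mean a :retry value greater than zero, a single direct :compensation edge counts as a compensation path, and the module name, tuple edge shape, and :default edge type are hypothetical.

```elixir
defmodule MissingCompensationSketch do
  # tasks: %{id => attribute map}, edges: list of {from, to, edge_type}.
  def missing(tasks, edges) do
    # Every node that is the source of at least one :compensation edge.
    compensated =
      for {from, _to, :compensation} <- edges, into: MapSet.new(), do: from

    retrying_without_compensation =
      for {id, attrs} <- tasks,
          Map.get(attrs, :retry, 0) > 0,
          not MapSet.member?(compensated, id),
          do: id

    Enum.sort(retrying_without_compensation)
  end
end

tasks = %{risky: %{retry: 3}, safe: %{retry: 2}, plain: %{}}
edges = [{:risky, :safe, :default}, {:safe, :rollback, :compensation}]

IO.inspect(MissingCompensationSketch.missing(tasks, edges))
# prints [:risky]
```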
@spec orphan_tasks(Choreo.Workflow.t()) :: [Yog.node_id()]
Returns nodes that are not reachable from any start node.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_task(:orphan)
...> |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.Analysis.orphan_tasks(workflow)
[:orphan]
This analysis answers the question: "Which tasks are not reachable from any start node?"
@spec parallelizable_tasks(Choreo.Workflow.t()) :: [[Yog.node_id()]]
Returns tasks grouped by topological level.
Tasks at the same level have no dependencies on each other and can theoretically run in parallel.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:start)
...> |> Choreo.Workflow.add_fork(:split)
...> |> Choreo.Workflow.add_task(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_join(:merge)
...> |> Choreo.Workflow.add_end(:end)
...> |> Choreo.Workflow.connect(:start, :split)
...> |> Choreo.Workflow.connect(:split, :a)
...> |> Choreo.Workflow.connect(:split, :b)
...> |> Choreo.Workflow.connect(:a, :merge)
...> |> Choreo.Workflow.connect(:b, :merge)
...> |> Choreo.Workflow.connect(:merge, :end)
iex> groups = Choreo.Workflow.Analysis.parallelizable_tasks(workflow)
iex> Enum.any?(groups, fn g -> Enum.sort(g) == [:a, :b] end)
true
This analysis answers the question: "Which tasks can run in parallel?"
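Grouping by topological level can be sketched by repeatedly peeling off the nodes that have no remaining predecessors (a layered form of Kahn's algorithm). This is an illustration rather than the library's code: LevelSketch and the edge-list shape are hypothetical, and the sketch levels every node, whereas the function above reports tasks.

```elixir
defmodule LevelSketch do
  # nodes: list of ids, edges: list of {from, to}.
  def levels(nodes, edges) do
    # Count incoming edges per node; nodes without any start at zero.
    indegree =
      Enum.reduce(edges, Map.new(nodes, &{&1, 0}), fn {_from, to}, counts ->
        Map.update!(counts, to, &(&1 + 1))
      end)

    peel(indegree, edges, [])
  end

  defp peel(indegree, _edges, acc) when map_size(indegree) == 0, do: Enum.reverse(acc)

  defp peel(indegree, edges, acc) do
    # Every node with no remaining predecessors forms the next level.
    ready = for {node, 0} <- indegree, do: node
    if ready == [], do: raise(ArgumentError, "graph contains a cycle")

    # Drop the peeled level, then discount its outgoing edges.
    remaining =
      Enum.reduce(edges, Map.drop(indegree, ready), fn {from, to}, counts ->
        if from in ready, do: Map.update!(counts, to, &(&1 - 1)), else: counts
      end)

    peel(remaining, edges, [Enum.sort(ready) | acc])
  end
end

nodes = [:start, :split, :a, :b, :merge, :end]

edges = [
  {:start, :split},
  {:split, :a},
  {:split, :b},
  {:a, :merge},
  {:b, :merge},
  {:merge, :end}
]

IO.inspect(LevelSketch.levels(nodes, edges))
# prints [[:start], [:split], [:a, :b], [:merge], [:end]]
```

Nodes :a and :b land in the same level because neither depends on the other, which matches the group the doctest above looks for.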
@spec reachable_tasks(Choreo.Workflow.t()) :: [Yog.node_id()]
Returns all task node IDs reachable from any start node.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_task(:c)
...> |> Choreo.Workflow.connect(:a, :b)
iex> Enum.sort(Choreo.Workflow.Analysis.reachable_tasks(workflow))
[:a, :b]
This analysis answers the question: "Which tasks are reachable from any start node?"
@spec simulate(Choreo.Workflow.t()) :: %{optional(Yog.node_id()) => map()}
Simulates execution and returns estimated total latency per node.
Assumes sequential execution along the critical path. Parallel paths are counted by their longest branch.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b, timeout_ms: 5000)
...> |> Choreo.Workflow.add_task(:c, timeout_ms: 3000, retry: 2, retry_backoff_ms: 100)
...> |> Choreo.Workflow.add_end(:d)
...> |> Choreo.Workflow.connect(:a, :b)
...> |> Choreo.Workflow.connect(:b, :c)
...> |> Choreo.Workflow.connect(:c, :d)
iex> result = Choreo.Workflow.Analysis.simulate(workflow)
iex> result[:b].task_latency
5000
iex> result[:c].task_latency
3000
iex> result[:c].cumulative_latency
8000
This analysis answers the question: "What is the estimated latency for each task?"
Retry latency is not included: simulate models the happy path, which is why :c reports a task latency of 3000 despite retry: 2.
Use critical_path/1 for worst-case latency including retries.
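The "longest branch" rule can be written as a small recurrence: a node's cumulative latency is its own latency plus the largest cumulative latency among its predecessors. The sketch below is an assumption-laden illustration, not Choreo's simulator: SimulateSketch and the input shapes are hypothetical, nodes without a timeout are given a latency of 0, and the graph is assumed acyclic. Only the :task_latency and :cumulative_latency keys come from the example above.

```elixir
defmodule SimulateSketch do
  # latencies: %{node => ms}, edges: list of {from, to}. Assumes an acyclic graph.
  def simulate(latencies, edges) do
    preds = Enum.group_by(edges, fn {_from, to} -> to end, fn {from, _to} -> from end)

    Map.new(latencies, fn {node, ms} ->
      {node, %{task_latency: ms, cumulative_latency: cumulative(node, latencies, preds)}}
    end)
  end

  # A node finishes after its slowest predecessor finishes, plus its own latency.
  defp cumulative(node, latencies, preds) do
    slowest_pred =
      preds
      |> Map.get(node, [])
      |> Enum.map(&cumulative(&1, latencies, preds))
      |> Enum.reduce(0, &max/2)

    slowest_pred + Map.fetch!(latencies, node)
  end
end

# Fork/join shape: :x and :y run in parallel between :s and :j.
latencies = %{s: 0, x: 100, y: 400, j: 50}
edges = [{:s, :x}, {:s, :y}, {:x, :j}, {:y, :j}]

result = SimulateSketch.simulate(latencies, edges)
IO.inspect(result[:j].cumulative_latency)
# prints 450
```

The join :j waits for the slower branch (:y at 400 ms), so the faster branch through :x does not contribute to its cumulative figure.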
@spec uncompensated_paths(Choreo.Workflow.t()) :: [Yog.node_id()]
Returns tasks that can fail but have no valid compensation path.
A task "can fail" if it has an outgoing :error edge.
A valid compensation path is an unbroken chain of :compensation edges
that terminates (reaches a node with no further outgoing :compensation
edges). The terminus need not be a :start or :end node.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:start)
...> |> Choreo.Workflow.add_task(:process_payment)
...> |> Choreo.Workflow.add_compensation(:rollback_payment, for: :process_payment)
...> |> Choreo.Workflow.add_task(:dead_end_comp)
...> |> Choreo.Workflow.add_end(:done)
...> |> Choreo.Workflow.connect(:start, :process_payment)
...> |> Choreo.Workflow.connect(:process_payment, :done)
...> |> Choreo.Workflow.connect(:process_payment, :rollback_payment, edge_type: :error)
...> |> Choreo.Workflow.connect(:rollback_payment, :dead_end_comp, edge_type: :compensation)
iex> Choreo.Workflow.Analysis.uncompensated_paths(workflow)
[]
This analysis answers the question: "Which tasks can fail without a valid compensation path?"
In the example above, the compensation chain terminates at :dead_end_comp, which has
no further outgoing :compensation edges, so :process_payment is considered compensated.
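The termination rule on its own can be sketched as a walk that fails only if it revisits a node. This covers just that one rule and rests on assumptions: the CompensationChainSketch module is hypothetical, each node is assumed to have at most one outgoing :compensation edge, and how the library locates the first node of the chain from a failing task is not modeled here.

```elixir
defmodule CompensationChainSketch do
  # comp: %{node => next_node}, one entry per :compensation edge.
  # Assumption: at most one outgoing :compensation edge per node.
  def terminates?(node, comp, seen \\ MapSet.new()) do
    cond do
      # Revisiting a node means the chain loops and never terminates.
      MapSet.member?(seen, node) -> false
      # No outgoing :compensation edge: this node is the terminus.
      not Map.has_key?(comp, node) -> true
      true -> terminates?(Map.fetch!(comp, node), comp, MapSet.put(seen, node))
    end
  end
end

comp = %{rollback_payment: :dead_end_comp}
IO.inspect(CompensationChainSketch.terminates?(:rollback_payment, comp))
# prints true

looping = %{undo_a: :undo_b, undo_b: :undo_a}
IO.inspect(CompensationChainSketch.terminates?(:undo_a, looping))
# prints false
```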
@spec validate(Choreo.Workflow.t()) :: [{:error | :warning, String.t()}]
Validates a workflow and returns a list of issues.
Checks for:
- missing start / end nodes
- cycles
- orphan tasks
- dead-end tasks
- tasks with retries but no compensations
- unreachable compensation nodes
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_end(:c)
...> |> Choreo.Workflow.connect(:a, :b)
...> |> Choreo.Workflow.connect(:b, :c)
iex> Choreo.Workflow.Analysis.validate(workflow)
[]
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_task(:a)
...> |> Choreo.Workflow.add_end(:b)
...> |> Choreo.Workflow.connect(:a, :b)
iex> issues = Choreo.Workflow.Analysis.validate(workflow)
iex> {:error, "No start nodes"} in issues
true
This analysis answers the question: "Is the workflow structurally sound?"
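Because the result is a flat list of {severity, message} tuples, callers will often want to separate errors from warnings. The sketch below works on a hand-written issue list so it runs without the library; IssueReportSketch is a hypothetical helper, and the warning text is invented for illustration (only "No start nodes" appears in the example above).

```elixir
defmodule IssueReportSketch do
  # Splits [{:error | :warning, message}] into {error_messages, warning_messages}.
  def split(issues) do
    grouped = Enum.group_by(issues, &elem(&1, 0), &elem(&1, 1))
    {Map.get(grouped, :error, []), Map.get(grouped, :warning, [])}
  end
end

issues = [
  {:error, "No start nodes"},
  # Hypothetical warning message, not taken from the library.
  {:warning, "example warning"}
]

{errors, warnings} = IssueReportSketch.split(issues)
IO.inspect(errors)
# prints ["No start nodes"]
IO.inspect(warnings)
# prints ["example warning"]
```

A caller could, for instance, refuse to deploy a workflow when the errors list is non-empty while only logging the warnings.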