Analysis functions for Choreo.Workflow orchestration diagrams.
Provides algorithms that answer practical questions about a workflow:
- What is the critical path? (longest latency chain)
- Which tasks can run in parallel?
- What breaks if a task fails? (failure scenarios)
- Which tasks lack compensations?
- Where are the bottlenecks? (high latency / high retry)
Summary
Functions
bottlenecks/2: Returns high-latency or high-retry task node IDs.
critical_path/1: Finds the longest weighted path from any start to any end.
dead_ends/1: Returns nodes that cannot reach any end node.
failure_scenarios/1: Returns tasks that have at least one outgoing compensation edge.
missing_compensations/1: Returns tasks that have retry configured but no compensation path.
orphan_tasks/1: Returns nodes that are not reachable from any start node.
parallelizable_tasks/1: Returns tasks grouped by topological level.
reachable_tasks/1: Returns all task node IDs reachable from any start node.
simulate/1: Simulates execution and returns estimated total latency per node.
uncompensated_paths/1: Returns tasks that can fail but have no valid compensation path.
validate/1: Validates a workflow and returns a list of issues.
Functions
@spec bottlenecks( Choreo.Workflow.t(), keyword() ) :: [Yog.node_id()]
Returns high-latency or high-retry task node IDs.
Options
- :latency_threshold - minimum :timeout_ms to qualify (default: 10_000)
- :retry_threshold - minimum :retry count to qualify (default: 2)
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_task(:fast, timeout_ms: 100)
...> |> Choreo.Workflow.add_task(:slow, timeout_ms: 20_000)
iex> Choreo.Workflow.Analysis.bottlenecks(workflow, latency_threshold: 10_000)
[:slow]
This analysis answers the question: "Which tasks are high-latency or high-retry?"
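The selection rule can be sketched on plain data. This is a minimal illustration, not Choreo's implementation: the module name BottleneckSketch and the map of task options are hypothetical, and treating a missing :timeout_ms or :retry as 0 is an assumption.

```elixir
defmodule BottleneckSketch do
  # Hypothetical stand-in for the real analysis: `tasks` is a map of
  # id => keyword options, not a Choreo.Workflow struct.
  def bottlenecks(tasks, opts \\ []) do
    latency_threshold = Keyword.get(opts, :latency_threshold, 10_000)
    retry_threshold = Keyword.get(opts, :retry_threshold, 2)

    tasks
    |> Enum.filter(fn {_id, task_opts} ->
      # A task qualifies when either value meets its minimum threshold.
      Keyword.get(task_opts, :timeout_ms, 0) >= latency_threshold or
        Keyword.get(task_opts, :retry, 0) >= retry_threshold
    end)
    |> Enum.map(fn {id, _task_opts} -> id end)
    |> Enum.sort()
  end
end

tasks = %{
  fast: [timeout_ms: 100],
  slow: [timeout_ms: 20_000],
  flaky: [timeout_ms: 500, retry: 3]
}

BottleneckSketch.bottlenecks(tasks)
# => [:flaky, :slow]
```

Raising :retry_threshold to 5 in this sketch leaves only :slow, since :flaky no longer qualifies on either criterion.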
@spec critical_path(Choreo.Workflow.t()) :: {:ok, [Yog.node_id()], number()} | :error
Finds the longest weighted path from any start to any end.
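Edge weights default to the target task's :timeout_ms. Returns
{:ok, path, total_weight}, where path is the list of node IDs on the critical path, or :error if the workflow is cyclic or has no path from a start node to an end node.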
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b, timeout_ms: 10)
...> |> Choreo.Workflow.add_task(:c, timeout_ms: 5)
...> |> Choreo.Workflow.add_end(:d)
...> |> Choreo.Workflow.connect(:a, :b)
...> |> Choreo.Workflow.connect(:b, :c)
...> |> Choreo.Workflow.connect(:c, :d)
iex> Choreo.Workflow.Analysis.critical_path(workflow)
{:ok, [:a, :b, :c, :d], 16}
This analysis answers the question: "What is the slowest end-to-end execution path?"
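For readers who want to see the idea behind a longest-path search, here is a minimal sketch using a memoized depth-first walk over an acyclic graph. It is not Choreo's implementation: CriticalPathSketch, the explicit edge weights, and the adjacency-map shape are all hypothetical, and how Choreo weights edges into nodes without a :timeout_ms is not modeled here.

```elixir
defmodule CriticalPathSketch do
  # Hypothetical helper, not part of Choreo: `edges` maps a node to a list of
  # {successor, weight} pairs and must describe an acyclic graph.
  def longest_path(edges, from, to) do
    {result, _memo} = visit(edges, from, to, %{})
    result
  end

  # Returns {{total_weight, path} | nil, memo}; nil means `to` is unreachable.
  defp visit(_edges, to, to, memo), do: {{0, [to]}, memo}

  defp visit(edges, node, to, memo) do
    case Map.fetch(memo, node) do
      {:ok, cached} ->
        {cached, memo}

      :error ->
        {best, memo} =
          Enum.reduce(Map.get(edges, node, []), {nil, memo}, fn {next, weight}, {best, memo} ->
            {sub, memo} = visit(edges, next, to, memo)
            {better(best, extend(sub, node, weight)), memo}
          end)

        # Cache the best result from this node so shared suffixes are walked once.
        {best, Map.put(memo, node, best)}
    end
  end

  defp extend(nil, _node, _weight), do: nil
  defp extend({total, path}, node, weight), do: {total + weight, [node | path]}

  defp better(nil, candidate), do: candidate
  defp better(best, nil), do: best
  defp better({a, _} = best, {b, _} = candidate), do: if(b > a, do: candidate, else: best)
end

edges = %{
  a: [b: 10, x: 2],
  b: [c: 5],
  x: [c: 20],
  c: [d: 3]
}

CriticalPathSketch.longest_path(edges, :a, :d)
# => {25, [:a, :x, :c, :d]}
```

The branch through :x wins (2 + 20 + 3 = 25) over the branch through :b (10 + 5 + 3 = 18), even though its first edge is cheaper.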
@spec dead_ends(Choreo.Workflow.t()) :: [Yog.node_id()]
Returns nodes that cannot reach any end node.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_task(:dead)
...> |> Choreo.Workflow.add_end(:finish)
...> |> Choreo.Workflow.connect(:a, :b)
...> |> Choreo.Workflow.connect(:b, :finish)
iex> Choreo.Workflow.Analysis.dead_ends(workflow)
[:dead]
This analysis answers the question: "Which tasks can never reach an end node?"
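One way to reason about dead ends is as a reverse reachability question: collect every node that can reach an end node, then subtract that set from all nodes. The sketch below does this with Erlang's :digraph and :digraph_utils modules on hypothetical node and edge lists that mirror the example above. It is an illustration only, not how Choreo computes the result.

```elixir
# Hypothetical node and edge lists mirroring the example above.
nodes = [:a, :b, :dead, :finish]
edges = [{:a, :b}, {:b, :finish}]
end_nodes = [:finish]

graph = :digraph.new()
Enum.each(nodes, &:digraph.add_vertex(graph, &1))
Enum.each(edges, fn {from, to} -> :digraph.add_edge(graph, from, to) end)

# reaching/2 returns every vertex with a path (possibly of length zero)
# to one of the given vertices.
can_finish = :digraph_utils.reaching(end_nodes, graph)
dead_ends = Enum.sort(nodes -- can_finish)

# :digraph stores its data in ETS tables, so release them explicitly.
:digraph.delete(graph)

dead_ends
# => [:dead]
```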
@spec failure_scenarios(Choreo.Workflow.t()) :: [Yog.node_id()]
Returns tasks that have at least one outgoing compensation edge.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_task(:process)
...> |> Choreo.Workflow.add_compensation(:rollback)
...> |> Choreo.Workflow.connect(:process, :rollback, edge_type: :compensation)
iex> Choreo.Workflow.Analysis.failure_scenarios(workflow)
[:process]
This analysis answers the question: "Which tasks have compensation handlers?"
@spec missing_compensations(Choreo.Workflow.t()) :: [Yog.node_id()]
Returns tasks that have retry configured but no compensation path.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:risky, retry: 3)
...> |> Choreo.Workflow.add_task(:safe, retry: 2)
...> |> Choreo.Workflow.add_compensation(:rollback, for: :safe)
...> |> Choreo.Workflow.add_end(:end)
...> |> Choreo.Workflow.connect(:a, :risky)
...> |> Choreo.Workflow.connect(:risky, :safe)
...> |> Choreo.Workflow.connect(:safe, :end)
...> |> Choreo.Workflow.connect(:safe, :rollback, edge_type: :compensation)
iex> Choreo.Workflow.Analysis.missing_compensations(workflow)
[:risky]
This analysis answers the question: "Which retry-configured tasks lack compensations?"
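The check can be pictured as a filter over tasks and edges. The following sketch is hypothetical throughout: RetryAuditSketch, the {from, to, type} edge tuples, and the :normal edge type are illustrative names, and reading "retry configured" as a retry count above zero is an assumption.

```elixir
defmodule RetryAuditSketch do
  # Hypothetical model: `tasks` is a keyword list of id => options and
  # `edges` is a list of {from, to, type} tuples.
  def missing_compensations(tasks, edges) do
    # Tasks that own at least one outgoing :compensation edge.
    compensated = for {from, _to, :compensation} <- edges, into: MapSet.new(), do: from

    for {id, opts} <- tasks,
        Keyword.get(opts, :retry, 0) > 0,
        not MapSet.member?(compensated, id),
        do: id
  end
end

tasks = [risky: [retry: 3], safe: [retry: 2], plain: []]
edges = [{:risky, :safe, :normal}, {:safe, :rollback, :compensation}]

RetryAuditSketch.missing_compensations(tasks, edges)
# => [:risky]
```

Here :safe is excluded because it has a compensation edge, and :plain is excluded because it has no retries to begin with.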
@spec orphan_tasks(Choreo.Workflow.t()) :: [Yog.node_id()]
Returns nodes that are not reachable from any start node.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_task(:orphan)
...> |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.Analysis.orphan_tasks(workflow)
[:orphan]
This analysis answers the question: "Which tasks are not reachable from any start node?"
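Orphan detection is the forward counterpart of a reachability walk: visit everything reachable from the start nodes and report whatever was never visited. A minimal sketch, assuming a plain adjacency map (ReachSketch and its data shapes are hypothetical, not Choreo's API):

```elixir
defmodule ReachSketch do
  # Hypothetical helper: `edges` maps a node to the list of its successors.
  def reachable(edges, starts), do: walk(starts, edges, MapSet.new())

  defp walk([], _edges, seen), do: seen

  defp walk([node | rest], edges, seen) do
    if MapSet.member?(seen, node) do
      # Already visited, so skip it; this also keeps cycles from looping.
      walk(rest, edges, seen)
    else
      walk(Map.get(edges, node, []) ++ rest, edges, MapSet.put(seen, node))
    end
  end
end

nodes = [:a, :b, :orphan]
edges = %{a: [:b]}

seen = ReachSketch.reachable(edges, [:a])
orphans = Enum.reject(nodes, &MapSet.member?(seen, &1))
# orphans => [:orphan]
```

The visited set in this sketch contains :a and :b, which is the same set a reachability query from :a would report.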
@spec parallelizable_tasks(Choreo.Workflow.t()) :: [[Yog.node_id()]]
Returns tasks grouped by topological level.
Tasks at the same level have no dependencies on each other and can theoretically run in parallel.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:start)
...> |> Choreo.Workflow.add_fork(:split)
...> |> Choreo.Workflow.add_task(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_join(:merge)
...> |> Choreo.Workflow.add_end(:end)
...> |> Choreo.Workflow.connect(:start, :split)
...> |> Choreo.Workflow.connect(:split, :a)
...> |> Choreo.Workflow.connect(:split, :b)
...> |> Choreo.Workflow.connect(:a, :merge)
...> |> Choreo.Workflow.connect(:b, :merge)
...> |> Choreo.Workflow.connect(:merge, :end)
iex> groups = Choreo.Workflow.Analysis.parallelizable_tasks(workflow)
iex> Enum.any?(groups, fn g -> Enum.sort(g) == [:a, :b] end)
true
This analysis answers the question: "Which tasks can run in parallel?"
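Grouping by topological level can be illustrated by repeatedly peeling off the nodes that have no remaining incoming edges. This sketch is an assumption about the general technique, not Choreo's code: LevelSketch is a hypothetical module, and unlike the documented function it groups every node rather than only tasks.

```elixir
defmodule LevelSketch do
  # Hypothetical helper: groups the nodes of an acyclic graph into levels.
  # `edges` is a list of {from, to} pairs.
  def levels(nodes, edges), do: peel(nodes, edges, [])

  defp peel([], _edges, acc), do: Enum.reverse(acc)

  defp peel(nodes, edges, acc) do
    blocked = MapSet.new(edges, fn {_from, to} -> to end)
    # Nodes with no remaining incoming edge form the next level.
    ready = Enum.reject(nodes, &MapSet.member?(blocked, &1))
    if ready == [], do: raise(ArgumentError, "graph contains a cycle")

    # Drop the edges leaving this level so their targets can become ready.
    remaining_edges = Enum.reject(edges, fn {from, _to} -> from in ready end)
    peel(nodes -- ready, remaining_edges, [ready | acc])
  end
end

nodes = [:start, :split, :a, :b, :merge, :end]

edges = [
  {:start, :split},
  {:split, :a},
  {:split, :b},
  {:a, :merge},
  {:b, :merge},
  {:merge, :end}
]

LevelSketch.levels(nodes, edges)
# => [[:start], [:split], [:a, :b], [:merge], [:end]]
```

The third level holds :a and :b together, matching the group the doctest above looks for.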
@spec reachable_tasks(Choreo.Workflow.t()) :: [Yog.node_id()]
Returns all task node IDs reachable from any start node. As the example shows, the result includes the start nodes themselves.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_task(:c)
...> |> Choreo.Workflow.connect(:a, :b)
iex> Enum.sort(Choreo.Workflow.Analysis.reachable_tasks(workflow))
[:a, :b]
This analysis answers the question: "Which tasks are reachable from any start node?"
@spec simulate(Choreo.Workflow.t()) :: %{optional(Yog.node_id()) => map()}
Simulates execution and returns estimated total latency per node.
Assumes sequential execution along the critical path. Parallel paths are counted by their longest branch.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b, timeout_ms: 5000)
...> |> Choreo.Workflow.add_task(:c, timeout_ms: 3000, retry: 2, retry_backoff_ms: 100)
...> |> Choreo.Workflow.add_end(:d)
...> |> Choreo.Workflow.connect(:a, :b)
...> |> Choreo.Workflow.connect(:b, :c)
...> |> Choreo.Workflow.connect(:c, :d)
iex> result = Choreo.Workflow.Analysis.simulate(workflow)
iex> result[:b].task_latency
5000
iex> result[:c].task_latency
3000
iex> result[:c].retry_latency
200
This analysis answers the question: "What is the estimated latency for each task?"
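The figures in the example are consistent with a retry latency equal to the retry count multiplied by the backoff (2 × 100 = 200). The source does not state the formula, so the sketch below is only an assumption inferred from that one example. LatencySketch is a hypothetical module, and defaulting missing options to 0 is also assumed.

```elixir
defmodule LatencySketch do
  # Assumption inferred from the example: retry latency is the retry count
  # multiplied by the backoff, and missing options count as zero.
  def estimate(opts) do
    task_latency = Keyword.get(opts, :timeout_ms, 0)
    retry_latency = Keyword.get(opts, :retry, 0) * Keyword.get(opts, :retry_backoff_ms, 0)
    %{task_latency: task_latency, retry_latency: retry_latency}
  end
end

LatencySketch.estimate(timeout_ms: 3000, retry: 2, retry_backoff_ms: 100)
# => %{retry_latency: 200, task_latency: 3000}
```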
@spec uncompensated_paths(Choreo.Workflow.t()) :: [Yog.node_id()]
Returns tasks that can fail but have no valid compensation path.
A task "can fail" if it has an outgoing :error edge.
A valid compensation path is an unbroken chain of :compensation edges
leading to a :start or :end node.
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:start)
...> |> Choreo.Workflow.add_task(:process_payment)
...> |> Choreo.Workflow.add_compensation(:rollback_payment, for: :process_payment)
...> |> Choreo.Workflow.add_task(:dead_end_comp)
...> |> Choreo.Workflow.add_end(:done)
...> |> Choreo.Workflow.connect(:start, :process_payment)
...> |> Choreo.Workflow.connect(:process_payment, :done)
...> |> Choreo.Workflow.connect(:process_payment, :rollback_payment, edge_type: :error)
...> |> Choreo.Workflow.connect(:rollback_payment, :dead_end_comp, edge_type: :compensation)
iex> Choreo.Workflow.Analysis.uncompensated_paths(workflow)
[:process_payment]
This analysis answers the question: "Which tasks can fail without a valid compensation path?"
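The two definitions above can be turned into a small chain-following check. This is a sketch under stated assumptions rather than the library's algorithm: CompensationSketch, the {from, to, type} edge tuples, and the :normal edge type are hypothetical, and the sketch assumes the compensation chain begins at the target of the :error edge.

```elixir
defmodule CompensationSketch do
  # Hypothetical model: edges are {from, to, type} tuples and `terminals`
  # lists the ids of the :start and :end nodes.
  def uncompensated(edges, terminals) do
    for {task, handler, :error} <- edges,
        not reaches_terminal?(handler, edges, terminals, MapSet.new()),
        uniq: true,
        do: task
  end

  # Follows only :compensation edges; `seen` guards against cycles.
  defp reaches_terminal?(node, edges, terminals, seen) do
    cond do
      node in terminals ->
        true

      MapSet.member?(seen, node) ->
        false

      true ->
        seen = MapSet.put(seen, node)

        Enum.any?(edges, fn
          {^node, next, :compensation} -> reaches_terminal?(next, edges, terminals, seen)
          _other -> false
        end)
    end
  end
end

edges = [
  {:start, :process_payment, :normal},
  {:process_payment, :done, :normal},
  {:process_payment, :rollback_payment, :error},
  {:rollback_payment, :dead_end_comp, :compensation}
]

CompensationSketch.uncompensated(edges, [:start, :done])
# => [:process_payment]
```

Adding a :compensation edge from :dead_end_comp to :done completes the chain, and the same call then returns an empty list.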
@spec validate(Choreo.Workflow.t()) :: [{:error | :warning, String.t()}]
Validates a workflow and returns a list of issues.
Checks for:
- missing start / end nodes
- cycles
- orphan tasks
- dead-end tasks
- tasks with retries but no compensations
- unreachable compensation nodes
Examples
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_start(:a)
...> |> Choreo.Workflow.add_task(:b)
...> |> Choreo.Workflow.add_end(:c)
...> |> Choreo.Workflow.connect(:a, :b)
...> |> Choreo.Workflow.connect(:b, :c)
iex> Choreo.Workflow.Analysis.validate(workflow)
[]
iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...> |> Choreo.Workflow.add_task(:a)
...> |> Choreo.Workflow.add_end(:b)
...> |> Choreo.Workflow.connect(:a, :b)
iex> issues = Choreo.Workflow.Analysis.validate(workflow)
iex> {:error, "No start nodes"} in issues
true
This analysis answers the question: "Is the workflow structurally sound?"
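Because the result is a flat list of {severity, message} tuples, callers will often want to separate errors from warnings before deciding what to do. A minimal sketch on a hypothetical issue list (the warning text is invented for illustration; only "No start nodes" appears in the example above):

```elixir
# Hypothetical issue list in the documented {severity, message} shape.
issues = [
  {:error, "No start nodes"},
  {:warning, "Example warning text"}
]

{errors, warnings} =
  Enum.split_with(issues, fn {severity, _message} -> severity == :error end)

# Treat any error as fatal; warnings alone still count as a pass here.
result =
  case errors do
    [] -> :ok
    _ -> {:error, Enum.map(errors, fn {:error, message} -> message end)}
  end

# result => {:error, ["No start nodes"]}
```

Whether warnings should block a deployment is a policy choice left to the caller; this sketch only surfaces them in the warnings list.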