Choreo.Workflow.Analysis (Choreo v0.7.0)


Analysis functions for Choreo.Workflow orchestration diagrams.

Provides algorithms that answer practical questions about a workflow:

  • What is the critical path? (longest latency chain)
  • Which tasks can run in parallel?
  • What breaks if a task fails? (failure scenarios)
  • Which tasks lack compensations?
  • Where are the bottlenecks? (high latency / high retry)


Summary

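Functions

bottlenecks(workflow, opts \\ [])
Returns high-latency or high-retry task node IDs.

critical_path(workflow)
Finds the longest weighted path from any start node to any end node.

dead_ends(workflow)
Returns nodes that cannot reach any end node.

failure_scenarios(workflow)
Returns tasks that have at least one outgoing compensation edge.

missing_compensations(workflow)
Returns tasks that have retry configured but no compensation path.

orphan_tasks(workflow)
Returns nodes that are not reachable from any start node.

parallelizable_tasks(workflow)
Returns tasks grouped by topological level.

reachable_tasks(workflow)
Returns the IDs of all nodes reachable from any start node, including the start nodes themselves.

simulate(workflow)
Simulates execution and returns estimated total latency per node.

uncompensated_paths(flow)
Returns tasks that can fail but have no valid compensation path.

validate(workflow)
Validates a workflow and returns a list of issues.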

Functions

bottlenecks(workflow, opts \\ [])

@spec bottlenecks(
  Choreo.Workflow.t(),
  keyword()
) :: [Yog.node_id()]

Returns high-latency or high-retry task node IDs.

Options

  • :latency_threshold — minimum :timeout_ms to qualify (default: 10_000)
  • :retry_threshold — minimum :retry count to qualify (default: 2)

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_task(:fast, timeout_ms: 100)
...>   |> Choreo.Workflow.add_task(:slow, timeout_ms: 20_000)
iex> Choreo.Workflow.Analysis.bottlenecks(workflow, latency_threshold: 10_000)
[:slow]

This analysis answers the question: "Which tasks are high-latency or high-retry?"
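A minimal sketch of the threshold filter, assuming tasks are given as a plain map of task ID to attribute map rather than a Choreo.Workflow.t(). BottleneckSketch is a hypothetical module, not Choreo's implementation, and the inclusive >= comparison is an assumption based on the word "minimum" in the options above.

```elixir
defmodule BottleneckSketch do
  # Hypothetical sketch, not Choreo's implementation.
  # `tasks` maps a task ID to a map of attributes, e.g. %{timeout_ms: 20_000, retry: 3}.
  def bottlenecks(tasks, opts \\ []) do
    latency_threshold = Keyword.get(opts, :latency_threshold, 10_000)
    retry_threshold = Keyword.get(opts, :retry_threshold, 2)

    tasks
    |> Enum.filter(fn {_id, attrs} ->
      # "Minimum ... to qualify" is read here as an inclusive (>=) comparison.
      Map.get(attrs, :timeout_ms, 0) >= latency_threshold or
        Map.get(attrs, :retry, 0) >= retry_threshold
    end)
    |> Enum.map(fn {id, _attrs} -> id end)
    |> Enum.sort()
  end
end

tasks = %{fast: %{timeout_ms: 100}, slow: %{timeout_ms: 20_000}, flaky: %{retry: 3}}
IO.inspect(BottleneckSketch.bottlenecks(tasks))
# prints [:flaky, :slow]
```

A task qualifies if either threshold is met, which is why :flaky appears despite having no :timeout_ms.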

critical_path(workflow)

@spec critical_path(Choreo.Workflow.t()) :: {:ok, [Yog.node_id()], number()} | :error

Finds the longest weighted path from any start to any end.

Edge weights default to the target task's :timeout_ms. Returns {:ok, [id], total_weight}, or :error if the workflow is cyclic or has no start→end path.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:b, timeout_ms: 10)
...>   |> Choreo.Workflow.add_task(:c, timeout_ms: 5)
...>   |> Choreo.Workflow.add_end(:d)
...>   |> Choreo.Workflow.connect(:a, :b)
...>   |> Choreo.Workflow.connect(:b, :c)
...>   |> Choreo.Workflow.connect(:c, :d)
iex> Choreo.Workflow.Analysis.critical_path(workflow)
{:ok, [:a, :b, :c, :d], 16}

This analysis answers the question: "What is the slowest end-to-end execution path?"
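One way to compute such a path is dynamic programming over a topological order (the standard longest-path-in-a-DAG technique). This is a sketch under stated assumptions, not Choreo's code: CriticalPathSketch and its plain-list inputs are hypothetical, and edges into nodes without a :timeout_ms are given weight 1, an inference from the doctest total of 16 (10 + 5, plus 1 for the edge into :d).

```elixir
defmodule CriticalPathSketch do
  # Hypothetical sketch: longest path in a DAG (topological order + dynamic programming).
  # nodes: list of IDs; edges: list of {from, to}; starts/ends: lists of IDs;
  # weights: node ID => weight of any edge entering that node (assumed default 1).
  def critical_path(nodes, edges, starts, ends, weights) do
    case topo_order(nodes, edges) do
      :error -> :error
      {:ok, order} -> longest(order, edges, starts, ends, weights)
    end
  end

  defp longest(order, edges, starts, ends, weights) do
    succ = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    # best: node => {distance from a start node, predecessor on that path}
    init = Map.new(starts, &{&1, {0, nil}})

    best =
      Enum.reduce(order, init, fn node, best ->
        case Map.fetch(best, node) do
          # Not reachable from any start node: nothing to relax.
          :error ->
            best

          {:ok, {dist, _pred}} ->
            Enum.reduce(Map.get(succ, node, []), best, fn next, acc ->
              candidate = dist + Map.get(weights, next, 1)

              case Map.get(acc, next) do
                {d, _} when d >= candidate -> acc
                _ -> Map.put(acc, next, {candidate, node})
              end
            end)
        end
      end)

    case Enum.filter(ends, &Map.has_key?(best, &1)) do
      [] ->
        :error

      reached ->
        last = Enum.max_by(reached, fn id -> elem(best[id], 0) end)
        {:ok, path_to(best, last, []), elem(best[last], 0)}
    end
  end

  # Follow predecessor links back to the start node.
  defp path_to(_best, nil, acc), do: acc
  defp path_to(best, node, acc), do: path_to(best, elem(best[node], 1), [node | acc])

  # Kahn's algorithm; :error when a cycle keeps some nodes from ever becoming ready.
  defp topo_order(nodes, edges) do
    indeg =
      Enum.reduce(edges, Map.new(nodes, &{&1, 0}), fn {_f, t}, acc ->
        Map.update!(acc, t, &(&1 + 1))
      end)

    succ = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    ready = for {n, 0} <- indeg, do: n
    order = drain(ready, indeg, succ, [])
    if length(order) == length(nodes), do: {:ok, order}, else: :error
  end

  defp drain([], _indeg, _succ, acc), do: Enum.reverse(acc)

  defp drain([n | rest], indeg, succ, acc) do
    {indeg, ready} =
      Enum.reduce(Map.get(succ, n, []), {indeg, rest}, fn t, {deg, ready} ->
        deg = Map.update!(deg, t, &(&1 - 1))
        if deg[t] == 0, do: {deg, ready ++ [t]}, else: {deg, ready}
      end)

    drain(ready, indeg, succ, [n | acc])
  end
end

nodes = [:a, :b, :c, :d]
edges = [{:a, :b}, {:b, :c}, {:c, :d}]
IO.inspect(CriticalPathSketch.critical_path(nodes, edges, [:a], [:d], %{b: 10, c: 5}))
# prints {:ok, [:a, :b, :c, :d], 16}
```

Relaxing edges in topological order guarantees each node's predecessors are final before it is processed, which is what makes a single pass sufficient.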

dead_ends(workflow)

@spec dead_ends(Choreo.Workflow.t()) :: [Yog.node_id()]

Returns nodes that cannot reach any end node.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:b)
...>   |> Choreo.Workflow.add_task(:dead)
...>   |> Choreo.Workflow.add_end(:finish)
...>   |> Choreo.Workflow.connect(:a, :b)
...>   |> Choreo.Workflow.connect(:b, :finish)
iex> Choreo.Workflow.Analysis.dead_ends(workflow)
[:dead]

This analysis answers the question: "Which tasks can never reach an end node?"
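Dead ends can be found by walking the edges backwards from the end nodes: whatever that walk never visits cannot reach an end. The sketch below, with the hypothetical DeadEndSketch module and plain {from, to} edge tuples, illustrates the idea and is not Choreo's implementation.

```elixir
defmodule DeadEndSketch do
  # Hypothetical sketch: walk edges backwards from every end node; any node
  # never visited has no route to an end node.
  # nodes: list of IDs; edges: list of {from, to}; ends: list of end-node IDs.
  def dead_ends(nodes, edges, ends) do
    preds = Enum.group_by(edges, &elem(&1, 1), &elem(&1, 0))
    can_finish = visit(ends, preds, MapSet.new(ends))
    Enum.reject(nodes, &MapSet.member?(can_finish, &1))
  end

  # Iterative depth-first traversal over an adjacency map.
  defp visit([], _adj, seen), do: seen

  defp visit([n | rest], adj, seen) do
    fresh = adj |> Map.get(n, []) |> Enum.reject(&MapSet.member?(seen, &1))
    visit(fresh ++ rest, adj, Enum.into(fresh, seen))
  end
end

nodes = [:a, :b, :dead, :finish]
edges = [{:a, :b}, {:b, :finish}]
IO.inspect(DeadEndSketch.dead_ends(nodes, edges, [:finish]))
# prints [:dead]
```

A single backward traversal handles all end nodes at once, so the cost is linear in nodes plus edges rather than one search per node.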

failure_scenarios(workflow)

@spec failure_scenarios(Choreo.Workflow.t()) :: [Yog.node_id()]

Returns tasks that have at least one outgoing compensation edge.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_task(:process)
...>   |> Choreo.Workflow.add_compensation(:rollback)
...>   |> Choreo.Workflow.connect(:process, :rollback, edge_type: :compensation)
iex> Choreo.Workflow.Analysis.failure_scenarios(workflow)
[:process]

This analysis answers the question: "Which tasks have compensation handlers?"
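The rule above amounts to a filter over edges. A sketch, assuming edges are {from, to, edge_type} tuples and node kinds are kept in a separate map (both hypothetical stand-ins for Choreo's internal representation; FailureScenarioSketch is not part of the library):

```elixir
defmodule FailureScenarioSketch do
  # Hypothetical sketch. types: node ID => node kind (:task, :compensation, ...);
  # edges: list of {from, to, edge_type}.
  def failure_scenarios(types, edges) do
    # Keep only :compensation edges leaving a task; report each task once.
    for {from, _to, :compensation} <- edges,
        Map.get(types, from) == :task,
        uniq: true,
        do: from
  end
end

types = %{process: :task, rollback: :compensation}
IO.inspect(FailureScenarioSketch.failure_scenarios(types, [{:process, :rollback, :compensation}]))
# prints [:process]
```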

missing_compensations(workflow)

@spec missing_compensations(Choreo.Workflow.t()) :: [Yog.node_id()]

Returns tasks that have retry configured but no compensation path.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:risky, retry: 3)
...>   |> Choreo.Workflow.add_task(:safe, retry: 2)
...>   |> Choreo.Workflow.add_compensation(:rollback, for: :safe)
...>   |> Choreo.Workflow.add_end(:end)
...>   |> Choreo.Workflow.connect(:a, :risky)
...>   |> Choreo.Workflow.connect(:risky, :safe)
...>   |> Choreo.Workflow.connect(:safe, :end)
...>   |> Choreo.Workflow.connect(:safe, :rollback, edge_type: :compensation)
iex> Choreo.Workflow.Analysis.missing_compensations(workflow)
[:risky]

This analysis answers the question: "Which retry-configured tasks lack compensations?"
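A minimal sketch, assuming "no compensation path" means the task has no outgoing :compensation edge of its own. That is the case the doctest exercises (:risky only reaches :rollback indirectly through :safe); a transitive reading is also possible. MissingCompensationSketch and its inputs are hypothetical.

```elixir
defmodule MissingCompensationSketch do
  # Hypothetical sketch. tasks: task ID => attributes such as %{retry: 3};
  # edges: list of {from, to, edge_type}. "Compensation path" is read here as a
  # direct outgoing :compensation edge.
  def missing_compensations(tasks, edges) do
    compensated = for {from, _to, :compensation} <- edges, into: MapSet.new(), do: from

    tasks
    |> Enum.filter(fn {id, attrs} ->
      Map.get(attrs, :retry, 0) > 0 and not MapSet.member?(compensated, id)
    end)
    |> Enum.map(fn {id, _attrs} -> id end)
    |> Enum.sort()
  end
end

tasks = %{risky: %{retry: 3}, safe: %{retry: 2}}
edges = [{:risky, :safe, :normal}, {:safe, :rollback, :compensation}]
IO.inspect(MissingCompensationSketch.missing_compensations(tasks, edges))
# prints [:risky]
```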

orphan_tasks(workflow)

@spec orphan_tasks(Choreo.Workflow.t()) :: [Yog.node_id()]

Returns nodes that are not reachable from any start node.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:b)
...>   |> Choreo.Workflow.add_task(:orphan)
...>   |> Choreo.Workflow.connect(:a, :b)
iex> Choreo.Workflow.Analysis.orphan_tasks(workflow)
[:orphan]

This analysis answers the question: "Which tasks are not reachable from any start node?"
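Orphan detection is a forward reachability check. The hypothetical OrphanSketch below walks edges from every start node; the visited set plays the role of reachable_tasks/1 (given the inputs of that function's doctest it contains :a and :b), and its complement is the orphan list. It is an illustration, not Choreo's code.

```elixir
defmodule OrphanSketch do
  # Hypothetical sketch: walk edges forward from every start node.
  # edges: list of {from, to}; starts: list of start-node IDs.
  def reachable(edges, starts) do
    succ = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    visit(starts, succ, MapSet.new(starts))
  end

  # Orphans are the nodes the forward walk never visits.
  def orphan_tasks(nodes, edges, starts) do
    seen = reachable(edges, starts)
    Enum.reject(nodes, &MapSet.member?(seen, &1))
  end

  # Iterative depth-first traversal over a successor map.
  defp visit([], _succ, seen), do: seen

  defp visit([n | rest], succ, seen) do
    fresh = succ |> Map.get(n, []) |> Enum.reject(&MapSet.member?(seen, &1))
    visit(fresh ++ rest, succ, Enum.into(fresh, seen))
  end
end

IO.inspect(OrphanSketch.orphan_tasks([:a, :b, :orphan], [{:a, :b}], [:a]))
# prints [:orphan]
```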

parallelizable_tasks(workflow)

@spec parallelizable_tasks(Choreo.Workflow.t()) :: [[Yog.node_id()]]

Returns tasks grouped by topological level.

Tasks at the same level have no dependencies on each other and can theoretically run in parallel.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:start)
...>   |> Choreo.Workflow.add_fork(:split)
...>   |> Choreo.Workflow.add_task(:a)
...>   |> Choreo.Workflow.add_task(:b)
...>   |> Choreo.Workflow.add_join(:merge)
...>   |> Choreo.Workflow.add_end(:end)
...>   |> Choreo.Workflow.connect(:start, :split)
...>   |> Choreo.Workflow.connect(:split, :a)
...>   |> Choreo.Workflow.connect(:split, :b)
...>   |> Choreo.Workflow.connect(:a, :merge)
...>   |> Choreo.Workflow.connect(:b, :merge)
...>   |> Choreo.Workflow.connect(:merge, :end)
iex> groups = Choreo.Workflow.Analysis.parallelizable_tasks(workflow)
iex> Enum.any?(groups, fn g -> Enum.sort(g) == [:a, :b] end)
true

This analysis answers the question: "Which tasks can run in parallel?"
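Grouping by topological level can be done with a layered variant of Kahn's algorithm. The sketch assumes plain {from, to} edges and returns every node, not only tasks, so forks, joins, start and end nodes get levels too; LevelSketch is hypothetical and not Choreo's implementation.

```elixir
defmodule LevelSketch do
  # Hypothetical sketch: layered Kahn's algorithm. Level 0 holds nodes with no
  # incoming edges; a node joins the next level once all its predecessors are placed.
  # Nodes on a cycle never reach in-degree 0 and are silently left out.
  def levels(nodes, edges) do
    indeg =
      Enum.reduce(edges, Map.new(nodes, &{&1, 0}), fn {_from, to}, acc ->
        Map.update!(acc, to, &(&1 + 1))
      end)

    succ = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    layer(Enum.filter(nodes, &(indeg[&1] == 0)), indeg, succ, [])
  end

  defp layer([], _indeg, _succ, acc), do: Enum.reverse(acc)

  defp layer(current, indeg, succ, acc) do
    # Remove every edge leaving the current level.
    indeg =
      for n <- current, t <- Map.get(succ, n, []), reduce: indeg do
        deg -> Map.update!(deg, t, &(&1 - 1))
      end

    # Successors whose in-degree just dropped to 0 form the next level.
    next = for n <- current, t <- Map.get(succ, n, []), indeg[t] == 0, uniq: true, do: t
    layer(next, indeg, succ, [current | acc])
  end
end

nodes = [:start, :split, :a, :b, :merge, :end]
edges = [{:start, :split}, {:split, :a}, {:split, :b}, {:a, :merge}, {:b, :merge}, {:merge, :end}]
IO.inspect(LevelSketch.levels(nodes, edges))
# prints [[:start], [:split], [:a, :b], [:merge], [:end]]
```

Nodes in the same level share no edge between them, since an edge would have forced the target into a later level.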

reachable_tasks(workflow)

@spec reachable_tasks(Choreo.Workflow.t()) :: [Yog.node_id()]

Returns the IDs of all nodes reachable from any start node, including the start nodes themselves.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:b)
...>   |> Choreo.Workflow.add_task(:c)
...>   |> Choreo.Workflow.connect(:a, :b)
iex> Enum.sort(Choreo.Workflow.Analysis.reachable_tasks(workflow))
[:a, :b]

This analysis answers the question: "Which tasks are reachable from any start node?"

simulate(workflow)

@spec simulate(Choreo.Workflow.t()) :: %{optional(Yog.node_id()) => map()}

Simulates execution and returns estimated total latency per node.

Assumes sequential execution along the critical path. Parallel paths are counted by their longest branch.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:b, timeout_ms: 5000)
...>   |> Choreo.Workflow.add_task(:c, timeout_ms: 3000, retry: 2, retry_backoff_ms: 100)
...>   |> Choreo.Workflow.add_end(:d)
...>   |> Choreo.Workflow.connect(:a, :b)
...>   |> Choreo.Workflow.connect(:b, :c)
...>   |> Choreo.Workflow.connect(:c, :d)
iex> result = Choreo.Workflow.Analysis.simulate(workflow)
iex> result[:b].task_latency
5000
iex> result[:c].task_latency
3000
iex> result[:c].retry_latency
200

This analysis answers the question: "What is the estimated latency for each task?"
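The doctest's numbers can be reproduced with a small model, under assumptions that go beyond this page: retry latency is retry * retry_backoff_ms (2 * 100 = 200 matches the doctest), and a node's cumulative latency is its own cost plus the latest finish among its predecessors, which is one way to count parallel paths "by their longest branch". SimulateSketch and the :total_latency key are hypothetical, and nodes must be supplied in topological order.

```elixir
defmodule SimulateSketch do
  # Hypothetical sketch. order: node IDs in topological order; attrs: node ID =>
  # %{timeout_ms: ..., retry: ..., retry_backoff_ms: ...}; edges: list of {from, to}.
  # :task_latency and :retry_latency mirror the doctest fields; :total_latency is
  # an illustrative name, not necessarily a field Choreo returns.
  def simulate(order, attrs, edges) do
    preds = Enum.group_by(edges, &elem(&1, 1), &elem(&1, 0))

    Enum.reduce(order, %{}, fn node, acc ->
      a = Map.get(attrs, node, %{})
      task = Map.get(a, :timeout_ms, 0)
      # Assumed retry model: one backoff interval per retry.
      retry = Map.get(a, :retry, 0) * Map.get(a, :retry_backoff_ms, 0)
      # A node starts once its slowest predecessor finishes, so parallel
      # branches contribute only their longest arrival time.
      ready_at = Enum.max([0 | Enum.map(Map.get(preds, node, []), fn p -> acc[p].total_latency end)])
      Map.put(acc, node, %{task_latency: task, retry_latency: retry, total_latency: ready_at + task + retry})
    end)
  end
end

attrs = %{b: %{timeout_ms: 5000}, c: %{timeout_ms: 3000, retry: 2, retry_backoff_ms: 100}}
result = SimulateSketch.simulate([:a, :b, :c, :d], attrs, [{:a, :b}, {:b, :c}, {:c, :d}])
IO.inspect({result[:c].retry_latency, result[:d].total_latency})
# prints {200, 8200}
```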

uncompensated_paths(flow)

@spec uncompensated_paths(Choreo.Workflow.t()) :: [Yog.node_id()]

Returns tasks that can fail but have no valid compensation path.

A task "can fail" if it has an outgoing :error edge. A valid compensation path is an unbroken chain of :compensation edges leading to a :start or :end node.

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:start)
...>   |> Choreo.Workflow.add_task(:process_payment)
...>   |> Choreo.Workflow.add_compensation(:rollback_payment, for: :process_payment)
...>   |> Choreo.Workflow.add_task(:dead_end_comp)
...>   |> Choreo.Workflow.add_end(:done)
...>   |> Choreo.Workflow.connect(:start, :process_payment)
...>   |> Choreo.Workflow.connect(:process_payment, :done)
...>   |> Choreo.Workflow.connect(:process_payment, :rollback_payment, edge_type: :error)
...>   |> Choreo.Workflow.connect(:rollback_payment, :dead_end_comp, edge_type: :compensation)
iex> Choreo.Workflow.Analysis.uncompensated_paths(workflow)
[:process_payment]

This analysis answers the question: "Which tasks can fail without a valid compensation path?"
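A sketch of the check under one reading of the rule above: start from the target of each :error edge, follow only :compensation edges, and accept the path if it reaches a :start or :end node. UncompensatedSketch and its tuple-based inputs are hypothetical; the path-local seen set keeps compensation cycles from looping forever.

```elixir
defmodule UncompensatedSketch do
  # Hypothetical sketch. types: node ID => :start | :task | :compensation | :end;
  # edges: list of {from, to, edge_type}.
  def uncompensated_paths(types, edges) do
    comp_pairs = for {f, t, :compensation} <- edges, do: {f, t}
    comp = Enum.group_by(comp_pairs, &elem(&1, 0), &elem(&1, 1))
    error_pairs = for {f, t, :error} <- edges, do: {f, t}

    # A task is reported unless at least one of its :error targets terminates.
    error_pairs
    |> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
    |> Enum.reject(fn {_task, targets} ->
      Enum.any?(targets, &terminates?(&1, types, comp, MapSet.new()))
    end)
    |> Enum.map(fn {task, _targets} -> task end)
    |> Enum.sort()
  end

  defp terminates?(node, types, comp, seen) do
    cond do
      types[node] in [:start, :end] -> true
      # Already on the current path: a compensation cycle never terminates.
      MapSet.member?(seen, node) -> false
      true ->
        seen = MapSet.put(seen, node)
        Enum.any?(Map.get(comp, node, []), &terminates?(&1, types, comp, seen))
    end
  end
end

types = %{start: :start, process_payment: :task, rollback_payment: :compensation,
          dead_end_comp: :task, done: :end}
edges = [
  {:start, :process_payment, :normal},
  {:process_payment, :done, :normal},
  {:process_payment, :rollback_payment, :error},
  {:rollback_payment, :dead_end_comp, :compensation}
]
IO.inspect(UncompensatedSketch.uncompensated_paths(types, edges))
# prints [:process_payment]
```

Adding a :compensation edge from :dead_end_comp to :done would complete the chain, and the result would become [].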

validate(workflow)

@spec validate(Choreo.Workflow.t()) :: [{:error | :warning, String.t()}]

Validates a workflow and returns a list of issues.

Checks for:

  • missing start / end nodes
  • cycles
  • orphan tasks
  • dead-end tasks
  • tasks with retries but no compensations
  • unreachable compensation nodes

Examples

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_start(:a)
...>   |> Choreo.Workflow.add_task(:b)
...>   |> Choreo.Workflow.add_end(:c)
...>   |> Choreo.Workflow.connect(:a, :b)
...>   |> Choreo.Workflow.connect(:b, :c)
iex> Choreo.Workflow.Analysis.validate(workflow)
[]

iex> workflow = Choreo.Workflow.new()
iex> workflow = workflow
...>   |> Choreo.Workflow.add_task(:a)
...>   |> Choreo.Workflow.add_end(:b)
...>   |> Choreo.Workflow.connect(:a, :b)
iex> issues = Choreo.Workflow.Analysis.validate(workflow)
iex> {:error, "No start nodes"} in issues
true

This analysis answers the question: "Is the workflow structurally sound?"
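A partial sketch of the validation pass, covering only the start/end and cycle checks from the list above. Only the "No start nodes" message appears in the doctest; the other messages, their :error severity, and the ValidateSketch module are assumptions for illustration, not Choreo's behavior.

```elixir
defmodule ValidateSketch do
  # Hypothetical sketch. types: node ID => node kind; edges: list of {from, to}.
  def validate(types, edges) do
    kinds = Map.values(types)

    [
      {:start in kinds, {:error, "No start nodes"}},
      {:end in kinds, {:error, "No end nodes"}},
      {acyclic?(Map.keys(types), edges), {:error, "Workflow contains a cycle"}}
    ]
    |> Enum.reject(fn {passed, _issue} -> passed end)
    |> Enum.map(fn {_passed, issue} -> issue end)
  end

  # Kahn's algorithm: the graph is acyclic iff every node is eventually removed.
  defp acyclic?(nodes, edges) do
    indeg =
      Enum.reduce(edges, Map.new(nodes, &{&1, 0}), fn {_f, t}, acc ->
        Map.update!(acc, t, &(&1 + 1))
      end)

    succ = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    ready = for {n, 0} <- indeg, do: n
    drain(ready, indeg, succ, 0) == length(nodes)
  end

  defp drain([], _indeg, _succ, removed), do: removed

  defp drain([n | rest], indeg, succ, removed) do
    {indeg, ready} =
      Enum.reduce(Map.get(succ, n, []), {indeg, rest}, fn t, {deg, ready} ->
        deg = Map.update!(deg, t, &(&1 - 1))
        if deg[t] == 0, do: {deg, [t | ready]}, else: {deg, ready}
      end)

    drain(ready, indeg, succ, removed + 1)
  end
end

IO.inspect(ValidateSketch.validate(%{a: :start, b: :task, c: :end}, [{:a, :b}, {:b, :c}]))
# prints []
IO.inspect(ValidateSketch.validate(%{a: :task, b: :end}, [{:a, :b}]))
# prints [{:error, "No start nodes"}]
```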