Choreo.Dataflow.Analysis (Choreo v0.7.0)

Analysis functions for Choreo.Dataflow pipelines.

Provides algorithms that answer practical questions about a dataflow:

  • Is there a cycle? (feedback loop detection)
  • What is the execution order? (topological sort)
  • Which stages have no upstream source? (orphans)
  • Which stages never reach a sink? (dead ends)
  • Where are the bottlenecks? (high fan-in / fan-out)
  • What is the critical path? (longest source→sink chain)
  • Where does back-pressure build up? (throughput simulation)

Summary

Functions

backpressure_points(flow, opts \\ [])

Returns nodes where simulated input rate exceeds a threshold.

bottlenecks(dataflow, opts \\ [])

Returns nodes with high combined fan-in and fan-out.

capacity_bottlenecks(flow)

Identifies nodes where the simulated incoming data rate exceeds capacity.

cyclic?(dataflow)

Checks whether the dataflow contains a directed cycle.

dead_ends(flow)

Returns nodes that cannot reach any sink.

edges_of_type(dataflow, path_type)

Returns edges filtered by path type.

longest_path(flow)

Finds the longest weighted path from any source to any sink.

orphan_nodes(flow)

Returns nodes that are not reachable from any source.

simulate(flow)

Simulates throughput propagation through the pipeline.

sinks(flow)

Returns all sink node IDs in the dataflow.

sources(flow)

Returns all source node IDs in the dataflow.

topological_sort(dataflow)

Returns a topological ordering of all stages.

unhandled_errors(flow)

Identifies nodes that lack explicit error handling paths.

validate(flow)

Validates a dataflow pipeline and returns a list of issues.

Functions

backpressure_points(flow, opts \\ [])

@spec backpressure_points(
  Choreo.Dataflow.t(),
  keyword()
) :: [Yog.node_id()]

Returns nodes where simulated input rate exceeds a threshold.

In practice, these are the stages that will experience back-pressure first if they cannot process events fast enough.

Options

  • :threshold sets the in_rate a node must exceed to count as a back-pressure point (default: 0, meaning any node with inbound flow)

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a, rate: 100)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> Enum.sort(Choreo.Dataflow.Analysis.backpressure_points(flow))
[:b, :c]

This analysis answers the question: "Where will back-pressure build up?"

bottlenecks(dataflow, opts \\ [])

@spec bottlenecks(
  Choreo.Dataflow.t(),
  keyword()
) :: [Yog.node_id()]

Returns nodes with high combined fan-in and fan-out.

Options

  • :threshold — minimum in_degree + out_degree to qualify (default: 3)

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_source(:b)
...>   |> Choreo.Dataflow.add_transform(:hub)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.add_sink(:d)
...>   |> Choreo.Dataflow.connect(:a, :hub)
...>   |> Choreo.Dataflow.connect(:b, :hub)
...>   |> Choreo.Dataflow.connect(:hub, :c)
...>   |> Choreo.Dataflow.connect(:hub, :d)
iex> Choreo.Dataflow.Analysis.bottlenecks(flow)
[:hub]

This analysis answers the question: "Which stages have the highest fan-in and fan-out?"
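The degree test behind this check can be sketched in plain Elixir without Choreo. The module name DegreeSketch, the {from, to} edge-list shape, and the inclusive >= comparison are assumptions made for illustration, not Choreo's implementation.

```elixir
defmodule DegreeSketch do
  # Hypothetical helper, not part of Choreo: every edge adds one to the
  # out-degree of `from` and one to the in-degree of `to`, so counting how
  # often a node appears as an endpoint gives in_degree + out_degree.
  def hubs(edges, threshold \\ 3) do
    edges
    |> Enum.flat_map(fn {from, to} -> [from, to] end)
    |> Enum.frequencies()
    |> Enum.filter(fn {_id, degree} -> degree >= threshold end)
    |> Enum.map(fn {id, _degree} -> id end)
    |> Enum.sort()
  end
end

edges = [{:a, :hub}, {:b, :hub}, {:hub, :c}, {:hub, :d}]
IO.inspect(DegreeSketch.hubs(edges))
# prints [:hub]
```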

capacity_bottlenecks(flow)

@spec capacity_bottlenecks(Choreo.Dataflow.t()) :: [Yog.node_id()]

Identifies nodes where the simulated incoming data rate exceeds capacity.

Reuses the simulate/1 logic to calculate steady-state incoming rates and compares them against each node's :capacity attribute.

Returns a list of node IDs where in_rate > capacity.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a, rate: 100)
...>   |> Choreo.Dataflow.add_transform(:b, capacity: 50)
...>   |> Choreo.Dataflow.add_transform(:c, capacity: 150)
...>   |> Choreo.Dataflow.add_sink(:d)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
...>   |> Choreo.Dataflow.connect(:c, :d)
iex> Choreo.Dataflow.Analysis.capacity_bottlenecks(flow)
[:b]

This analysis answers the question: "Which stages will be overwhelmed by incoming data?"
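As a rough sketch of the in_rate > capacity comparison, assume the steady-state rates and the capacities are already available as two plain maps. CapacitySketch and both map shapes are hypothetical, and treating a node without a capacity as unbounded is also an assumption.

```elixir
defmodule CapacitySketch do
  # Hypothetical helper, not part of Choreo: keeps the IDs whose incoming
  # rate is strictly greater than their capacity. Nodes with no capacity
  # entry are skipped, i.e. treated as unbounded.
  def overloaded(in_rates, capacities) do
    for {id, in_rate} <- in_rates,
        {:ok, capacity} <- [Map.fetch(capacities, id)],
        in_rate > capacity,
        do: id
  end
end

in_rates = %{a: 0, b: 100, c: 100, d: 100}
capacities = %{b: 50, c: 150}
IO.inspect(CapacitySketch.overloaded(in_rates, capacities))
# prints [:b]
```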

cyclic?(dataflow)

@spec cyclic?(Choreo.Dataflow.t()) :: boolean()

Checks whether the dataflow contains a directed cycle.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_transform(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
...>   |> Choreo.Dataflow.connect(:c, :b)
iex> Choreo.Dataflow.Analysis.cyclic?(flow)
true

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.cyclic?(flow)
false

This analysis answers the question: "Is there a feedback loop in the data pipeline?"
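One common way to detect a directed cycle is a depth-first search that flags any edge leading back to a node on the current search path. The sketch below works on a plain node list and {from, to} edge list. CycleSketch is a hypothetical module, and nothing here claims to match how Choreo or Yog implement the check.

```elixir
defmodule CycleSketch do
  # Hypothetical helper, not part of Choreo: reports true when a depth-first
  # search meets a node that is still on the current DFS path.
  def cyclic?(nodes, edges) do
    adj = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))

    result =
      Enum.reduce_while(nodes, MapSet.new(), fn node, done ->
        case visit(node, adj, MapSet.new(), done) do
          :cycle -> {:halt, :cycle}
          {:ok, done} -> {:cont, done}
        end
      end)

    result == :cycle
  end

  # `path` holds nodes on the current DFS path; `done` holds nodes whose
  # descendants have all been explored without finding a cycle.
  defp visit(node, adj, path, done) do
    cond do
      MapSet.member?(path, node) ->
        :cycle

      MapSet.member?(done, node) ->
        {:ok, done}

      true ->
        path = MapSet.put(path, node)

        adj
        |> Map.get(node, [])
        |> Enum.reduce_while({:ok, done}, fn next, {:ok, done} ->
          case visit(next, adj, path, done) do
            :cycle -> {:halt, :cycle}
            {:ok, done} -> {:cont, {:ok, done}}
          end
        end)
        |> case do
          :cycle -> :cycle
          {:ok, done} -> {:ok, MapSet.put(done, node)}
        end
    end
  end
end

IO.inspect(CycleSketch.cyclic?([:a, :b, :c], [{:a, :b}, {:b, :c}, {:c, :b}]))
# prints true
```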

dead_ends(flow)

@spec dead_ends(Choreo.Dataflow.t()) :: [Yog.node_id()]

Returns nodes that cannot reach any sink.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.add_transform(:d)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.dead_ends(flow)
[:d]

This analysis answers the question: "Which stages can never reach a sink?"

edges_of_type(dataflow, path_type)

@spec edges_of_type(Choreo.Dataflow.t(), atom()) :: [
  {Yog.node_id(), Yog.node_id(), String.t()}
]

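Returns the edges whose path type matches path_type, such as :normal or :error, as three-element tuples that start with the source and target node IDs.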

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_transform(:a)
...>   |> Choreo.Dataflow.add_sink(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.add_error_path(:a, :c)
iex> Choreo.Dataflow.Analysis.edges_of_type(flow, :normal)
[{:a, :b, 1}]
iex> Choreo.Dataflow.Analysis.edges_of_type(flow, :error)
[{:a, :c, 1}]

This analysis answers the question: "Which edges have a specific path type?"

longest_path(flow)

@spec longest_path(Choreo.Dataflow.t()) :: {:ok, [Yog.node_id()], number()} | :error

Finds the longest weighted path from any source to any sink.

This is the critical path for latency: the chain of stages that determines the minimum end-to-end latency of the pipeline.

Returns {:ok, [id], total_weight} or :error if the graph is cyclic or has no source→sink path.

Edge weights default to 1 unless overridden with the :weight option of connect/4. To model per-node latency, set :weight on that node's outgoing edges.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_transform(:c)
...>   |> Choreo.Dataflow.add_sink(:d)
...>   |> Choreo.Dataflow.connect(:a, :b, weight: 10)
...>   |> Choreo.Dataflow.connect(:b, :c, weight: 5)
...>   |> Choreo.Dataflow.connect(:c, :d, weight: 2)
iex> Choreo.Dataflow.Analysis.longest_path(flow)
{:ok, [:a, :b, :c, :d], 17}

This analysis answers the question: "What is the critical path that determines end-to-end latency?"
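To make the weighted search concrete, here is a minimal sketch on a plain list of {from, to, weight} edges. It assumes the graph is acyclic and does not memoise, so it only suits small graphs. LongestPathSketch and its argument shapes are hypothetical and are not Choreo's implementation.

```elixir
defmodule LongestPathSketch do
  # Hypothetical helper, not part of Choreo. Assumes an acyclic graph.
  # Returns {:ok, path, total} for the heaviest source-to-sink path, or
  # :error when no source can reach a sink.
  def longest(edges, sources, sinks) do
    adj = Enum.group_by(edges, &elem(&1, 0), fn {_from, to, w} -> {to, w} end)
    sinks = MapSet.new(sinks)

    sources
    |> Enum.map(&best_from(&1, adj, sinks))
    |> Enum.reject(&is_nil/1)
    |> case do
      [] ->
        :error

      found ->
        {path, total} = Enum.max_by(found, fn {_path, total} -> total end)
        {:ok, path, total}
    end
  end

  # Best {path, total} from `node` to any sink, or nil if none is reachable.
  defp best_from(node, adj, sinks) do
    candidates =
      for {next, w} <- Map.get(adj, node, []),
          {path, total} <- List.wrap(best_from(next, adj, sinks)),
          do: {[node | path], total + w}

    cond do
      candidates != [] -> Enum.max_by(candidates, fn {_path, total} -> total end)
      MapSet.member?(sinks, node) -> {[node], 0}
      true -> nil
    end
  end
end

edges = [{:a, :b, 10}, {:b, :c, 5}, {:c, :d, 2}]
IO.inspect(LongestPathSketch.longest(edges, [:a], [:d]))
# prints {:ok, [:a, :b, :c, :d], 17}
```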

orphan_nodes(flow)

@spec orphan_nodes(Choreo.Dataflow.t()) :: [Yog.node_id()]

Returns nodes that are not reachable from any source.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_transform(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
iex> Choreo.Dataflow.Analysis.orphan_nodes(flow)
[:c]

This analysis answers the question: "Which stages are unreachable from any source?"
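Reachability checks like this one reduce to a graph walk from a set of start nodes. The sketch below uses a plain node list and {from, to} edge list. ReachabilitySketch is a hypothetical module, not Choreo's implementation. Passing the sources as starts gives the orphan check. Passing the sinks together with reversed edges gives the equivalent of dead_ends/1.

```elixir
defmodule ReachabilitySketch do
  # Hypothetical helper, not part of Choreo: nodes that cannot be reached
  # from any of `starts` by following the {from, to} edges.
  def unreachable(nodes, edges, starts) do
    adj = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    seen = walk(starts, adj, MapSet.new())
    Enum.reject(nodes, &MapSet.member?(seen, &1))
  end

  # Depth-first walk that collects every visited node in `seen`.
  defp walk([], _adj, seen), do: seen

  defp walk([node | rest], adj, seen) do
    if MapSet.member?(seen, node) do
      walk(rest, adj, seen)
    else
      walk(Map.get(adj, node, []) ++ rest, adj, MapSet.put(seen, node))
    end
  end
end

IO.inspect(ReachabilitySketch.unreachable([:a, :b, :c], [{:a, :b}], [:a]))
# prints [:c]
```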

simulate(flow)

@spec simulate(Choreo.Dataflow.t()) :: %{
  optional(Yog.node_id()) => %{
    in_rate: number(),
    out_rate: number(),
    latency_ms: number()
  }
}

Simulates throughput propagation through the pipeline.

Each source is assigned a :rate (events/sec). Each non-source stage receives the sum of all incoming rates. The result is a map of node_id => %{in_rate: number, out_rate: number, latency_ms: number}.

Sources use their own :rate; transforms/buffers/merges sum inputs; sinks consume without producing.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a, rate: 100)
...>   |> Choreo.Dataflow.add_source(:b, rate: 200)
...>   |> Choreo.Dataflow.add_merge(:m)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :m)
...>   |> Choreo.Dataflow.connect(:b, :m)
...>   |> Choreo.Dataflow.connect(:m, :c)
iex> result = Choreo.Dataflow.Analysis.simulate(flow)
iex> result[:a].out_rate
100
iex> result[:b].out_rate
200
iex> result[:m].in_rate
300
iex> result[:c].in_rate
300
iex> result[:c].out_rate
0

This analysis answers the question: "What is the throughput at each stage?"
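The propagation rule can be sketched on plain data as follows. The sketch assumes an acyclic graph, assumes that every outgoing edge carries the node's full out_rate, and leaves out latency_ms. SimulateSketch and the id => kind map are hypothetical shapes chosen for illustration, not Choreo's internal representation.

```elixir
defmodule SimulateSketch do
  # Hypothetical helper, not part of Choreo. Assumes an acyclic graph.
  # `nodes` maps id => {:source, rate} | :sink | any other stage kind.
  def rates(nodes, edges) do
    preds = Enum.group_by(edges, fn {_from, to} -> to end, fn {from, _to} -> from end)

    Map.new(nodes, fn {id, _kind} ->
      {id, %{in_rate: in_rate(id, nodes, preds), out_rate: out_rate(id, nodes, preds)}}
    end)
  end

  # Incoming rate is the sum of every upstream node's outgoing rate.
  defp in_rate(id, nodes, preds) do
    preds
    |> Map.get(id, [])
    |> Enum.map(&out_rate(&1, nodes, preds))
    |> Enum.sum()
  end

  # Sources emit their own rate, sinks emit nothing, and every other
  # stage forwards whatever it receives.
  defp out_rate(id, nodes, preds) do
    case Map.fetch!(nodes, id) do
      {:source, rate} -> rate
      :sink -> 0
      _other -> in_rate(id, nodes, preds)
    end
  end
end

nodes = %{a: {:source, 100}, b: {:source, 200}, m: :merge, c: :sink}
edges = [{:a, :m}, {:b, :m}, {:m, :c}]
IO.inspect(SimulateSketch.rates(nodes, edges)[:m])
# prints %{in_rate: 300, out_rate: 300}
```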

sinks(flow)

@spec sinks(Choreo.Dataflow.t()) :: [Yog.node_id()]

Returns all sink node IDs in the dataflow.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_transform(:a)
...>   |> Choreo.Dataflow.add_sink(:b)
iex> Choreo.Dataflow.Analysis.sinks(flow)
[:b]

This analysis answers the question: "Where does data leave the pipeline?"

sources(flow)

@spec sources(Choreo.Dataflow.t()) :: [Yog.node_id()]

Returns all source node IDs in the dataflow.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_source(:b)
...>   |> Choreo.Dataflow.add_transform(:c)
iex> Enum.sort(Choreo.Dataflow.Analysis.sources(flow))
[:a, :b]

This analysis answers the question: "Where does data enter the pipeline?"

topological_sort(dataflow)

@spec topological_sort(Choreo.Dataflow.t()) ::
  {:ok, [Yog.node_id()]} | {:error, :contains_cycle}

Returns a topological ordering of all stages.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> {:ok, order} = Choreo.Dataflow.Analysis.topological_sort(flow)
iex> Enum.find_index(order, & &1 == :a) < Enum.find_index(order, & &1 == :b)
true
iex> Enum.find_index(order, & &1 == :b) < Enum.find_index(order, & &1 == :c)
true

This analysis answers the question: "In what order should stages execute?"
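For readers unfamiliar with topological ordering, the sketch below uses Kahn's algorithm on a plain node list and {from, to} edge list. It mirrors the documented return shapes, but TopoSketch is a hypothetical module and the algorithm choice is an assumption. The library may order stages differently.

```elixir
defmodule TopoSketch do
  # Hypothetical helper, not part of Choreo: Kahn's algorithm. Assumes node
  # IDs are unique and every edge endpoint appears in `nodes`.
  def sort(nodes, edges) do
    in_degree =
      Enum.reduce(edges, Map.new(nodes, &{&1, 0}), fn {_from, to}, acc ->
        Map.update!(acc, to, &(&1 + 1))
      end)

    adj = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    ready = for {id, 0} <- in_degree, do: id
    loop(ready, in_degree, adj, [], length(nodes))
  end

  # When the queue is empty, any node left unplaced sits on or behind a cycle.
  defp loop([], _in_degree, _adj, order, total) do
    if length(order) == total, do: {:ok, Enum.reverse(order)}, else: {:error, :contains_cycle}
  end

  # Place `node`, then release successors whose remaining in-degree hits zero.
  defp loop([node | rest], in_degree, adj, order, total) do
    {in_degree, newly_ready} =
      adj
      |> Map.get(node, [])
      |> Enum.reduce({in_degree, []}, fn next, {deg, ready} ->
        deg = Map.update!(deg, next, &(&1 - 1))
        if deg[next] == 0, do: {deg, [next | ready]}, else: {deg, ready}
      end)

    loop(rest ++ newly_ready, in_degree, adj, [node | order], total)
  end
end

IO.inspect(TopoSketch.sort([:a, :b, :c], [{:a, :b}, {:b, :c}]))
# prints {:ok, [:a, :b, :c]}
```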

unhandled_errors(flow)

@spec unhandled_errors(Choreo.Dataflow.t()) :: [Yog.node_id()]

Identifies nodes that lack explicit error handling paths.

Checks all :transform and :sink nodes. Returns a list of node IDs that do not have any outgoing edge configured with path_type: :error or path_type: :dead_letter.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_transform(:c)
...>   |> Choreo.Dataflow.add_sink(:d)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
...>   |> Choreo.Dataflow.connect(:c, :d)
...>   |> Choreo.Dataflow.add_error_path(:b, :d)
iex> Enum.sort(Choreo.Dataflow.Analysis.unhandled_errors(flow))
[:c, :d]

This analysis answers the question: "Which stages lack explicit error handling?"
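The rule described above can be sketched on plain data: collect the nodes that have an outgoing :error or :dead_letter edge, then report every :transform or :sink node outside that set. ErrorPathSketch and the {from, to, path_type} edge shape are hypothetical and chosen only for illustration.

```elixir
defmodule ErrorPathSketch do
  @handled [:error, :dead_letter]

  # Hypothetical helper, not part of Choreo. `nodes` maps id => kind and
  # `edges` is a list of {from, to, path_type} tuples.
  def unhandled(nodes, edges) do
    # Nodes that own at least one outgoing error or dead-letter edge.
    handled =
      for {from, _to, type} <- edges, type in @handled, into: MapSet.new(), do: from

    for {id, kind} <- nodes,
        kind in [:transform, :sink],
        not MapSet.member?(handled, id),
        do: id
  end
end

nodes = %{a: :source, b: :transform, c: :transform, d: :sink}
edges = [{:a, :b, :normal}, {:b, :c, :normal}, {:c, :d, :normal}, {:b, :d, :error}]
IO.inspect(Enum.sort(ErrorPathSketch.unhandled(nodes, edges)))
# prints [:c, :d]
```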

validate(flow)

@spec validate(Choreo.Dataflow.t()) :: [{:error | :warning, String.t()}]

Validates a dataflow pipeline and returns a list of issues.

Checks for cycles, orphan nodes, dead ends, missing sources, and missing sinks.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.validate(flow)
[]

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_transform(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.connect(:a, :b)
iex> issues = Choreo.Dataflow.Analysis.validate(flow)
iex> {:error, "No source nodes"} in issues
true
iex> {:error, "No sink nodes"} in issues
true

This analysis answers the question: "Is the pipeline structurally sound?"
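Because the result is a list of {:error | :warning, message} tuples, a caller may want to fail on errors while letting warnings through. The sketch below shows one way to do that. ValidateSketch is a hypothetical helper, and the warning text in the usage lines is made up for illustration.

```elixir
defmodule ValidateSketch do
  # Hypothetical helper, not part of Choreo: returns {:error, messages} if
  # any issue has :error severity, otherwise {:ok, warning_messages}.
  def check(issues) do
    {errors, warnings} = Enum.split_with(issues, &match?({:error, _}, &1))

    case errors do
      [] -> {:ok, Enum.map(warnings, fn {_severity, msg} -> msg end)}
      _ -> {:error, Enum.map(errors, fn {:error, msg} -> msg end)}
    end
  end
end

issues = [{:error, "No source nodes"}, {:warning, "illustrative warning"}]
IO.inspect(ValidateSketch.check(issues))
# prints {:error, ["No source nodes"]}
```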