Choreo.Dataflow.Analysis (Choreo v0.9.0)

Analysis functions for Choreo.Dataflow pipelines.

Provides algorithms that answer practical questions about a dataflow:

  • Is there a cycle? (feedback loop detection)
  • What is the execution order? (topological sort)
  • Which stages have no upstream source? (orphans)
  • Which stages never reach a sink? (dead ends)
  • Where are the bottlenecks? (high fan-in / fan-out)
  • What is the critical path? (longest source→sink chain)
  • Where does back-pressure build up? (throughput simulation)
  • What is the data lineage for a stage/sink? (upstream_lineage / upstream_sources)
  • What is the downstream impact of a stage/source? (downstream_impact / downstream_sinks)

Functions

backpressure_points(flow, opts \\ [])

@spec backpressure_points(
  Choreo.Dataflow.t(),
  keyword()
) :: [Yog.node_id()]

Returns nodes where simulated input rate exceeds a threshold.

In practice, these are the stages that will experience back-pressure first if they cannot keep up with their incoming data.

Options

  • :threshold — minimum in_rate to be considered a backpressure point (default: 0, meaning any node with inbound flow)

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a, rate: 100)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> Enum.sort(Choreo.Dataflow.Analysis.backpressure_points(flow))
[:b, :c]

This analysis answers the question: "Where will back-pressure build up?"
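
To make the :threshold option concrete, here is a minimal standalone sketch of the filtering step. It is not Choreo's implementation: the BackpressureSketch module and its points/2 function are hypothetical names, the input is a hand-written map shaped like the documented result of simulate/1, and the strict `>` comparison is an assumption chosen to match the default of 0 excluding sources.

```elixir
defmodule BackpressureSketch do
  # Hypothetical helper, not part of Choreo.
  # `rates` mirrors the shape documented for simulate/1:
  # node_id => %{in_rate: number, out_rate: number, latency_ms: number}.
  # Assumption: a node qualifies when in_rate is strictly greater than
  # the threshold.
  def points(rates, threshold \\ 0) do
    rates
    |> Enum.filter(fn {_id, %{in_rate: in_rate}} -> in_rate > threshold end)
    |> Enum.map(fn {id, _stats} -> id end)
    |> Enum.sort()
  end
end

rates = %{
  a: %{in_rate: 0, out_rate: 100, latency_ms: 0},
  b: %{in_rate: 100, out_rate: 100, latency_ms: 0},
  c: %{in_rate: 100, out_rate: 0, latency_ms: 0}
}

BackpressureSketch.points(rates)      # [:b, :c]
BackpressureSketch.points(rates, 100) # []
```

Raising the threshold narrows the result to the stages with the heaviest inbound flow.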

bottlenecks(flow)

@spec bottlenecks(Choreo.Dataflow.t()) :: [Yog.node_id()]

Alias for capacity_bottlenecks/1.

Returns nodes where the simulated incoming data rate exceeds capacity.

capacity_bottlenecks(flow)

@spec capacity_bottlenecks(Choreo.Dataflow.t()) :: [Yog.node_id()]

Identifies nodes where the simulated incoming data rate exceeds capacity.

Reuses the simulate/2 logic to calculate steady-state incoming rates, and compares them against the :capacity attribute of each node.

Returns a list of node IDs where in_rate > capacity.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a, rate: 100)
...>   |> Choreo.Dataflow.add_transform(:b, capacity: 50)
...>   |> Choreo.Dataflow.add_transform(:c, capacity: 150)
...>   |> Choreo.Dataflow.add_sink(:d)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
...>   |> Choreo.Dataflow.connect(:c, :d)
iex> Choreo.Dataflow.Analysis.capacity_bottlenecks(flow)
[:b]

This analysis answers the question: "Which stages will be overwhelmed by incoming data?"
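
The comparison described above can be sketched on plain maps. This is an illustration only, not Choreo's code: CapacitySketch and its arguments are hypothetical, the incoming rates are supplied by hand rather than simulated, and treating a node without a :capacity as unbounded is an assumption.

```elixir
defmodule CapacitySketch do
  # Hypothetical helper, not part of Choreo.
  # in_rates:   node_id => simulated incoming rate
  # capacities: node_id => capacity (nodes without an entry are
  #             assumed to be unbounded)
  def bottlenecks(in_rates, capacities) do
    in_rates
    |> Enum.filter(fn {id, in_rate} ->
      case Map.fetch(capacities, id) do
        {:ok, capacity} -> in_rate > capacity
        :error -> false
      end
    end)
    |> Enum.map(fn {id, _in_rate} -> id end)
    |> Enum.sort()
  end
end

in_rates = %{a: 0, b: 100, c: 100, d: 100}
capacities = %{b: 50, c: 150}

CapacitySketch.bottlenecks(in_rates, capacities) # [:b]
```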

cyclic?(dataflow)

@spec cyclic?(Choreo.Dataflow.t()) :: boolean()

Checks whether the dataflow contains a directed cycle.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_transform(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
...>   |> Choreo.Dataflow.connect(:c, :b)
iex> Choreo.Dataflow.Analysis.cyclic?(flow)
true

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.cyclic?(flow)
false

This analysis answers the question: "Is there a feedback loop in the data pipeline?"
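
For readers who want to see how a directed cycle can be detected, the following is a minimal sketch using depth-first search with :visiting and :done marks. It is not taken from Choreo: the CycleSketch module is hypothetical and the graph is a plain adjacency map rather than a Choreo.Dataflow struct.

```elixir
defmodule CycleSketch do
  # Hypothetical helper, not part of Choreo.
  # `graph` is a plain adjacency map: node => list of successors.
  def cyclic?(graph) do
    {found, _state} =
      Enum.reduce(Map.keys(graph), {false, %{}}, fn node, {found, state} ->
        if found, do: {true, state}, else: visit(graph, node, state)
      end)

    found
  end

  # Returns {cycle_found?, state}; state maps node => :visiting | :done.
  # Meeting a node that is still :visiting means we followed a back edge.
  defp visit(graph, node, state) do
    case Map.get(state, node) do
      :visiting ->
        {true, state}

      :done ->
        {false, state}

      nil ->
        state = Map.put(state, node, :visiting)

        {found, state} =
          Enum.reduce_while(Map.get(graph, node, []), {false, state}, fn next, {_, st} ->
            case visit(graph, next, st) do
              {true, st} -> {:halt, {true, st}}
              {false, st} -> {:cont, {false, st}}
            end
          end)

        {found, Map.put(state, node, :done)}
    end
  end
end

CycleSketch.cyclic?(%{a: [:b], b: [:c], c: [:b]}) # true
CycleSketch.cyclic?(%{a: [:b], b: [:c], c: []})   # false
```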

dead_ends(flow)

@spec dead_ends(Choreo.Dataflow.t()) :: [Yog.node_id()]

Returns nodes that cannot reach any sink.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.add_transform(:d)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.dead_ends(flow)
[:d]

This analysis answers the question: "Which stages can never reach a sink?"
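
One way to compute such a set is to walk the edges backwards from every sink and report whatever is never visited. The sketch below illustrates that idea on plain lists. DeadEndSketch is a hypothetical module, not Choreo's implementation, and counting a sink as trivially reaching itself is an assumption consistent with the example above.

```elixir
defmodule DeadEndSketch do
  # Hypothetical helper, not part of Choreo.
  # nodes: all node ids; edges: list of {from, to}; sinks: sink ids.
  def dead_ends(nodes, edges, sinks) do
    # Reverse adjacency: node => list of direct predecessors.
    preds = Enum.group_by(edges, fn {_from, to} -> to end, fn {from, _to} -> from end)
    reaching = walk(sinks, preds, MapSet.new(sinks))
    Enum.reject(nodes, &MapSet.member?(reaching, &1))
  end

  # Breadth-first walk over predecessors; `seen` collects every node
  # from which at least one sink is reachable.
  defp walk([], _preds, seen), do: seen

  defp walk([node | rest], preds, seen) do
    new =
      preds
      |> Map.get(node, [])
      |> Enum.reject(&MapSet.member?(seen, &1))
      |> Enum.uniq()

    walk(rest ++ new, preds, Enum.into(new, seen))
  end
end

DeadEndSketch.dead_ends([:a, :b, :c, :d], [{:a, :b}, {:b, :c}], [:c]) # [:d]
```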

downstream_impact(flow, source)

@spec downstream_impact(Choreo.Dataflow.t(), Yog.node_id()) :: [Yog.node_id()]

Returns all descendants (downstream stages) reachable from the given source node. As the example shows, the result also includes the given node itself.

Examples

iex> flow = Choreo.Dataflow.new()
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.downstream_impact(flow, :a)
[:a, :b, :c]

downstream_sinks(flow, source)

@spec downstream_sinks(Choreo.Dataflow.t(), Yog.node_id()) :: [Yog.node_id()]

Returns only the sink nodes that are transitively fed by the given source node.

Examples

iex> flow = Choreo.Dataflow.new()
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:k1)
...>   |> Choreo.Dataflow.add_sink(:k2)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :k1)
iex> Choreo.Dataflow.Analysis.downstream_sinks(flow, :a)
[:k1]

edges_of_type(dataflow, path_type)

@spec edges_of_type(Choreo.Dataflow.t(), atom()) :: [
  {Yog.node_id(), Yog.node_id(), String.t()}
]

Returns edges filtered by path type.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_transform(:a)
...>   |> Choreo.Dataflow.add_sink(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.add_error_path(:a, :c)
iex> Choreo.Dataflow.Analysis.edges_of_type(flow, :normal)
[{:a, :b, 1}]
iex> Choreo.Dataflow.Analysis.edges_of_type(flow, :error)
[{:a, :c, 1}]

This analysis answers the question: "Which edges have a specific path type?"

fan_hubs(dataflow, opts \\ [])

@spec fan_hubs(
  Choreo.Dataflow.t(),
  keyword()
) :: [Yog.node_id()]

Returns nodes with high combined fan-in and fan-out (structural hubs).

Options

  • :threshold — minimum in_degree + out_degree to qualify (default: 3)

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_source(:b)
...>   |> Choreo.Dataflow.add_transform(:hub)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.add_sink(:d)
...>   |> Choreo.Dataflow.connect(:a, :hub)
...>   |> Choreo.Dataflow.connect(:b, :hub)
...>   |> Choreo.Dataflow.connect(:hub, :c)
...>   |> Choreo.Dataflow.connect(:hub, :d)
iex> Choreo.Dataflow.Analysis.fan_hubs(flow)
[:hub]

This analysis answers the question: "Which stages have the highest fan-in and fan-out?"
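
The degree test behind :threshold can be sketched by counting how often each node appears as an edge endpoint. This is a standalone illustration, not Choreo's code: FanHubSketch is a hypothetical module, and the edge list is assumed to contain no duplicate edges.

```elixir
defmodule FanHubSketch do
  # Hypothetical helper, not part of Choreo.
  # edges: list of {from, to}. Each edge adds one to the out-degree of
  # `from` and one to the in-degree of `to`, so the number of times a
  # node appears as an endpoint equals in_degree + out_degree.
  def hubs(edges, threshold \\ 3) do
    edges
    |> Enum.flat_map(fn {from, to} -> [from, to] end)
    |> Enum.frequencies()
    |> Enum.filter(fn {_node, degree} -> degree >= threshold end)
    |> Enum.map(fn {node, _degree} -> node end)
    |> Enum.sort()
  end
end

edges = [{:a, :hub}, {:b, :hub}, {:hub, :c}, {:hub, :d}]

FanHubSketch.hubs(edges)    # [:hub]
FanHubSketch.hubs(edges, 5) # []
```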

heatmap(flow, opts \\ [])

@spec heatmap(
  Choreo.Dataflow.t(),
  keyword()
) :: Choreo.Dataflow.t()

Generates a heatmap of the dataflow based on throughput (inbound rate).

Nodes with higher incoming data rates will be colored with "hotter" colors.

Options

  • :palette — Color palette (:heat, :cool, :spectral)

longest_path(flow)

@spec longest_path(Choreo.Dataflow.t()) :: {:ok, [Yog.node_id()], number()} | :error

Finds the longest weighted path from any source to any sink.

This is the critical path for latency: the chain of stages that determines the minimum end-to-end latency of the pipeline.

Returns {:ok, [id], total_weight} or :error if the graph is cyclic or has no source→sink path.

Edge weights default to 1 unless overridden with the :weight option of connect/4. To model per-node latency, set :weight on the node's outgoing edges.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_transform(:c)
...>   |> Choreo.Dataflow.add_sink(:d)
...>   |> Choreo.Dataflow.connect(:a, :b, weight: 10)
...>   |> Choreo.Dataflow.connect(:b, :c, weight: 5)
...>   |> Choreo.Dataflow.connect(:c, :d, weight: 2)
iex> Choreo.Dataflow.Analysis.longest_path(flow)
{:ok, [:a, :b, :c, :d], 17}

This analysis answers the question: "What is the critical path that determines end-to-end latency?"
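
As a rough illustration of the critical-path idea, the sketch below finds the heaviest source-to-sink path in an acyclic graph with memoized depth-first search. It is not Choreo's implementation: LongestPathSketch is a hypothetical module, edges are plain {from, to, weight} tuples, and the input is assumed to be acyclic (the sketch does not detect cycles).

```elixir
defmodule LongestPathSketch do
  # Hypothetical helper, not part of Choreo. Assumes an acyclic graph.
  # edges: list of {from, to, weight}.
  def longest_path(edges, sources, sinks) do
    adj = Enum.group_by(edges, &elem(&1, 0), fn {_from, to, w} -> {to, w} end)
    sink_set = MapSet.new(sinks)

    {candidates, _memo} =
      Enum.map_reduce(sources, %{}, fn src, memo -> best(src, adj, sink_set, memo) end)

    case heaviest(candidates) do
      nil -> :error
      {total, path} -> {:ok, path, total}
    end
  end

  # Returns {{weight, path} | nil, memo}: the heaviest path from `node`
  # to any sink, or nil when no sink is reachable from `node`.
  defp best(node, adj, sinks, memo) do
    case memo do
      %{^node => cached} ->
        {cached, memo}

      _ ->
        {options, memo} =
          Enum.map_reduce(Map.get(adj, node, []), memo, fn {next, w}, acc ->
            case best(next, adj, sinks, acc) do
              {nil, acc} -> {nil, acc}
              {{total, path}, acc} -> {{total + w, [node | path]}, acc}
            end
          end)

        here = if MapSet.member?(sinks, node), do: {0, [node]}, else: nil
        result = heaviest([here | options])
        {result, Map.put(memo, node, result)}
    end
  end

  # Picks the candidate with the largest total weight, ignoring nils.
  defp heaviest(candidates) do
    case Enum.reject(candidates, &is_nil/1) do
      [] -> nil
      found -> Enum.max_by(found, fn {total, _path} -> total end)
    end
  end
end

edges = [{:a, :b, 10}, {:b, :c, 5}, {:c, :d, 2}]
LongestPathSketch.longest_path(edges, [:a], [:d]) # {:ok, [:a, :b, :c, :d], 17}
```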

orphan_nodes(flow)

@spec orphan_nodes(Choreo.Dataflow.t()) :: [Yog.node_id()]

Returns nodes that are not reachable from any source.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_transform(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
iex> Choreo.Dataflow.Analysis.orphan_nodes(flow)
[:c]

This analysis answers the question: "Which stages are unreachable from any source?"

simulate(dataflow)

@spec simulate(Choreo.Dataflow.t()) :: %{
  optional(Yog.node_id()) => %{
    in_rate: number(),
    out_rate: number(),
    latency_ms: number()
  }
}

Simulates throughput propagation through the pipeline.

Each source is assigned a :rate (events/sec). Each non-source stage receives the sum of all incoming rates. The result is a map of node_id => %{in_rate: number, out_rate: number, latency_ms: number}.

Sources use their own :rate; transforms/buffers/merges sum inputs; sinks consume without producing.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a, rate: 100)
...>   |> Choreo.Dataflow.add_source(:b, rate: 200)
...>   |> Choreo.Dataflow.add_merge(:m)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :m)
...>   |> Choreo.Dataflow.connect(:b, :m)
...>   |> Choreo.Dataflow.connect(:m, :c)
iex> result = Choreo.Dataflow.Analysis.simulate(flow)
iex> result[:a].out_rate
100
iex> result[:b].out_rate
200
iex> result[:m].in_rate
300
iex> result[:c].in_rate
300
iex> result[:c].out_rate
0

This analysis answers the question: "What is the throughput at each stage?"
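
The propagation rules described above (sources emit their own rate, intermediate stages pass on the sum of their inputs, sinks emit nothing) can be sketched without the library. The module below is hypothetical and is not Choreo's implementation. It assumes an acyclic graph, reports an in_rate of 0 for sources, and omits latency_ms because the documentation does not describe how it is derived.

```elixir
defmodule SimulateSketch do
  # Hypothetical helper, not part of Choreo. Assumes an acyclic graph.
  # nodes: node_id => {kind, rate}; kind is :source, :sink, or anything
  #        else (treated as a pass-through stage).
  # edges: list of {from, to}.
  def simulate(nodes, edges) do
    preds = Enum.group_by(edges, &elem(&1, 1), &elem(&1, 0))
    Enum.reduce(Map.keys(nodes), %{}, fn id, acc -> compute(id, nodes, preds, acc) end)
  end

  # Already computed: reuse the memoized entry.
  defp compute(id, _nodes, _preds, acc) when is_map_key(acc, id), do: acc

  defp compute(id, nodes, preds, acc) do
    parents = Map.get(preds, id, [])
    # Make sure every upstream stage has been computed first.
    acc = Enum.reduce(parents, acc, fn parent, a -> compute(parent, nodes, preds, a) end)
    in_rate = parents |> Enum.map(fn parent -> acc[parent].out_rate end) |> Enum.sum()

    out_rate =
      case Map.fetch!(nodes, id) do
        {:source, rate} -> rate
        {:sink, _} -> 0
        _ -> in_rate
      end

    Map.put(acc, id, %{in_rate: in_rate, out_rate: out_rate})
  end
end

nodes = %{a: {:source, 100}, b: {:source, 200}, m: {:merge, nil}, c: {:sink, nil}}
edges = [{:a, :m}, {:b, :m}, {:m, :c}]

result = SimulateSketch.simulate(nodes, edges)
result[:m].in_rate  # 300
result[:c].out_rate # 0
```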

sinks(flow)

@spec sinks(Choreo.Dataflow.t()) :: [Yog.node_id()]

Returns all sink node IDs in the dataflow.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_transform(:a)
...>   |> Choreo.Dataflow.add_sink(:b)
iex> Choreo.Dataflow.Analysis.sinks(flow)
[:b]

This analysis answers the question: "Where does data leave the pipeline?"

sources(flow)

@spec sources(Choreo.Dataflow.t()) :: [Yog.node_id()]

Returns all source node IDs in the dataflow.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_source(:b)
...>   |> Choreo.Dataflow.add_transform(:c)
iex> Enum.sort(Choreo.Dataflow.Analysis.sources(flow))
[:a, :b]

This analysis answers the question: "Where does data enter the pipeline?"

topological_sort(dataflow)

@spec topological_sort(Choreo.Dataflow.t()) ::
  {:ok, [Yog.node_id()]} | {:error, :contains_cycle}

Returns a topological ordering of all stages.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> {:ok, order} = Choreo.Dataflow.Analysis.topological_sort(flow)
iex> Enum.find_index(order, & &1 == :a) < Enum.find_index(order, & &1 == :b)
true
iex> Enum.find_index(order, & &1 == :b) < Enum.find_index(order, & &1 == :c)
true

This analysis answers the question: "In what order should stages execute?"
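
The two return shapes in the spec, {:ok, order} and {:error, :contains_cycle}, fall out naturally from Kahn's algorithm, sketched below on plain lists. This is an illustration under stated assumptions, not Choreo's code: TopoSketch is a hypothetical module, node IDs are assumed unique, and every edge endpoint is assumed to appear in the node list.

```elixir
defmodule TopoSketch do
  # Hypothetical helper, not part of Choreo.
  # nodes: list of unique node ids; edges: list of {from, to}.
  def sort(nodes, edges) do
    indegree =
      Enum.reduce(edges, Map.new(nodes, &{&1, 0}), fn {_from, to}, acc ->
        Map.update!(acc, to, &(&1 + 1))
      end)

    succs = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    ready = for {node, 0} <- indegree, do: node
    loop(ready, indegree, succs, [], length(nodes))
  end

  # Queue exhausted: if some nodes were never emitted, they sit on a cycle.
  defp loop([], _indegree, _succs, order, total) do
    if length(order) == total do
      {:ok, Enum.reverse(order)}
    else
      {:error, :contains_cycle}
    end
  end

  # Emit `node`, then release successors whose in-degree drops to zero.
  defp loop([node | rest], indegree, succs, order, total) do
    {indegree, newly_ready} =
      Enum.reduce(Map.get(succs, node, []), {indegree, []}, fn next, {deg, ready} ->
        deg = Map.update!(deg, next, &(&1 - 1))
        if deg[next] == 0, do: {deg, [next | ready]}, else: {deg, ready}
      end)

    loop(rest ++ newly_ready, indegree, succs, [node | order], total)
  end
end

TopoSketch.sort([:a, :b, :c], [{:a, :b}, {:b, :c}])           # {:ok, [:a, :b, :c]}
TopoSketch.sort([:a, :b, :c], [{:a, :b}, {:b, :c}, {:c, :b}]) # {:error, :contains_cycle}
```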

unhandled_errors(flow)

@spec unhandled_errors(Choreo.Dataflow.t()) :: [Yog.node_id()]

Identifies :transform nodes that lack explicit error handling paths.

Returns a list of transform node IDs that do not have any outgoing edge configured with path_type: :error or path_type: :dead_letter.

Sinks are checked separately via unhandled_sinks/1.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_transform(:c)
...>   |> Choreo.Dataflow.add_sink(:d)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
...>   |> Choreo.Dataflow.connect(:c, :d)
...>   |> Choreo.Dataflow.add_error_path(:b, :d)
iex> Enum.sort(Choreo.Dataflow.Analysis.unhandled_errors(flow))
[:c]

This analysis answers the question: "Which transforms lack explicit error handling?"
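
The rule above amounts to a set difference: all transforms, minus those with at least one outgoing :error or :dead_letter edge. A minimal sketch on plain data follows. UnhandledSketch and the {from, to, path_type} edge tuples are hypothetical and do not reflect how Choreo stores edges internally.

```elixir
defmodule UnhandledSketch do
  # Hypothetical helper, not part of Choreo.
  # kinds: node_id => kind (:source, :transform, :sink, ...)
  # edges: list of {from, to, path_type}
  def unhandled_errors(kinds, edges) do
    # Nodes that own at least one outgoing error or dead-letter edge.
    handled =
      for {from, _to, type} <- edges,
          type in [:error, :dead_letter],
          into: MapSet.new(),
          do: from

    transforms = for {id, :transform} <- kinds, do: id

    transforms
    |> Enum.reject(&MapSet.member?(handled, &1))
    |> Enum.sort()
  end
end

kinds = %{a: :source, b: :transform, c: :transform, d: :sink}

edges = [
  {:a, :b, :normal},
  {:b, :c, :normal},
  {:c, :d, :normal},
  {:b, :d, :error}
]

UnhandledSketch.unhandled_errors(kinds, edges) # [:c]
```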

unhandled_sinks(flow)

@spec unhandled_sinks(Choreo.Dataflow.t()) :: [Yog.node_id()]

Identifies :sink nodes that lack explicit dead-letter paths.

Returns a list of sink node IDs that do not have any outgoing edge configured with path_type: :dead_letter.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_transform(:a)
...>   |> Choreo.Dataflow.add_sink(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
iex> Enum.sort(Choreo.Dataflow.Analysis.unhandled_sinks(flow))
[:b, :c]

upstream_lineage(flow, target)

@spec upstream_lineage(Choreo.Dataflow.t(), Yog.node_id()) :: [Yog.node_id()]

Returns all ancestors (upstream stages) of the given target node. As the example shows, the result also includes the given node itself.

Examples

iex> flow = Choreo.Dataflow.new()
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.upstream_lineage(flow, :c)
[:a, :b, :c]

upstream_sources(flow, target)

@spec upstream_sources(Choreo.Dataflow.t(), Yog.node_id()) :: [Yog.node_id()]

Returns only the source nodes that transitively feed into the given target node.

Examples

iex> flow = Choreo.Dataflow.new()
...>   |> Choreo.Dataflow.add_source(:s1)
...>   |> Choreo.Dataflow.add_source(:s2)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:s1, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.upstream_sources(flow, :c)
[:s1]

validate(flow)

@spec validate(Choreo.Dataflow.t()) :: [{:error | :warning, String.t()}]

Validates a dataflow pipeline and returns a list of issues.

Checks for cycles, orphan nodes, dead ends, missing sources, and missing sinks.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
...>   |> Choreo.Dataflow.connect(:a, :b)
...>   |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.validate(flow)
[]

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_transform(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.connect(:a, :b)
iex> issues = Choreo.Dataflow.Analysis.validate(flow)
iex> {:error, "No source nodes"} in issues
true
iex> {:error, "No sink nodes"} in issues
true

This analysis answers the question: "Is the pipeline structurally sound?"