Analysis functions for Choreo.Dataflow pipelines.
Provides algorithms that answer practical questions about a dataflow:
- Is there a cycle? (feedback loop detection)
- What is the execution order? (topological sort)
- Which stages have no upstream source? (orphans)
- Which stages never reach a sink? (dead ends)
- Where are the bottlenecks? (high fan-in / fan-out)
- What is the critical path? (longest source→sink chain)
- Where does back-pressure build up? (throughput simulation)
- What is the data lineage for a stage/sink? (upstream_lineage / upstream_sources)
- What is the downstream impact of a stage/source? (downstream_impact / downstream_sinks)
Summary
Functions
backpressure_points/2: Returns nodes where simulated input rate exceeds a threshold.
bottlenecks/2: Returns nodes with high combined fan-in and fan-out.
capacity_bottlenecks/1: Identifies nodes where the simulated incoming data rate exceeds capacity.
cyclic?/1: Checks whether the dataflow contains a directed cycle.
dead_ends/1: Returns nodes that cannot reach any sink.
downstream_impact/2: Returns all descendants (downstream stages) reachable from the given source node.
downstream_sinks/2: Returns only the sink nodes that are transitively fed by the given source node.
edges_of_type/2: Returns edges filtered by path type.
heatmap/2: Generates a heatmap of the dataflow based on throughput (inbound rate).
longest_path/1: Finds the longest weighted path from any source to any sink.
orphan_nodes/1: Returns nodes that are not reachable from any source.
simulate/1: Simulates throughput propagation through the pipeline.
sinks/1: Returns all sink node IDs in the dataflow.
sources/1: Returns all source node IDs in the dataflow.
topological_sort/1: Returns a topological ordering of all stages.
unhandled_errors/1: Identifies nodes that lack explicit error handling paths.
upstream_lineage/2: Returns all ancestors (upstream stages) of the given target node.
upstream_sources/2: Returns only the source nodes that transitively feed into the given target node.
validate/1: Validates a dataflow pipeline and returns a list of issues.
Functions
@spec backpressure_points(Choreo.Dataflow.t(), keyword()) :: [Yog.node_id()]
Returns nodes where simulated input rate exceeds a threshold.
In practice, these are the stages that will experience back-pressure first if they cannot keep up with their input.
Options
:threshold - minimum in_rate a node must exceed to count as a backpressure point (default: 0, so any node with inbound flow qualifies)
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a, rate: 100)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_sink(:c)
...> |> Choreo.Dataflow.connect(:a, :b)
...> |> Choreo.Dataflow.connect(:b, :c)
iex> Enum.sort(Choreo.Dataflow.Analysis.backpressure_points(flow))
[:b, :c]
This analysis answers the question: "Where will back-pressure build up?"
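The filter behind this check can be sketched over a plain rate map shaped like the documented simulate/1 result. BackpressureSketch is a hypothetical module, not Choreo's implementation, and the sample rates (including a source in_rate of 0) are assumed values for illustration.

```elixir
defmodule BackpressureSketch do
  # Hypothetical sketch: `rates` mirrors the documented simulate/1 shape,
  # %{node_id => %{in_rate: number, out_rate: number, ...}}.
  def backpressure_points(rates, threshold \\ 0) do
    rates
    |> Enum.filter(fn {_node, %{in_rate: in_rate}} -> in_rate > threshold end)
    |> Enum.map(fn {node, _stats} -> node end)
    |> Enum.sort()
  end
end

# Assumed rates for a -> b -> c with a 100 events/sec source.
rates = %{
  a: %{in_rate: 0, out_rate: 100},
  b: %{in_rate: 100, out_rate: 100},
  c: %{in_rate: 100, out_rate: 0}
}

BackpressureSketch.backpressure_points(rates)
# => [:b, :c]
```

Raising the threshold to 100 returns [], because the comparison is strictly "exceeds".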
@spec bottlenecks(Choreo.Dataflow.t(), keyword()) :: [Yog.node_id()]
Returns nodes with high combined fan-in and fan-out.
Options
:threshold - minimum in_degree + out_degree required to qualify (default: 3)
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_source(:b)
...> |> Choreo.Dataflow.add_transform(:hub)
...> |> Choreo.Dataflow.add_sink(:c)
...> |> Choreo.Dataflow.add_sink(:d)
...> |> Choreo.Dataflow.connect(:a, :hub)
...> |> Choreo.Dataflow.connect(:b, :hub)
...> |> Choreo.Dataflow.connect(:hub, :c)
...> |> Choreo.Dataflow.connect(:hub, :d)
iex> Choreo.Dataflow.Analysis.bottlenecks(flow)
[:hub]
This analysis answers the question: "Which stages have the highest fan-in and fan-out?"
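The degree count can be sketched over a plain list of {from, to} edges: every edge adds one to its source's out-degree and one to its target's in-degree, so a node's combined degree is simply how often it appears in the edge list. FanSketch is hypothetical and only illustrates the idea; it does not operate on Choreo.Dataflow.t().

```elixir
defmodule FanSketch do
  # Combined degree = number of edge endpoints equal to the node.
  # Nodes with no edges never appear, which is fine for threshold >= 1.
  def bottlenecks(edges, threshold \\ 3) do
    edges
    |> Enum.flat_map(fn {from, to} -> [from, to] end)
    |> Enum.frequencies()
    |> Enum.filter(fn {_node, degree} -> degree >= threshold end)
    |> Enum.map(fn {node, _degree} -> node end)
    |> Enum.sort()
  end
end

edges = [{:a, :hub}, {:b, :hub}, {:hub, :c}, {:hub, :d}]
FanSketch.bottlenecks(edges)
# => [:hub]
```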
@spec capacity_bottlenecks(Choreo.Dataflow.t()) :: [Yog.node_id()]
Identifies nodes where the simulated incoming data rate exceeds capacity.
Reuses the simulate/1 logic to compute steady-state incoming rates and compares
each one against the node's :capacity attribute.
Returns the IDs of nodes where in_rate > capacity.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a, rate: 100)
...> |> Choreo.Dataflow.add_transform(:b, capacity: 50)
...> |> Choreo.Dataflow.add_transform(:c, capacity: 150)
...> |> Choreo.Dataflow.add_sink(:d)
...> |> Choreo.Dataflow.connect(:a, :b)
...> |> Choreo.Dataflow.connect(:b, :c)
...> |> Choreo.Dataflow.connect(:c, :d)
iex> Choreo.Dataflow.Analysis.capacity_bottlenecks(flow)
[:b]
This analysis answers the question: "Which stages will be overwhelmed by incoming data?"
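Once incoming rates are known, the capacity check is a per-node comparison. In this sketch the in-rates and capacities are plain maps standing in for simulate/1 output and the :capacity attributes; skipping nodes without a capacity is an assumption of the sketch, and CapacitySketch is a hypothetical name.

```elixir
defmodule CapacitySketch do
  # in_rates: %{node => events/sec}; capacities: %{node => events/sec}.
  # Nodes with no capacity entry are treated as unbounded (assumption).
  def capacity_bottlenecks(in_rates, capacities) do
    in_rates
    |> Enum.filter(fn {node, in_rate} ->
      case Map.fetch(capacities, node) do
        {:ok, capacity} -> in_rate > capacity
        :error -> false
      end
    end)
    |> Enum.map(fn {node, _in_rate} -> node end)
    |> Enum.sort()
  end
end

# Mirrors the example: :a feeds 100 events/sec through :b (cap 50) and :c (cap 150).
in_rates = %{b: 100, c: 100, d: 100}
capacities = %{b: 50, c: 150}
CapacitySketch.capacity_bottlenecks(in_rates, capacities)
# => [:b]
```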
@spec cyclic?(Choreo.Dataflow.t()) :: boolean()
Checks whether the dataflow contains a directed cycle.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_transform(:c)
...> |> Choreo.Dataflow.connect(:a, :b)
...> |> Choreo.Dataflow.connect(:b, :c)
...> |> Choreo.Dataflow.connect(:c, :b)
iex> Choreo.Dataflow.Analysis.cyclic?(flow)
true
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_sink(:c)
...> |> Choreo.Dataflow.connect(:a, :b)
...> |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.cyclic?(flow)
false
This analysis answers the question: "Is there a feedback loop in the data pipeline?"
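A common way to detect a directed cycle is a depth-first search that marks nodes on the current path as "gray": reaching a gray node again means a back edge, and therefore a cycle. The sketch below uses a plain adjacency map and a hypothetical CycleSketch module; it shows the technique and makes no claim about how Choreo implements cyclic?/1.

```elixir
defmodule CycleSketch do
  # graph: %{node => [successor, ...]}; missing keys mean "no successors".
  def cyclic?(graph) do
    {found, _colors} =
      Enum.reduce(Map.keys(graph), {false, %{}}, fn node, {found, colors} ->
        if found or Map.has_key?(colors, node) do
          {found, colors}
        else
          visit(graph, node, colors)
        end
      end)

    found
  end

  # :gray = on the current DFS path, :black = fully explored.
  defp visit(graph, node, colors) do
    colors = Map.put(colors, node, :gray)

    {found, colors} =
      Enum.reduce(Map.get(graph, node, []), {false, colors}, fn next, {found, colors} ->
        cond do
          found -> {true, colors}
          # Back edge to a node still on the path: cycle detected.
          Map.get(colors, next) == :gray -> {true, colors}
          Map.get(colors, next) == :black -> {false, colors}
          true -> visit(graph, next, colors)
        end
      end)

    {found, Map.put(colors, node, :black)}
  end
end

CycleSketch.cyclic?(%{a: [:b], b: [:c], c: [:b]})
# => true
CycleSketch.cyclic?(%{a: [:b], b: [:c], c: []})
# => false
```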
@spec dead_ends(Choreo.Dataflow.t()) :: [Yog.node_id()]
Returns nodes that cannot reach any sink.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_sink(:c)
...> |> Choreo.Dataflow.add_transform(:d)
...> |> Choreo.Dataflow.connect(:a, :b)
...> |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.dead_ends(flow)
[:d]
This analysis answers the question: "Which stages can never reach a sink?"
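One way to find dead ends is to walk every edge backwards starting from the sinks: any node visited can reach a sink, and everything else is a dead end. This is a sketch over plain lists under that assumption; DeadEndSketch and its argument shapes are hypothetical.

```elixir
defmodule DeadEndSketch do
  # nodes: all ids; edges: [{from, to}]; sinks: ids treated as exits.
  def dead_ends(nodes, edges, sinks) do
    # Reverse adjacency: to => [from, ...], so walking it moves upstream.
    reversed =
      Enum.reduce(edges, %{}, fn {from, to}, acc -> Map.update(acc, to, [from], &[from | &1]) end)

    can_reach_sink = walk(reversed, sinks, MapSet.new(sinks))
    Enum.reject(nodes, &MapSet.member?(can_reach_sink, &1))
  end

  # Worklist traversal that visits each node at most once.
  defp walk(_adjacency, [], seen), do: seen

  defp walk(adjacency, [node | rest], seen) do
    fresh = adjacency |> Map.get(node, []) |> Enum.reject(&MapSet.member?(seen, &1))
    walk(adjacency, fresh ++ rest, Enum.into(fresh, seen))
  end
end

DeadEndSketch.dead_ends([:a, :b, :c, :d], [{:a, :b}, {:b, :c}], [:c])
# => [:d]
```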
@spec downstream_impact(Choreo.Dataflow.t(), Yog.node_id()) :: [Yog.node_id()]
Returns the given node together with all of its descendants (every downstream stage reachable from it).
Examples
iex> flow = Choreo.Dataflow.new()
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_sink(:c)
...> |> Choreo.Dataflow.connect(:a, :b)
...> |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.downstream_impact(flow, :a)
[:a, :b, :c]
@spec downstream_sinks(Choreo.Dataflow.t(), Yog.node_id()) :: [Yog.node_id()]
Returns only the sink nodes that are transitively fed by the given source node.
Examples
iex> flow = Choreo.Dataflow.new()
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_sink(:k1)
...> |> Choreo.Dataflow.add_sink(:k2)
...> |> Choreo.Dataflow.connect(:a, :b)
...> |> Choreo.Dataflow.connect(:b, :k1)
iex> Choreo.Dataflow.Analysis.downstream_sinks(flow, :a)
[:k1]
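Both downstream queries reduce to forward reachability: collect everything reachable from the starting node (including the node itself, as in the downstream_impact example), then optionally keep only sinks. The sketch uses plain {from, to} edges and a caller-supplied sink list; DownstreamSketch is hypothetical, and sorting the result is this sketch's choice rather than documented behaviour.

```elixir
defmodule DownstreamSketch do
  # edges: [{from, to}]. Returns `start` plus every node reachable from it.
  def downstream_impact(edges, start) do
    adjacency =
      Enum.reduce(edges, %{}, fn {from, to}, acc -> Map.update(acc, from, [to], &[to | &1]) end)

    adjacency
    |> walk([start], MapSet.new([start]))
    |> Enum.sort()
  end

  # Keep only the reachable nodes that appear in the supplied sink list.
  def downstream_sinks(edges, sinks, start) do
    edges
    |> downstream_impact(start)
    |> Enum.filter(&(&1 in sinks))
  end

  # Worklist traversal that visits each node at most once.
  defp walk(_adjacency, [], seen), do: seen

  defp walk(adjacency, [node | rest], seen) do
    fresh = adjacency |> Map.get(node, []) |> Enum.reject(&MapSet.member?(seen, &1))
    walk(adjacency, fresh ++ rest, Enum.into(fresh, seen))
  end
end

edges = [{:a, :b}, {:b, :k1}]
DownstreamSketch.downstream_impact(edges, :a)
# => [:a, :b, :k1]
DownstreamSketch.downstream_sinks(edges, [:k1, :k2], :a)
# => [:k1]
```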
@spec edges_of_type(Choreo.Dataflow.t(), atom()) :: [{Yog.node_id(), Yog.node_id(), String.t()}]
Returns the edges whose path type matches the given atom, such as :normal or :error.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_transform(:a)
...> |> Choreo.Dataflow.add_sink(:b)
...> |> Choreo.Dataflow.add_sink(:c)
...> |> Choreo.Dataflow.connect(:a, :b)
...> |> Choreo.Dataflow.add_error_path(:a, :c)
iex> Choreo.Dataflow.Analysis.edges_of_type(flow, :normal)
[{:a, :b, 1}]
iex> Choreo.Dataflow.Analysis.edges_of_type(flow, :error)
[{:a, :c, 1}]
This analysis answers the question: "Which edges have a specific path type?"
@spec heatmap(Choreo.Dataflow.t(), keyword()) :: Choreo.Dataflow.t()
Generates a heatmap of the dataflow based on throughput (inbound rate).
Nodes with higher incoming data rates will be colored with "hotter" colors.
Options
:palette - color palette, one of :heat, :cool, or :spectral
@spec longest_path(Choreo.Dataflow.t()) :: {:ok, [Yog.node_id()], number()} | :error
Finds the longest weighted path from any source to any sink.
This is the critical path for latency: the chain of stages that determines the minimum end-to-end latency of the pipeline.
Returns {:ok, [id], total_weight} or :error if the graph is cyclic
or has no source→sink path.
Edge weights default to 1 unless overridden with the :weight option of connect/4.
To model per-node latency, set :weight on that node's outgoing edges.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_transform(:c)
...> |> Choreo.Dataflow.add_sink(:d)
...> |> Choreo.Dataflow.connect(:a, :b, weight: 10)
...> |> Choreo.Dataflow.connect(:b, :c, weight: 5)
...> |> Choreo.Dataflow.connect(:c, :d, weight: 2)
iex> Choreo.Dataflow.Analysis.longest_path(flow)
{:ok, [:a, :b, :c, :d], 17}
This analysis answers the question: "What is the critical path that determines end-to-end latency?"
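On a DAG, the longest weighted path can be found by relaxing edges in topological order while keeping, for each node, the best total so far and the predecessor that produced it. The sketch assumes the caller already has a topological order (for example from topological_sort/1) and plain {from, to, weight} edges; CriticalPathSketch is hypothetical and skips the cycle check that longest_path/1 performs.

```elixir
defmodule CriticalPathSketch do
  # Assumptions: `order` lists every node in topological order (DAG only),
  # and edges are {from, to, weight} tuples.
  def longest_path(order, edges, sources, sinks) do
    # dist: node => {best total weight from any source, predecessor}
    start = Map.new(sources, &{&1, {0, nil}})

    dist =
      Enum.reduce(order, start, fn node, dist ->
        case Map.fetch(dist, node) do
          # Not reachable from any source, so there is nothing to relax.
          :error ->
            dist

          {:ok, {d, _prev}} ->
            for {^node, to, w} <- edges, reduce: dist do
              acc ->
                case Map.get(acc, to) do
                  {best, _} when best >= d + w -> acc
                  _ -> Map.put(acc, to, {d + w, node})
                end
            end
        end
      end)

    reached = for sink <- sinks, Map.has_key?(dist, sink), do: {elem(dist[sink], 0), sink}

    case reached do
      [] ->
        :error

      _ ->
        {total, sink} = Enum.max(reached)
        {:ok, backtrack(dist, sink, []), total}
    end
  end

  # Follow predecessor links from the sink back to a source.
  defp backtrack(_dist, nil, path), do: path

  defp backtrack(dist, node, path) do
    {_, prev} = Map.fetch!(dist, node)
    backtrack(dist, prev, [node | path])
  end
end

CriticalPathSketch.longest_path(
  [:a, :b, :c, :d],
  [{:a, :b, 10}, {:b, :c, 5}, {:c, :d, 2}],
  [:a],
  [:d]
)
# => {:ok, [:a, :b, :c, :d], 17}
```

Because each edge is relaxed once, this runs in time linear in the number of nodes plus edges once the order is known (ignoring the list scan per node used here for brevity).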
@spec orphan_nodes(Choreo.Dataflow.t()) :: [Yog.node_id()]
Returns nodes that are not reachable from any source.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_transform(:c)
...> |> Choreo.Dataflow.connect(:a, :b)
iex> Choreo.Dataflow.Analysis.orphan_nodes(flow)
[:c]
This analysis answers the question: "Which stages are unreachable from any source?"
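Orphan detection is the forward mirror of the dead-end check: walk outward from every source and report whichever nodes were never visited. This is a sketch over plain lists with a hypothetical OrphanSketch module, not the library's code.

```elixir
defmodule OrphanSketch do
  # nodes: all ids; edges: [{from, to}]; sources: ids treated as entry points.
  def orphan_nodes(nodes, edges, sources) do
    adjacency =
      Enum.reduce(edges, %{}, fn {from, to}, acc -> Map.update(acc, from, [to], &[to | &1]) end)

    reachable = walk(adjacency, sources, MapSet.new(sources))
    Enum.reject(nodes, &MapSet.member?(reachable, &1))
  end

  # Worklist traversal that visits each node at most once.
  defp walk(_adjacency, [], seen), do: seen

  defp walk(adjacency, [node | rest], seen) do
    fresh = adjacency |> Map.get(node, []) |> Enum.reject(&MapSet.member?(seen, &1))
    walk(adjacency, fresh ++ rest, Enum.into(fresh, seen))
  end
end

OrphanSketch.orphan_nodes([:a, :b, :c], [{:a, :b}], [:a])
# => [:c]
```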
@spec simulate(Choreo.Dataflow.t()) :: %{optional(Yog.node_id()) => %{in_rate: number(), out_rate: number(), latency_ms: number()}}
Simulates throughput propagation through the pipeline.
Each source is assigned a :rate (events/sec), and each non-source stage
receives the sum of all its incoming rates. The result is a map of
node_id => %{in_rate: number, out_rate: number, latency_ms: number}.
Sources emit their own :rate; transforms, buffers, and merges pass on the sum
of their inputs; sinks consume without producing (their out_rate is 0).
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a, rate: 100)
...> |> Choreo.Dataflow.add_source(:b, rate: 200)
...> |> Choreo.Dataflow.add_merge(:m)
...> |> Choreo.Dataflow.add_sink(:c)
...> |> Choreo.Dataflow.connect(:a, :m)
...> |> Choreo.Dataflow.connect(:b, :m)
...> |> Choreo.Dataflow.connect(:m, :c)
iex> result = Choreo.Dataflow.Analysis.simulate(flow)
iex> result[:a].out_rate
100
iex> result[:b].out_rate
200
iex> result[:m].in_rate
300
iex> result[:c].in_rate
300
iex> result[:c].out_rate
0
This analysis answers the question: "What is the throughput at each stage?"
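The propagation rules above can be sketched as a single pass in topological order: by the time a node is processed, every upstream out_rate is already known, so its in_rate is just their sum. The argument shapes, the ThroughputSketch name, and an in_rate of 0 for sources are assumptions of this sketch, and it does not model latency_ms.

```elixir
defmodule ThroughputSketch do
  # order: node ids in topological order; kinds: %{node => kind atom};
  # rates: %{source => events/sec}; edges: [{from, to}].
  def simulate(order, kinds, rates, edges) do
    Enum.reduce(order, %{}, fn node, acc ->
      # Upstream nodes were processed earlier, so their out_rate is in acc.
      in_rate =
        for {from, ^node} <- edges, reduce: 0 do
          sum -> sum + acc[from].out_rate
        end

      out_rate =
        case Map.fetch!(kinds, node) do
          :source -> Map.get(rates, node, 0)
          :sink -> 0
          _other -> in_rate
        end

      Map.put(acc, node, %{in_rate: in_rate, out_rate: out_rate})
    end)
  end
end

result =
  ThroughputSketch.simulate(
    [:a, :b, :m, :c],
    %{a: :source, b: :source, m: :merge, c: :sink},
    %{a: 100, b: 200},
    [{:a, :m}, {:b, :m}, {:m, :c}]
  )

result[:m].in_rate
# => 300
```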
@spec sinks(Choreo.Dataflow.t()) :: [Yog.node_id()]
Returns all sink node IDs in the dataflow.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_transform(:a)
...> |> Choreo.Dataflow.add_sink(:b)
iex> Choreo.Dataflow.Analysis.sinks(flow)
[:b]
This analysis answers the question: "Where does data leave the pipeline?"
@spec sources(Choreo.Dataflow.t()) :: [Yog.node_id()]
Returns all source node IDs in the dataflow.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_source(:b)
...> |> Choreo.Dataflow.add_transform(:c)
iex> Enum.sort(Choreo.Dataflow.Analysis.sources(flow))
[:a, :b]
This analysis answers the question: "Where does data enter the pipeline?"
@spec topological_sort(Choreo.Dataflow.t()) :: {:ok, [Yog.node_id()]} | {:error, :contains_cycle}
Returns {:ok, order} with a topological ordering of all stages, or {:error, :contains_cycle} if the dataflow has a cycle.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_sink(:c)
...> |> Choreo.Dataflow.connect(:a, :b)
...> |> Choreo.Dataflow.connect(:b, :c)
iex> {:ok, order} = Choreo.Dataflow.Analysis.topological_sort(flow)
iex> Enum.find_index(order, & &1 == :a) < Enum.find_index(order, & &1 == :b)
true
iex> Enum.find_index(order, & &1 == :b) < Enum.find_index(order, & &1 == :c)
true
This analysis answers the question: "In what order should stages execute?"
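Kahn's algorithm is one standard way to produce such an ordering and detect cycles at the same time: repeatedly emit nodes with no remaining incoming edges, and if some nodes are never emitted, they lie on or behind a cycle. TopoSketch is a hypothetical illustration over plain lists that returns the same result shapes as the spec above; it is not presented as Choreo's implementation.

```elixir
defmodule TopoSketch do
  # nodes: unique ids; edges: [{from, to}] whose endpoints all appear in nodes.
  def topological_sort(nodes, edges) do
    indegree =
      Enum.reduce(edges, Map.new(nodes, &{&1, 0}), fn {_from, to}, acc ->
        Map.update!(acc, to, &(&1 + 1))
      end)

    ready = for node <- nodes, indegree[node] == 0, do: node
    order = kahn(ready, indegree, edges, [])

    # Nodes never released had incoming edges from a cycle.
    if length(order) == length(nodes), do: {:ok, order}, else: {:error, :contains_cycle}
  end

  defp kahn([], _indegree, _edges, acc), do: Enum.reverse(acc)

  defp kahn([node | rest], indegree, edges, acc) do
    # Remove node's outgoing edges; release targets whose in-degree hits 0.
    {indegree, released} =
      for {^node, to} <- edges, reduce: {indegree, []} do
        {deg, rel} ->
          deg = Map.update!(deg, to, &(&1 - 1))
          if deg[to] == 0, do: {deg, [to | rel]}, else: {deg, rel}
      end

    kahn(rest ++ Enum.reverse(released), indegree, edges, [node | acc])
  end
end

TopoSketch.topological_sort([:a, :b, :c], [{:a, :b}, {:b, :c}])
# => {:ok, [:a, :b, :c]}
```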
@spec unhandled_errors(Choreo.Dataflow.t()) :: [Yog.node_id()]
Identifies nodes that lack an explicit error-handling path.
Checks every :transform and :sink node and returns the IDs of those that
have no outgoing edge whose path_type is :error or
:dead_letter.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_transform(:c)
...> |> Choreo.Dataflow.add_sink(:d)
...> |> Choreo.Dataflow.connect(:a, :b)
...> |> Choreo.Dataflow.connect(:b, :c)
...> |> Choreo.Dataflow.connect(:c, :d)
...> |> Choreo.Dataflow.add_error_path(:b, :d)
iex> Enum.sort(Choreo.Dataflow.Analysis.unhandled_errors(flow))
[:c, :d]
This analysis answers the question: "Which stages lack explicit error handling?"
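The rule described above can be sketched with edges modelled as {from, to, path_type} triples and node kinds as a plain map; both shapes and the ErrorPathSketch name are assumptions for illustration only.

```elixir
defmodule ErrorPathSketch do
  # kinds: %{node => kind}; edges: [{from, to, path_type}] (assumed shapes).
  def unhandled_errors(kinds, edges) do
    # Nodes that own at least one :error or :dead_letter outgoing edge.
    handled =
      for {from, _to, type} <- edges, type in [:error, :dead_letter], into: MapSet.new(), do: from

    kinds
    |> Enum.filter(fn {node, kind} ->
      kind in [:transform, :sink] and not MapSet.member?(handled, node)
    end)
    |> Enum.map(fn {node, _kind} -> node end)
    |> Enum.sort()
  end
end

kinds = %{a: :source, b: :transform, c: :transform, d: :sink}
edges = [{:a, :b, :normal}, {:b, :c, :normal}, {:c, :d, :normal}, {:b, :d, :error}]
ErrorPathSketch.unhandled_errors(kinds, edges)
# => [:c, :d]
```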
@spec upstream_lineage(Choreo.Dataflow.t(), Yog.node_id()) :: [Yog.node_id()]
Returns the given node together with all of its ancestors (every upstream stage that feeds it).
Examples
iex> flow = Choreo.Dataflow.new()
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_sink(:c)
...> |> Choreo.Dataflow.connect(:a, :b)
...> |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.upstream_lineage(flow, :c)
[:a, :b, :c]
@spec upstream_sources(Choreo.Dataflow.t(), Yog.node_id()) :: [Yog.node_id()]
Returns only the source nodes that transitively feed into the given target node.
Examples
iex> flow = Choreo.Dataflow.new()
...> |> Choreo.Dataflow.add_source(:s1)
...> |> Choreo.Dataflow.add_source(:s2)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_sink(:c)
...> |> Choreo.Dataflow.connect(:s1, :b)
...> |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.upstream_sources(flow, :c)
[:s1]
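Lineage is reachability over reversed edges: flip every {from, to} edge, walk from the target, and the visited set (including the target, as in the upstream_lineage example) is its lineage; filtering that set against a source list gives upstream_sources. UpstreamSketch and its argument shapes are hypothetical, and the sorted output is this sketch's choice.

```elixir
defmodule UpstreamSketch do
  def upstream_lineage(edges, target) do
    # Reverse every edge so a forward walk follows data back to its origin.
    reversed =
      Enum.reduce(edges, %{}, fn {from, to}, acc -> Map.update(acc, to, [from], &[from | &1]) end)

    reversed
    |> walk([target], MapSet.new([target]))
    |> Enum.sort()
  end

  # Keep only lineage members that appear in the supplied source list.
  def upstream_sources(edges, sources, target) do
    edges
    |> upstream_lineage(target)
    |> Enum.filter(&(&1 in sources))
  end

  # Worklist traversal that visits each node at most once.
  defp walk(_adjacency, [], seen), do: seen

  defp walk(adjacency, [node | rest], seen) do
    fresh = adjacency |> Map.get(node, []) |> Enum.reject(&MapSet.member?(seen, &1))
    walk(adjacency, fresh ++ rest, Enum.into(fresh, seen))
  end
end

edges = [{:s1, :b}, {:b, :c}]
UpstreamSketch.upstream_lineage(edges, :c)
# => [:b, :c, :s1]
UpstreamSketch.upstream_sources(edges, [:s1, :s2], :c)
# => [:s1]
```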
@spec validate(Choreo.Dataflow.t()) :: [{:error | :warning, String.t()}]
Validates a dataflow pipeline and returns a list of issues.
Checks for cycles, orphan nodes, dead ends, missing sources, and missing sinks.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.add_sink(:c)
...> |> Choreo.Dataflow.connect(:a, :b)
...> |> Choreo.Dataflow.connect(:b, :c)
iex> Choreo.Dataflow.Analysis.validate(flow)
[]
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_transform(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.connect(:a, :b)
iex> issues = Choreo.Dataflow.Analysis.validate(flow)
iex> {:error, "No source nodes"} in issues
true
iex> {:error, "No sink nodes"} in issues
true
This analysis answers the question: "Is the pipeline structurally sound?"