Choreo.Dataflow (Choreo v0.6.0)

Dataflow / pipeline diagram builder on top of Yog.

Choreo.Dataflow models stream-processing and ETL pipelines where nodes are processing stages and edges are data streams. It is a natural complement to Choreo architecture diagrams: those show what infrastructure exists; dataflow shows how data moves through it.

When to use

Use Choreo.Dataflow when designing, reviewing, or debugging pipelines — event-driven systems, ETL jobs, stream processors, or microservice data flows. It helps identify bottlenecks, orphan stages, and critical paths before they hit production.

Node types

  • :source — entry point that produces data
  • :sink — terminal consumer that persists or emits data
  • :transform — stateless operation (map, filter, aggregate)
  • :buffer — queue, topic, or back-pressure buffer
  • :conditional — branch / split based on a predicate
  • :merge — join multiple streams into one
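The node types above can be combined into a branching pipeline. The following is a minimal sketch that assumes Choreo is already a dependency of the project; it uses only functions documented on this page, and the stage names (:orders, :check, :enrich, :repair, :join, :queue, :warehouse) are made up for illustration.

```elixir
alias Choreo.Dataflow

# One stage of every node type, wired as: source -> conditional -> two
# transforms -> merge -> buffer -> sink. All ids and labels are illustrative.
pipeline =
  Dataflow.new()
  |> Dataflow.add_source(:orders, label: "Orders API")
  |> Dataflow.add_conditional(:check, label: "Valid?")
  |> Dataflow.add_transform(:enrich, label: "Enrich")
  |> Dataflow.add_transform(:repair, label: "Repair")
  |> Dataflow.add_merge(:join, label: "Join")
  |> Dataflow.add_buffer(:queue, label: "Outbound Queue")
  |> Dataflow.add_sink(:warehouse, label: "Warehouse")
  |> Dataflow.connect(:orders, :check, data_type: "order")
  |> Dataflow.connect(:check, :enrich, label: "valid")
  |> Dataflow.connect(:check, :repair, label: "invalid")
  |> Dataflow.connect(:enrich, :join, data_type: "order")
  |> Dataflow.connect(:repair, :join, data_type: "order")
  |> Dataflow.connect(:join, :queue, data_type: "order")
  |> Dataflow.connect(:queue, :warehouse, data_type: "order")

# Look up stages by type.
IO.inspect(Dataflow.nodes_of_type(pipeline, :conditional), label: "conditionals")
IO.inspect(Dataflow.nodes_of_type(pipeline, :merge), label: "merges")
```

Keeping the branch and the join as explicit :conditional and :merge stages, rather than fanning out directly from a transform, makes the split point visible in the rendered diagram.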

Quick Start

pipeline =
  Choreo.Dataflow.new()
  |> Choreo.Dataflow.add_source(:sensor, label: "IoT Sensor")
  |> Choreo.Dataflow.add_transform(:parse, label: "JSON Parser")
  |> Choreo.Dataflow.add_buffer(:kafka, label: "Kafka Topic")
  |> Choreo.Dataflow.add_transform(:aggregate, label: "Window Agg")
  |> Choreo.Dataflow.add_sink(:db, label: "TimescaleDB")
  |> Choreo.Dataflow.connect(:sensor, :parse, data_type: "raw bytes")
  |> Choreo.Dataflow.connect(:parse, :kafka, data_type: "event")
  |> Choreo.Dataflow.connect(:kafka, :aggregate, data_type: "event")
  |> Choreo.Dataflow.connect(:aggregate, :db, data_type: "metrics")

dot = Choreo.Dataflow.to_dot(pipeline)

Diagram

digraph G {
  graph [rankdir=LR, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  parse [label="JSON Parser", fillcolor="#3b82f6", shape="box3d"];
  aggregate [label="Window Agg", fillcolor="#3b82f6", shape="box3d"];
  sensor [label="IoT Sensor", fillcolor="#10b981", shape="house"];
  kafka [label="Kafka Topic", fillcolor="#f59e0b", shape="cylinder"];
  db [label="TimescaleDB", fillcolor="#f43f5e", shape="invhouse"];
  parse -> kafka [style="solid", penwidth="1.0", color="#64748b", label="event"];
  aggregate -> db [style="solid", penwidth="1.0", color="#64748b", label="metrics"];
  sensor -> parse [style="solid", penwidth="1.0", color="#64748b", label="raw bytes"];
  kafka -> aggregate [style="solid", penwidth="1.0", color="#64748b", label="event"];
}

Analysis

# Detect feedback loops (usually a bug in dataflow)
Choreo.Dataflow.Analysis.cyclic?(pipeline)

# Execution order for a DAG
{:ok, order} = Choreo.Dataflow.Analysis.topological_sort(pipeline)

# Find orphan stages with no upstream source
Choreo.Dataflow.Analysis.orphan_nodes(pipeline)

# Find dead-end stages that never reach a sink
Choreo.Dataflow.Analysis.dead_ends(pipeline)

# Longest latency path (critical path)
{:ok, path, latency} = Choreo.Dataflow.Analysis.longest_path(pipeline)

# Throughput simulation
Choreo.Dataflow.Analysis.simulate(pipeline)
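To make the cycle check, execution order, and critical path concrete, here is a plain-Elixir sketch over edge tuples in the {from, to, weight} shape. It is not Choreo's implementation: the DagSketch module and its function names are hypothetical, and it uses Kahn's algorithm plus a relaxation pass in topological order, which is one common way to compute these results.

```elixir
defmodule DagSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Choreo."

  # Kahn's algorithm. `nodes` must list every node id used in `edges`.
  # Returns {:ok, order} for a DAG or {:error, :cyclic} when a cycle exists.
  def topological_sort(nodes, edges) do
    indegree =
      Enum.reduce(edges, Map.new(nodes, &{&1, 0}), fn {_from, to, _w}, acc ->
        Map.update(acc, to, 1, &(&1 + 1))
      end)

    ready = for {node, 0} <- indegree, do: node
    kahn(Enum.sort(ready), indegree, edges, [])
  end

  # Every node was emitted only if no cycle kept some in-degree above zero.
  defp kahn([], indegree, _edges, acc) do
    if length(acc) == map_size(indegree), do: {:ok, Enum.reverse(acc)}, else: {:error, :cyclic}
  end

  defp kahn([node | rest], indegree, edges, acc) do
    {indegree, newly_ready} =
      edges
      |> Enum.filter(fn {from, _to, _w} -> from == node end)
      |> Enum.reduce({indegree, []}, fn {_from, to, _w}, {deg, ready} ->
        deg = Map.update!(deg, to, &(&1 - 1))
        if deg[to] == 0, do: {deg, [to | ready]}, else: {deg, ready}
      end)

    kahn(rest ++ Enum.reverse(newly_ready), indegree, edges, [node | acc])
  end

  # Longest weighted path in a DAG with at least one node and
  # non-negative weights: relax outgoing edges in topological order.
  def longest_path(nodes, edges) do
    with {:ok, order} <- topological_sort(nodes, edges) do
      # best[node] = {total_weight, reversed_path_ending_at_node}
      best =
        Enum.reduce(order, Map.new(nodes, &{&1, {0, [&1]}}), fn node, best ->
          {dist, path} = best[node]

          edges
          |> Enum.filter(fn {from, _to, _w} -> from == node end)
          |> Enum.reduce(best, fn {_from, to, w}, best ->
            {to_dist, _} = best[to]
            if dist + w > to_dist, do: Map.put(best, to, {dist + w, [to | path]}), else: best
          end)
        end)

      {total, path} = best |> Map.values() |> Enum.max_by(&elem(&1, 0))
      {:ok, Enum.reverse(path), total}
    end
  end
end

# The Quick Start pipeline, written out as plain data with unit weights.
nodes = [:sensor, :parse, :kafka, :aggregate, :db]
edges = [{:sensor, :parse, 1}, {:parse, :kafka, 1}, {:kafka, :aggregate, 1}, {:aggregate, :db, 1}]

IO.inspect(DagSketch.topological_sort(nodes, edges), label: "order")
IO.inspect(DagSketch.longest_path(nodes, edges), label: "critical path")
```

The sketch treats edge weights as the cost being maximised; how Choreo.Dataflow.Analysis combines edge weights with per-node latency is not specified here, so its results may differ.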

Summary

Functions

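add_buffer(flow, id, opts \\ [])

Adds a buffer node (queue, topic, back-pressure reservoir).

add_cluster(flow, name, opts \\ [])

Defines a cluster (subgraph) for grouping nodes visually.

add_conditional(flow, id, opts \\ [])

Adds a conditional / split node (branching).

add_dead_letter_path(flow, from, to, opts \\ [])

Adds a dead-letter-path edge between two stages.

add_error_path(flow, from, to, opts \\ [])

Adds an error-path edge between two stages.

add_merge(flow, id, opts \\ [])

Adds a merge / join node (combining multiple streams).

add_retry_path(flow, from, to, opts \\ [])

Adds a retry-path edge between two stages.

add_sink(flow, id, opts \\ [])

Adds a sink node (data consumer / terminal).

add_source(flow, id, opts \\ [])

Adds a source node (data producer / entry point).

add_transform(flow, id, opts \\ [])

Adds a transform node (stateless processing).

connect(flow, from, to, opts \\ [])

Connects two stages with a data-flow edge.

edges(dataflow)

Returns all edges as {from, to, weight} tuples.

new()

Creates a new empty dataflow graph.

nodes(dataflow)

Returns all node IDs in the dataflow.

nodes_of_type(dataflow, type)

Returns all nodes of a given type.

to_dot(flow, opts \\ [])

Renders the dataflow to DOT format.

to_graph(dataflow)

Returns the raw Yog.Graph struct underpinning the dataflow.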

Types

t()

@type t() :: %Choreo.Dataflow{
  clusters: %{required(String.t()) => map()},
  edge_meta: %{optional({Yog.node_id(), Yog.node_id()}) => map()},
  graph: Yog.graph()
}

Functions

add_buffer(flow, id, opts \\ [])

@spec add_buffer(t(), Yog.node_id(), keyword()) :: t()

Adds a buffer node (queue, topic, back-pressure reservoir).

Options

  • :label — display label (defaults to the node id)
  • :description — tooltip text
  • :capacity — annotated capacity (visual only)
  • :latency_ms — buffering latency in milliseconds (used by simulation)
  • :cluster — cluster name for grouping

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_buffer(flow, :kafka, label: "Kafka Topic", capacity: 1000)
iex> Choreo.Dataflow.nodes(flow)
[:kafka]
iex> Yog.node(flow.graph, :kafka).node_type
:buffer
iex> Yog.node(flow.graph, :kafka).capacity
1000

Diagram

digraph G {
  graph [rankdir=LR, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  kafka [label="Kafka Topic (cap: 1000)", fillcolor="#f59e0b", shape="cylinder"];
}

add_cluster(flow, name, opts \\ [])

@spec add_cluster(t(), String.t(), keyword()) :: t()

Defines a cluster (subgraph) for grouping nodes visually.

Options

  • :parent — name of the parent cluster for nesting
  • :label — display label (defaults to the cluster name)
  • :style — :filled, :rounded, etc.
  • :fillcolor — background colour
  • :color — border colour

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_cluster(flow, "ingest", label: "Ingestion")
iex> flow.clusters["cluster_ingest"].label
"Ingestion"

add_conditional(flow, id, opts \\ [])

@spec add_conditional(t(), Yog.node_id(), keyword()) :: t()

Adds a conditional / split node (branching).

Options

  • :label — display label (defaults to the node id)
  • :description — tooltip text
  • :cluster — cluster name for grouping

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_conditional(flow, :valid, label: "If valid")
iex> Choreo.Dataflow.nodes(flow)
[:valid]
iex> Yog.node(flow.graph, :valid).node_type
:conditional

Diagram

digraph G {
  graph [rankdir=LR, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  valid [label="If valid", fillcolor="#8b5cf6", shape="diamond"];
}

add_dead_letter_path(flow, from, to, opts \\ [])

@spec add_dead_letter_path(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()

Adds a dead-letter-path edge between two stages.

Dead-letter paths are rendered in grey with a dashed style.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_transform(:a)
...>   |> Choreo.Dataflow.add_sink(:b)
...>   |> Choreo.Dataflow.add_dead_letter_path(:a, :b)
iex> flow.edge_meta[{:a, :b}].path_type
:dead_letter

add_error_path(flow, from, to, opts \\ [])

@spec add_error_path(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()

Adds an error-path edge between two stages.

Error paths are rendered in red with a dashed style.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_transform(:a)
...>   |> Choreo.Dataflow.add_sink(:b)
...>   |> Choreo.Dataflow.add_error_path(:a, :b)
iex> flow.edge_meta[{:a, :b}].path_type
:error

add_merge(flow, id, opts \\ [])

@spec add_merge(t(), Yog.node_id(), keyword()) :: t()

Adds a merge / join node (combining multiple streams).

Options

  • :label — display label (defaults to the node id)
  • :description — tooltip text
  • :cluster — cluster name for grouping

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_merge(flow, :join, label: "Join")
iex> Choreo.Dataflow.nodes(flow)
[:join]
iex> Yog.node(flow.graph, :join).node_type
:merge

Diagram

digraph G {
  graph [rankdir=LR, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  join [label="Join", fillcolor="#06b6d4", shape="trapezium"];
}

add_retry_path(flow, from, to, opts \\ [])

@spec add_retry_path(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()

Adds a retry-path edge between two stages.

Retry paths are rendered in orange with a dotted style.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_transform(:a)
...>   |> Choreo.Dataflow.add_sink(:b)
...>   |> Choreo.Dataflow.add_retry_path(:a, :b)
iex> flow.edge_meta[{:a, :b}].path_type
:retry
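The error, retry, and dead-letter helpers can sit alongside normal edges in the same flow. The sketch below assumes Choreo is already available in the project and uses only functions documented on this page; the stage names are invented for illustration.

```elixir
alias Choreo.Dataflow

# A small load pipeline with one edge of each path type.
# All stage ids and labels are illustrative.
flow =
  Dataflow.new()
  |> Dataflow.add_transform(:parse, label: "Parse")
  |> Dataflow.add_transform(:load, label: "Load")
  |> Dataflow.add_buffer(:retry_queue, label: "Retry Queue")
  |> Dataflow.add_sink(:errors, label: "Error Log")
  |> Dataflow.add_sink(:dlq, label: "Dead Letters")
  |> Dataflow.connect(:parse, :load, data_type: "record")
  |> Dataflow.add_error_path(:parse, :errors)
  |> Dataflow.add_retry_path(:load, :retry_queue)
  |> Dataflow.add_dead_letter_path(:retry_queue, :dlq)

# Group the {from, to} edge keys by the path type recorded in edge_meta.
flow.edge_meta
|> Enum.group_by(fn {_edge, meta} -> meta.path_type end, fn {edge, _meta} -> edge end)
|> IO.inspect(label: "edges by path type")
```

A retry edge that points back to an upstream stage would introduce a cycle into the graph, so routing retries through a separate buffer, as here, keeps the flow acyclic.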

add_sink(flow, id, opts \\ [])

@spec add_sink(t(), Yog.node_id(), keyword()) :: t()

Adds a sink node (data consumer / terminal).

Options

  • :label — display label (defaults to the node id)
  • :description — tooltip text
  • :cluster — cluster name for grouping

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_sink(flow, :db, label: "TimescaleDB")
iex> Choreo.Dataflow.nodes(flow)
[:db]
iex> Yog.node(flow.graph, :db).node_type
:sink

Diagram

digraph G {
  graph [rankdir=LR, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  db [label="TimescaleDB", fillcolor="#f43f5e", shape="invhouse"];
}

add_source(flow, id, opts \\ [])

@spec add_source(t(), Yog.node_id(), keyword()) :: t()

Adds a source node (data producer / entry point).

Options

  • :label — display label (defaults to the node id)
  • :description — tooltip text
  • :rate — throughput in events/sec (used by simulation)
  • :cluster — cluster name for grouping

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_source(flow, :sensor, label: "IoT Sensor")
iex> Choreo.Dataflow.nodes(flow)
[:sensor]
iex> Yog.node(flow.graph, :sensor).node_type
:source
iex> Yog.node(flow.graph, :sensor).label
"IoT Sensor"

Diagram

digraph G {
  graph [rankdir=LR, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  sensor [label="IoT Sensor", fillcolor="#10b981", shape="house"];
}

add_transform(flow, id, opts \\ [])

@spec add_transform(t(), Yog.node_id(), keyword()) :: t()

Adds a transform node (stateless processing).

Options

  • :label — display label (defaults to the node id)
  • :description — tooltip text
  • :latency_ms — processing latency in milliseconds (used by simulation)
  • :cluster — cluster name for grouping

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_transform(flow, :parse, label: "JSON Parser")
iex> Choreo.Dataflow.nodes(flow)
[:parse]
iex> Yog.node(flow.graph, :parse).node_type
:transform

Diagram

digraph G {
  graph [rankdir=LR, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  parse [label="JSON Parser", fillcolor="#3b82f6", shape="box3d"];
}

connect(flow, from, to, opts \\ [])

@spec connect(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()

Connects two stages with a data-flow edge.

Options

  • :data_type — type of data travelling the edge (rendered as label)
  • :label — override label (defaults to data_type)
  • :rate — throughput annotation (visual only)
  • :path_type — :normal (default), :error, :retry, :dead_letter
  • :weight — numeric weight for critical-path analysis (default: 1)

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.connect(:a, :b, data_type: "event")
iex> Choreo.Dataflow.edges(flow)
[{:a, :b, 1}]
iex> flow.edge_meta[{:a, :b}].label
"event"
iex> flow.edge_meta[{:a, :b}].path_type
:normal

Diagram

digraph G {
  graph [rankdir=LR, splines=spline, nodesep=0.6, ranksep=1.2];
  node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
  edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
  b [label="b", fillcolor="#3b82f6", shape="box3d"];
  a [label="a", fillcolor="#10b981", shape="house"];
  a -> b [style="solid", penwidth="1.0", color="#64748b", label="event"];
}
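The :weight and :path_type options listed above can be passed to connect directly. This is a hedged sketch that assumes Choreo is loaded and that :weight is the value reported in the third element of each edges/1 tuple, which the default of 1 in the example above suggests but does not prove. The ids :a, :b, and :c are placeholders.

```elixir
alias Choreo.Dataflow

flow =
  Dataflow.new()
  |> Dataflow.add_source(:a)
  |> Dataflow.add_transform(:b)
  |> Dataflow.add_sink(:c)
  # A heavier edge for critical-path analysis (weight value is arbitrary).
  |> Dataflow.connect(:a, :b, data_type: "event", weight: 5)
  # An error edge declared through connect/4 instead of add_error_path/4.
  |> Dataflow.connect(:b, :c, label: "failures", path_type: :error)

# Inspect the stored edges and the metadata of the error edge.
IO.inspect(Dataflow.edges(flow), label: "edges")
IO.inspect(flow.edge_meta[{:b, :c}], label: "b -> c metadata")
```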

edges(dataflow)

@spec edges(t()) :: [{Yog.node_id(), Yog.node_id(), number()}]

Returns all edges as {from, to, weight} tuples.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
...>   |> Choreo.Dataflow.connect(:a, :b)
iex> Choreo.Dataflow.edges(flow)
[{:a, :b, 1}]
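Because edges are plain {from, to, weight} tuples, they can be summarised with the standard Enum module. The sketch below works on a hand-written list in that shape, so it runs without Choreo; the stage names are made up.

```elixir
# Edge tuples in the {from, to, weight} shape returned by edges/1.
# In a real project this list would come from Choreo.Dataflow.edges(flow).
edges = [
  {:orders, :join, 1},
  {:payments, :join, 1},
  {:join, :db, 1}
]

# Count inbound edges per stage (fan-in).
fan_in = Enum.frequencies_by(edges, fn {_from, to, _weight} -> to end)

# Count outbound edges per stage (fan-out).
fan_out = Enum.frequencies_by(edges, fn {from, _to, _weight} -> from end)

IO.inspect(fan_in, label: "fan-in")
IO.inspect(fan_out, label: "fan-out")
```

A stage with high fan-in is a candidate bottleneck, and a stage missing from both maps has no edges at all.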

new()

@spec new() :: t()

Creates a new empty dataflow graph.

Dataflow graphs are always directed.

Examples

iex> flow = Choreo.Dataflow.new()
iex> Choreo.Dataflow.nodes(flow)
[]
iex> Choreo.Dataflow.edges(flow)
[]

nodes(dataflow)

@spec nodes(t()) :: [Yog.node_id()]

Returns all node IDs in the dataflow.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_transform(:b)
iex> Enum.sort(Choreo.Dataflow.nodes(flow))
[:a, :b]

nodes_of_type(dataflow, type)

@spec nodes_of_type(t(), atom()) :: [Yog.node_id()]

Returns all nodes of a given type.

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:a)
...>   |> Choreo.Dataflow.add_source(:b)
...>   |> Choreo.Dataflow.add_sink(:c)
iex> Enum.sort(Choreo.Dataflow.nodes_of_type(flow, :source))
[:a, :b]
iex> Choreo.Dataflow.nodes_of_type(flow, :sink)
[:c]
iex> Choreo.Dataflow.nodes_of_type(flow, :transform)
[]

to_dot(flow, opts \\ [])

@spec to_dot(
  t(),
  keyword()
) :: String.t()

Renders the dataflow to DOT format.

Options

Examples

iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...>   |> Choreo.Dataflow.add_source(:in, label: "Input")
...>   |> Choreo.Dataflow.add_transform(:proc, label: "Process")
...>   |> Choreo.Dataflow.add_sink(:out, label: "Output")
...>   |> Choreo.Dataflow.connect(:in, :proc, data_type: "raw")
...>   |> Choreo.Dataflow.connect(:proc, :out, data_type: "result")
iex> dot = Choreo.Dataflow.to_dot(flow)
iex> String.contains?(dot, "digraph")
true
iex> String.contains?(dot, "Input")
true
iex> String.contains?(dot, "Output")
true
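The DOT string can be written to disk and rendered with the Graphviz command-line tool. The sketch below uses a stand-in DOT string so it runs on its own; the file names are placeholders, and rendering is skipped when the dot executable is not installed.

```elixir
# Stand-in DOT text; in practice this would be the string returned by to_dot/1.
dot = ~s(digraph G { in -> out; })

# Placeholder file names in the system temp directory.
dot_path = Path.join(System.tmp_dir!(), "pipeline.dot")
svg_path = Path.join(System.tmp_dir!(), "pipeline.svg")
File.write!(dot_path, dot)

# Render to SVG only when the Graphviz `dot` executable is on the PATH.
result =
  case System.find_executable("dot") do
    nil ->
      {:skipped, :graphviz_not_found}

    exe ->
      case System.cmd(exe, ["-Tsvg", dot_path, "-o", svg_path], stderr_to_stdout: true) do
        {_output, 0} -> {:ok, svg_path}
        {output, status} -> {:error, status, output}
      end
  end

IO.inspect(result, label: "render")
```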

to_graph(dataflow)

@spec to_graph(t()) :: Yog.graph()

Returns the raw Yog.Graph struct underpinning the dataflow.

Examples

iex> flow = Choreo.Dataflow.new()
iex> graph = Choreo.Dataflow.to_graph(flow)
iex> graph.kind
:directed