Dataflow / pipeline diagram builder on top of Yog.
Choreo.Dataflow models stream-processing and ETL pipelines where
nodes are processing stages and edges are data streams. It is a natural
complement to Choreo architecture diagrams: those show what
infrastructure exists; dataflow shows how data moves through it.
When to use
Use Choreo.Dataflow when designing, reviewing, or debugging pipelines:
event-driven systems, ETL jobs, stream processors, or microservice
data flows. It helps identify bottlenecks, orphan stages, and critical
paths before they reach production.
Node types
:source - entry point that produces data
:sink - terminal consumer that persists or emits data
:transform - stateless operation (map, filter, aggregate)
:buffer - queue, topic, or back-pressure buffer
:conditional - branch / split based on a predicate
:merge - join multiple streams into one
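The node types above correspond to the add_* functions documented later in this module. The following is a minimal sketch of a branching pipeline that uses a conditional and a merge. It assumes Choreo is available as a dependency; the module name BranchingExample and all stage IDs and labels are hypothetical.

```elixir
defmodule BranchingExample do
  # Hypothetical module. It calls only the add_* and connect functions
  # documented in Choreo.Dataflow and assumes the library is installed.
  def build do
    Choreo.Dataflow.new()
    |> Choreo.Dataflow.add_source(:orders, label: "Order Events")
    |> Choreo.Dataflow.add_conditional(:valid?, label: "If valid")
    |> Choreo.Dataflow.add_transform(:enrich, label: "Enrich")
    |> Choreo.Dataflow.add_transform(:repair, label: "Repair")
    |> Choreo.Dataflow.add_merge(:join, label: "Join")
    |> Choreo.Dataflow.add_sink(:warehouse, label: "Warehouse")
    # The conditional splits the stream into two branches...
    |> Choreo.Dataflow.connect(:orders, :valid?, data_type: "order")
    |> Choreo.Dataflow.connect(:valid?, :enrich, data_type: "valid order")
    |> Choreo.Dataflow.connect(:valid?, :repair, data_type: "invalid order")
    # ...and the merge joins them again before the sink.
    |> Choreo.Dataflow.connect(:enrich, :join, data_type: "order")
    |> Choreo.Dataflow.connect(:repair, :join, data_type: "order")
    |> Choreo.Dataflow.connect(:join, :warehouse, data_type: "order")
  end
end
```

Every (from, to) pair in this sketch is distinct, which keeps it within the one-edge-per-pair limitation described under connect.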
Quick Start
pipeline =
Choreo.Dataflow.new()
|> Choreo.Dataflow.add_source(:sensor, label: "IoT Sensor")
|> Choreo.Dataflow.add_transform(:parse, label: "JSON Parser")
|> Choreo.Dataflow.add_buffer(:kafka, label: "Kafka Topic")
|> Choreo.Dataflow.add_transform(:aggregate, label: "Window Agg")
|> Choreo.Dataflow.add_sink(:db, label: "TimescaleDB")
|> Choreo.Dataflow.connect(:sensor, :parse, data_type: "raw bytes")
|> Choreo.Dataflow.connect(:parse, :kafka, data_type: "event")
|> Choreo.Dataflow.connect(:kafka, :aggregate, data_type: "event")
|> Choreo.Dataflow.connect(:aggregate, :db, data_type: "metrics")
dot = Choreo.Dataflow.to_dot(pipeline)
Analysis
# Detect feedback loops (usually a bug in dataflow)
Choreo.Dataflow.Analysis.cyclic?(pipeline)
# Execution order for a DAG
{:ok, order} = Choreo.Dataflow.Analysis.topological_sort(pipeline)
# Find orphan stages with no upstream source
Choreo.Dataflow.Analysis.orphan_nodes(pipeline)
# Find dead-end stages that never reach a sink
Choreo.Dataflow.Analysis.dead_ends(pipeline)
# Longest latency path (critical path)
{:ok, path, latency} = Choreo.Dataflow.Analysis.longest_path(pipeline)
# Throughput simulation
Choreo.Dataflow.Analysis.simulate(pipeline)
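To make the idea of an execution order for a DAG concrete, here is a standalone sketch of Kahn's algorithm over {from, to, weight} edge tuples, the shape returned by Choreo.Dataflow.edges. It is not the library's implementation; the module name TopoSketch and the {:error, :cyclic} return value are assumptions made for illustration.

```elixir
defmodule TopoSketch do
  # Kahn's algorithm over {from, to, weight} edge tuples.
  # Returns {:ok, order} for a DAG, or {:error, :cyclic} when a loop exists.
  def sort(nodes, edges) do
    # In-degree of every node, starting at zero for all known nodes.
    indegree =
      Enum.reduce(edges, Map.new(nodes, &{&1, 0}), fn {from, to, _w}, acc ->
        acc
        |> Map.put_new(from, 0)
        |> Map.update(to, 1, &(&1 + 1))
      end)

    # Adjacency list: node => list of downstream nodes.
    successors =
      Enum.reduce(edges, %{}, fn {from, to, _w}, acc ->
        Map.update(acc, from, [to], &[to | &1])
      end)

    # Nodes with no incoming edge can run first.
    ready = for {node, 0} <- indegree, do: node
    walk(Enum.sort(ready), indegree, successors, [], map_size(indegree))
  end

  # Queue exhausted: every node was emitted only if the graph had no cycle.
  defp walk([], _indegree, _successors, order, total) do
    if length(order) == total, do: {:ok, Enum.reverse(order)}, else: {:error, :cyclic}
  end

  defp walk([node | rest], indegree, successors, order, total) do
    # Remove the node's outgoing edges; collect successors that become ready.
    {indegree, newly_ready} =
      successors
      |> Map.get(node, [])
      |> Enum.reduce({indegree, []}, fn next, {deg, acc} ->
        deg = Map.update!(deg, next, &(&1 - 1))
        if deg[next] == 0, do: {deg, [next | acc]}, else: {deg, acc}
      end)

    walk(rest ++ newly_ready, indegree, successors, [node | order], total)
  end
end
```

With a pipeline built as in Quick Start, the inputs could come from Choreo.Dataflow.nodes(pipeline) and Choreo.Dataflow.edges(pipeline).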
Types
@type t() :: %Choreo.Dataflow{
  clusters: %{required(String.t()) => map()},
  edge_meta: %{optional({Yog.node_id(), Yog.node_id()}) => map()},
  graph: Yog.graph()
}
Functions
Adds a buffer node (queue, topic, back-pressure reservoir).
Options
:capacity
:latency_ms
:label (String.t/0)
:description (String.t/0)
:cluster (String.t/0)
:shape (atom/0)
:fillcolor (String.t/0)
:fontcolor (String.t/0)
:style (String.t/0)
:penwidth
:image (String.t/0)
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_buffer(flow, :kafka, label: "Kafka Topic", capacity: 1000)
iex> Choreo.Dataflow.nodes(flow)
[:kafka]
iex> Yog.node(flow.graph, :kafka).node_type
:buffer
iex> Yog.node(flow.graph, :kafka).capacity
1000
Defines a cluster (subgraph) for grouping nodes visually.
Options
:parent (String.t/0)
:label (String.t/0)
:style (String.t/0)
:fillcolor (String.t/0)
:color (String.t/0)
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_cluster(flow, "ingest", label: "Ingestion")
iex> flow.clusters["cluster_ingest"].label
"Ingestion"
Adds a conditional / split node (branching).
Options
:label (String.t/0)
:description (String.t/0)
:cluster (String.t/0)
:shape (atom/0)
:fillcolor (String.t/0)
:fontcolor (String.t/0)
:style (String.t/0)
:penwidth
:image (String.t/0)
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_conditional(flow, :valid, label: "If valid")
iex> Choreo.Dataflow.nodes(flow)
[:valid]
iex> Yog.node(flow.graph, :valid).node_type
:conditional
@spec add_dead_letter_path(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()
Adds a dead-letter-path edge between two stages.
Dead-letter paths are rendered in grey with a dashed style.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_transform(:a)
...> |> Choreo.Dataflow.add_sink(:b)
...> |> Choreo.Dataflow.add_dead_letter_path(:a, :b)
iex> flow.edge_meta[{:a, :b}].path_type
:dead_letter
@spec add_error_path(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()
Adds an error-path edge between two stages.
Error paths are rendered in red with a dashed style.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_transform(:a)
...> |> Choreo.Dataflow.add_sink(:b)
...> |> Choreo.Dataflow.add_error_path(:a, :b)
iex> flow.edge_meta[{:a, :b}].path_type
:error
Adds a merge / join node (combining multiple streams).
Options
:label (String.t/0)
:description (String.t/0)
:cluster (String.t/0)
:shape (atom/0)
:fillcolor (String.t/0)
:fontcolor (String.t/0)
:style (String.t/0)
:penwidth
:image (String.t/0)
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_merge(flow, :join, label: "Join")
iex> Choreo.Dataflow.nodes(flow)
[:join]
iex> Yog.node(flow.graph, :join).node_type
:merge
@spec add_retry_path(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()
Adds a retry-path edge between two stages.
Retry paths are rendered in orange with a dotted style.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_transform(:a)
...> |> Choreo.Dataflow.add_sink(:b)
...> |> Choreo.Dataflow.add_retry_path(:a, :b)
iex> flow.edge_meta[{:a, :b}].path_type
:retry
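The three failure-path functions (add_error_path, add_dead_letter_path, add_retry_path) can be combined on one stage. The sketch below assumes Choreo is available as a dependency and gives each failure path its own target stage, so that no (from, to) pair is used twice. The module name FailurePathsExample and all stage IDs are hypothetical.

```elixir
defmodule FailurePathsExample do
  # Hypothetical module. It calls only functions documented in
  # Choreo.Dataflow and assumes the library is installed.
  def build do
    Choreo.Dataflow.new()
    |> Choreo.Dataflow.add_source(:events, label: "Events")
    |> Choreo.Dataflow.add_transform(:parse, label: "Parse")
    |> Choreo.Dataflow.add_sink(:store, label: "Store")
    |> Choreo.Dataflow.add_buffer(:retry_queue, label: "Retry Queue")
    |> Choreo.Dataflow.add_sink(:alerts, label: "Alerts")
    |> Choreo.Dataflow.add_sink(:dlq, label: "Dead Letters")
    # Normal data flow.
    |> Choreo.Dataflow.connect(:events, :parse, data_type: "raw")
    |> Choreo.Dataflow.connect(:parse, :store, data_type: "event")
    # Failure handling: each path goes to a distinct target stage.
    |> Choreo.Dataflow.add_retry_path(:parse, :retry_queue)
    |> Choreo.Dataflow.add_error_path(:parse, :alerts)
    |> Choreo.Dataflow.add_dead_letter_path(:parse, :dlq)
  end
end
```

Following the examples in this section, the path type of each edge can then be read back from edge_meta, for example flow.edge_meta[{:parse, :dlq}].path_type.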
Adds a sink node (data consumer / terminal).
Options
:label (String.t/0)
:description (String.t/0)
:cluster (String.t/0)
:shape (atom/0)
:fillcolor (String.t/0)
:fontcolor (String.t/0)
:style (String.t/0)
:penwidth
:image (String.t/0)
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_sink(flow, :db, label: "TimescaleDB")
iex> Choreo.Dataflow.nodes(flow)
[:db]
iex> Yog.node(flow.graph, :db).node_type
:sink
Adds a source node (data producer / entry point).
Options
:rate
:label (String.t/0)
:description (String.t/0)
:cluster (String.t/0)
:shape (atom/0)
:fillcolor (String.t/0)
:fontcolor (String.t/0)
:style (String.t/0)
:penwidth
:image (String.t/0)
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_source(flow, :sensor, label: "IoT Sensor")
iex> Choreo.Dataflow.nodes(flow)
[:sensor]
iex> Yog.node(flow.graph, :sensor).node_type
:source
iex> Yog.node(flow.graph, :sensor).label
"IoT Sensor"
Adds a transform node (stateless processing).
Options
:latency_ms
:capacity
:label (String.t/0)
:description (String.t/0)
:cluster (String.t/0)
:shape (atom/0)
:fillcolor (String.t/0)
:fontcolor (String.t/0)
:style (String.t/0)
:penwidth
:image (String.t/0)
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_transform(flow, :parse, label: "JSON Parser")
iex> Choreo.Dataflow.nodes(flow)
[:parse]
iex> Yog.node(flow.graph, :parse).node_type
:transform
Connects two stages with a data-flow edge.
Limitation
At most one edge is allowed per (from, to) pair. Adding a second
connection between the same stages raises ArgumentError. Multigraph
support (parallel edges) is planned for a future release.
Options
:data_type (String.t/0)
:label (String.t/0)
:rate
:path_type
:weight
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.connect(:a, :b, data_type: "event")
iex> Choreo.Dataflow.edges(flow)
[{:a, :b, 1}]
iex> flow.edge_meta[{:a, :b}].label
"event"
iex> flow.edge_meta[{:a, :b}].path_type
:normal
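Because a second connection on the same (from, to) pair raises ArgumentError, a list of connections generated from configuration may need deduplicating before it is applied. The helper below is one possible sketch; ConnectOnce and its functions are hypothetical, not part of Choreo. Only connect_all calls the library, so dedupe can be used and tested on its own.

```elixir
defmodule ConnectOnce do
  # Hypothetical helper, not part of Choreo.
  # Keeps the first {from, to, opts} entry for each (from, to) pair.
  def dedupe(connections) do
    Enum.uniq_by(connections, fn {from, to, _opts} -> {from, to} end)
  end

  # Applies the deduplicated connections to a dataflow.
  # Assumes Choreo is installed and that both stages of every pair
  # have already been added to the flow.
  def connect_all(flow, connections) do
    connections
    |> dedupe()
    |> Enum.reduce(flow, fn {from, to, opts}, acc ->
      Choreo.Dataflow.connect(acc, from, to, opts)
    end)
  end
end
```

Keeping the first entry is an arbitrary design choice; a stricter variant could raise on duplicates so that conflicting options are not silently dropped.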
@spec edges(t()) :: [{Yog.node_id(), Yog.node_id(), number()}]
Returns all edges as {from, to, weight} tuples.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.connect(:a, :b)
iex> Choreo.Dataflow.edges(flow)
[{:a, :b, 1}]
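The {from, to, weight} tuples are easy to post-process with the standard library. As an illustration, the sketch below counts incoming and outgoing edges per stage, which can help spot merge points and fan-out stages. DegreeSketch is a hypothetical module and needs only Elixir 1.10 or later.

```elixir
defmodule DegreeSketch do
  # Hypothetical helper working on {from, to, weight} edge tuples.

  # Number of incoming edges per node. Nodes without incoming edges
  # do not appear in the result.
  def fan_in(edges) do
    Enum.frequencies_by(edges, fn {_from, to, _weight} -> to end)
  end

  # Number of outgoing edges per node. Nodes without outgoing edges
  # do not appear in the result.
  def fan_out(edges) do
    Enum.frequencies_by(edges, fn {from, _to, _weight} -> from end)
  end
end
```

For a dataflow, the input would be Choreo.Dataflow.edges(flow).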
@spec new() :: t()
Creates a new empty dataflow graph.
Dataflow graphs are always directed.
Examples
iex> flow = Choreo.Dataflow.new()
iex> Choreo.Dataflow.nodes(flow)
[]
iex> Choreo.Dataflow.edges(flow)
[]
@spec nodes(t()) :: [Yog.node_id()]
Returns all node IDs in the dataflow.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
iex> Enum.sort(Choreo.Dataflow.nodes(flow))
[:a, :b]
@spec nodes_of_type(t(), atom()) :: [Yog.node_id()]
Returns all nodes of a given type.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_source(:b)
...> |> Choreo.Dataflow.add_sink(:c)
iex> Enum.sort(Choreo.Dataflow.nodes_of_type(flow, :source))
[:a, :b]
iex> Choreo.Dataflow.nodes_of_type(flow, :sink)
[:c]
iex> Choreo.Dataflow.nodes_of_type(flow, :transform)
[]
@spec theme(atom(), keyword()) :: Choreo.Theme.t()
Returns a theme for Choreo.Dataflow.
Examples
iex> theme = Choreo.Dataflow.theme(:default, graph_rankdir: :td)
iex> theme.graph_rankdir
:td
Renders the dataflow to DOT format.
Options
:theme - :default, :dark, or a Choreo.Theme struct
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:in, label: "Input")
...> |> Choreo.Dataflow.add_transform(:proc, label: "Process")
...> |> Choreo.Dataflow.add_sink(:out, label: "Output")
...> |> Choreo.Dataflow.connect(:in, :proc, data_type: "raw")
...> |> Choreo.Dataflow.connect(:proc, :out, data_type: "result")
iex> dot = Choreo.Dataflow.to_dot(flow)
iex> String.contains?(dot, "digraph")
true
iex> String.contains?(dot, "Input")
true
iex> String.contains?(dot, "Output")
true
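The DOT string can be written to disk and rendered with Graphviz. The sketch below is independent of Choreo and works on any DOT string. RenderSketch is a hypothetical module, and rendering assumes the Graphviz dot executable is on the PATH; when it is not, the function returns an error tuple instead of raising.

```elixir
defmodule RenderSketch do
  # Hypothetical helper, not part of Choreo.

  # Writes a DOT string to `path` and returns the path.
  def write_dot(dot, path) do
    File.write!(path, dot)
    path
  end

  # Renders a DOT file to SVG using the Graphviz `dot` executable.
  # Returns {:ok, svg_path}, {:error, :graphviz_not_found}, or
  # {:error, {exit_status, output}} when Graphviz reports a failure.
  def to_svg(dot_path, svg_path) do
    case System.find_executable("dot") do
      nil ->
        {:error, :graphviz_not_found}

      exe ->
        args = ["-Tsvg", dot_path, "-o", svg_path]

        case System.cmd(exe, args, stderr_to_stdout: true) do
          {_output, 0} -> {:ok, svg_path}
          {output, status} -> {:error, {status, output}}
        end
    end
  end
end
```

A possible usage, given a flow as in the example above: flow |> Choreo.Dataflow.to_dot(theme: :dark) |> RenderSketch.write_dot("pipeline.dot"), followed by RenderSketch.to_svg("pipeline.dot", "pipeline.svg").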
Returns the raw Yog.Graph struct underpinning the dataflow.
Examples
iex> flow = Choreo.Dataflow.new()
iex> graph = Choreo.Dataflow.to_graph(flow)
iex> graph.kind
:directed