Dataflow / pipeline diagram builder on top of Yog.
Choreo.Dataflow models stream-processing and ETL pipelines in which
nodes are processing stages and edges are data streams. It complements
Choreo architecture diagrams: those show what infrastructure exists,
while dataflow diagrams show how data moves through it.
When to use
Use Choreo.Dataflow when designing, reviewing, or debugging pipelines such as
event-driven systems, ETL jobs, stream processors, or microservice
data flows. Its analysis functions help identify bottlenecks, orphan stages,
and critical paths before they reach production.
Node types
:source - entry point that produces data
:sink - terminal consumer that persists or emits data
:transform - stateless operation (map, filter, aggregate)
:buffer - queue, topic, or back-pressure buffer
:conditional - branch / split based on a predicate
:merge - join multiple streams into one
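Sources are the only entry points and sinks the only exits, so the orphan and dead-end checks listed under Analysis amount to graph reachability. The following is a minimal pure-Elixir sketch of that idea, not Choreo's implementation. The module name ReachabilitySketch, the plain {from, to} edge list, and the exact definitions used here (an orphan is unreachable from every source; a dead end cannot reach any sink) are assumptions for illustration.

```elixir
defmodule ReachabilitySketch do
  @moduledoc """
  Hypothetical helper, not part of Choreo: finds orphan and dead-end
  stages in a plain {from, to} edge list, given the source and sink ids.
  """

  # Build an adjacency map %{node => [successor, ...]} from {from, to} pairs.
  def adjacency(edges) do
    Enum.reduce(edges, %{}, fn {from, to}, acc ->
      Map.update(acc, from, [to], &[to | &1])
    end)
  end

  # Depth-first search: every node reachable from the start nodes
  # (the start nodes themselves included).
  def reachable(adj, starts) do
    Enum.reduce(starts, MapSet.new(), &visit(adj, &1, &2))
  end

  defp visit(adj, node, seen) do
    if MapSet.member?(seen, node) do
      seen
    else
      adj
      |> Map.get(node, [])
      |> Enum.reduce(MapSet.put(seen, node), &visit(adj, &1, &2))
    end
  end

  # Orphans: nodes that no source can reach.
  def orphans(nodes, edges, sources) do
    seen = reachable(adjacency(edges), sources)
    Enum.reject(nodes, &MapSet.member?(seen, &1))
  end

  # Dead ends: nodes from which no sink is reachable.
  # Searches backwards from the sinks over reversed edges.
  def dead_ends(nodes, edges, sinks) do
    reversed = Enum.map(edges, fn {from, to} -> {to, from} end)
    seen = reachable(adjacency(reversed), sinks)
    Enum.reject(nodes, &MapSet.member?(seen, &1))
  end
end
```

With :sensor as the only source and :db as the only sink, a stage such as :stray that feeds :db but has no upstream source is reported by orphans/3, and a branch that leaves :parse but never reaches :db is reported by dead_ends/3.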
Quick Start
pipeline =
Choreo.Dataflow.new()
|> Choreo.Dataflow.add_source(:sensor, label: "IoT Sensor")
|> Choreo.Dataflow.add_transform(:parse, label: "JSON Parser")
|> Choreo.Dataflow.add_buffer(:kafka, label: "Kafka Topic")
|> Choreo.Dataflow.add_transform(:aggregate, label: "Window Agg")
|> Choreo.Dataflow.add_sink(:db, label: "TimescaleDB")
|> Choreo.Dataflow.connect(:sensor, :parse, data_type: "raw bytes")
|> Choreo.Dataflow.connect(:parse, :kafka, data_type: "event")
|> Choreo.Dataflow.connect(:kafka, :aggregate, data_type: "event")
|> Choreo.Dataflow.connect(:aggregate, :db, data_type: "metrics")
dot = Choreo.Dataflow.to_dot(pipeline)
Analysis
# Detect feedback loops (usually a bug in dataflow)
Choreo.Dataflow.Analysis.cyclic?(pipeline)
# Execution order for a DAG
{:ok, order} = Choreo.Dataflow.Analysis.topological_sort(pipeline)
# Find orphan stages with no upstream source
Choreo.Dataflow.Analysis.orphan_nodes(pipeline)
# Find dead-end stages that never reach a sink
Choreo.Dataflow.Analysis.dead_ends(pipeline)
# Longest latency path (critical path)
{:ok, path, latency} = Choreo.Dataflow.Analysis.longest_path(pipeline)
# Throughput simulation
Choreo.Dataflow.Analysis.simulate(pipeline)
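cyclic? and topological_sort are two views of the same check: an execution order exists exactly when the graph has no feedback loop. The sketch below uses Kahn's algorithm (repeatedly emit a node with no remaining incoming edges) to show this. It is an illustration, not Choreo's implementation; the module name TopoSortSketch, the separate node and {from, to} edge lists, and the {:error, :cycle} return value are assumptions.

```elixir
defmodule TopoSortSketch do
  @moduledoc """
  Hypothetical Kahn's-algorithm sketch, not Choreo's implementation.
  Returns {:ok, order} for a DAG or {:error, :cycle} if a loop exists.
  Assumes every edge endpoint also appears in `nodes`.
  """

  def topological_sort(nodes, edges) do
    # Count incoming edges per node (in-degree).
    in_degree =
      Enum.reduce(edges, Map.new(nodes, &{&1, 0}), fn {_from, to}, acc ->
        Map.update!(acc, to, &(&1 + 1))
      end)

    # Successor list for each node, in edge order.
    succ =
      Enum.reduce(edges, %{}, fn {from, to}, acc ->
        Map.update(acc, from, [to], &(&1 ++ [to]))
      end)

    # Start with every node that has no incoming edges.
    ready = Enum.filter(nodes, &(in_degree[&1] == 0))
    loop(ready, in_degree, succ, [], length(nodes))
  end

  def cyclic?(nodes, edges) do
    match?({:error, :cycle}, topological_sort(nodes, edges))
  end

  defp loop([], _in_degree, _succ, acc, total) do
    order = Enum.reverse(acc)
    # Nodes never emitted sit on a cycle (or downstream of one).
    if length(order) == total, do: {:ok, order}, else: {:error, :cycle}
  end

  defp loop([node | rest], in_degree, succ, acc, total) do
    # "Remove" the node: decrement each successor's in-degree and
    # queue the successors that drop to zero.
    {in_degree, newly_ready} =
      Enum.reduce(Map.get(succ, node, []), {in_degree, []}, fn s, {deg, ready} ->
        deg = Map.update!(deg, s, &(&1 - 1))
        if deg[s] == 0, do: {deg, [s | ready]}, else: {deg, ready}
      end)

    loop(rest ++ Enum.reverse(newly_ready), in_degree, succ, [node | acc], total)
  end
end
```

For the Quick Start pipeline this yields [:sensor, :parse, :kafka, :aggregate, :db]; adding an edge from :aggregate back to :parse makes it return {:error, :cycle}.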
Summary
Functions
add_buffer - Adds a buffer node (queue, topic, back-pressure reservoir).
add_cluster - Defines a cluster (subgraph) for grouping nodes visually.
add_conditional - Adds a conditional / split node (branching).
add_dead_letter_path - Adds a dead-letter-path edge between two stages.
add_error_path - Adds an error-path edge between two stages.
add_merge - Adds a merge / join node (combining multiple streams).
add_retry_path - Adds a retry-path edge between two stages.
add_sink - Adds a sink node (data consumer / terminal).
add_source - Adds a source node (data producer / entry point).
add_transform - Adds a transform node (stateless processing).
connect - Connects two stages with a data-flow edge.
edges - Returns all edges as {from, to, weight} tuples.
new - Creates a new empty dataflow graph.
nodes - Returns all node IDs in the dataflow.
nodes_of_type - Returns all nodes of a given type.
to_dot - Renders the dataflow to DOT format.
to_graph - Returns the raw Yog.Graph struct underpinning the dataflow.
Types
@type t() :: %Choreo.Dataflow{ clusters: %{required(String.t()) => map()}, edge_meta: %{optional({Yog.node_id(), Yog.node_id()}) => map()}, graph: Yog.graph() }
Functions
@spec add_buffer(t(), Yog.node_id(), keyword()) :: t()
Adds a buffer node (queue, topic, back-pressure reservoir).
Options
:label - display label (defaults to the node id)
:description - tooltip text
:capacity - annotated capacity (visual only)
:latency_ms - buffering latency in milliseconds (used by simulation)
:cluster - cluster name for grouping
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_buffer(flow, :kafka, label: "Kafka Topic", capacity: 1000)
iex> Choreo.Dataflow.nodes(flow)
[:kafka]
iex> Yog.node(flow.graph, :kafka).node_type
:buffer
iex> Yog.node(flow.graph, :kafka).capacity
1000
Defines a cluster (subgraph) for grouping nodes visually.
Options
:parent - name of the parent cluster for nesting
:label - display label (defaults to the cluster name)
:style - :filled, :rounded, etc.
:fillcolor - background colour
:color - border colour
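The example that follows reads the cluster back from flow.clusters["cluster_ingest"] after adding it as "ingest", so names appear to be stored with a "cluster_" prefix. That prefix matters in Graphviz: the dot layout engine draws a bounding box only around subgraphs whose names begin with "cluster", which is presumably why it is added. A hypothetical sketch of the kind of subgraph this produces; ClusterDotSketch is illustrative rather than Choreo's renderer, it handles only :label and :style (defaulting to :rounded as an assumption), and it does not escape quotes in labels.

```elixir
defmodule ClusterDotSketch do
  @moduledoc """
  Hypothetical sketch, not Choreo's renderer: emits one Graphviz
  subgraph for a cluster. dot only draws a box around subgraphs whose
  names begin with "cluster", hence the prefix.
  """

  def subgraph(name, node_ids, opts \\ []) do
    label = Keyword.get(opts, :label, name)
    style = Keyword.get(opts, :style, :rounded)

    # Quote every node id so atoms such as :sensor become "sensor".
    members = Enum.map_join(node_ids, " ", fn id -> ~s("#{id}";) end)

    ~s(subgraph "cluster_#{name}" { label="#{label}"; style=#{style}; #{members} })
  end
end
```

Calling ClusterDotSketch.subgraph("ingest", [:sensor, :parse], label: "Ingestion") returns subgraph "cluster_ingest" { label="Ingestion"; style=rounded; "sensor"; "parse"; }.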
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_cluster(flow, "ingest", label: "Ingestion")
iex> flow.clusters["cluster_ingest"].label
"Ingestion"
@spec add_conditional(t(), Yog.node_id(), keyword()) :: t()
Adds a conditional / split node (branching).
Options
:label - display label (defaults to the node id)
:description - tooltip text
:cluster - cluster name for grouping
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_conditional(flow, :valid, label: "If valid")
iex> Choreo.Dataflow.nodes(flow)
[:valid]
iex> Yog.node(flow.graph, :valid).node_type
:conditional
@spec add_dead_letter_path(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()
Adds a dead-letter-path edge between two stages.
Dead-letter paths are rendered in grey with a dashed style.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_transform(:a)
...> |> Choreo.Dataflow.add_sink(:b)
...> |> Choreo.Dataflow.add_dead_letter_path(:a, :b)
iex> flow.edge_meta[{:a, :b}].path_type
:dead_letter
@spec add_error_path(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()
Adds an error-path edge between two stages.
Error paths are rendered in red with a dashed style.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_transform(:a)
...> |> Choreo.Dataflow.add_sink(:b)
...> |> Choreo.Dataflow.add_error_path(:a, :b)
iex> flow.edge_meta[{:a, :b}].path_type
:error
@spec add_merge(t(), Yog.node_id(), keyword()) :: t()
Adds a merge / join node (combining multiple streams).
Options
:label - display label (defaults to the node id)
:description - tooltip text
:cluster - cluster name for grouping
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_merge(flow, :join, label: "Join")
iex> Choreo.Dataflow.nodes(flow)
[:join]
iex> Yog.node(flow.graph, :join).node_type
:merge
@spec add_retry_path(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()
Adds a retry-path edge between two stages.
Retry paths are rendered in orange with a dotted style.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_transform(:a)
...> |> Choreo.Dataflow.add_sink(:b)
...> |> Choreo.Dataflow.add_retry_path(:a, :b)
iex> flow.edge_meta[{:a, :b}].path_type
:retry
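add_error_path, add_retry_path and add_dead_letter_path record different path_type values, which are drawn with the colours and line styles their descriptions give (red dashed, orange dotted, grey dashed). A hypothetical sketch of that mapping as Graphviz edge attributes; EdgeStyleSketch is illustrative, and the black/solid styling for :normal edges is an assumption rather than Choreo's theme.

```elixir
defmodule EdgeStyleSketch do
  @moduledoc """
  Hypothetical mapping from path types to Graphviz edge attributes,
  following the colours and styles described for the path helpers.
  Not Choreo's actual theme values.
  """

  @styles %{
    normal: [color: "black", style: "solid"],
    error: [color: "red", style: "dashed"],
    retry: [color: "orange", style: "dotted"],
    dead_letter: [color: "grey", style: "dashed"]
  }

  # Render a DOT edge statement, optionally with a label.
  def edge(from, to, path_type \\ :normal, label \\ nil) do
    attrs = Map.fetch!(@styles, path_type)
    attrs = if label, do: attrs ++ [label: label], else: attrs

    rendered = Enum.map_join(attrs, ", ", fn {k, v} -> ~s(#{k}="#{v}") end)
    ~s("#{from}" -> "#{to}" [#{rendered}];)
  end
end
```

For instance, EdgeStyleSketch.edge(:a, :b, :retry, "event") produces "a" -> "b" [color="orange", style="dotted", label="event"];.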
@spec add_sink(t(), Yog.node_id(), keyword()) :: t()
Adds a sink node (data consumer / terminal).
Options
:label - display label (defaults to the node id)
:description - tooltip text
:cluster - cluster name for grouping
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_sink(flow, :db, label: "TimescaleDB")
iex> Choreo.Dataflow.nodes(flow)
[:db]
iex> Yog.node(flow.graph, :db).node_type
:sink
@spec add_source(t(), Yog.node_id(), keyword()) :: t()
Adds a source node (data producer / entry point).
Options
:label - display label (defaults to the node id)
:description - tooltip text
:rate - throughput in events/sec (used by simulation)
:cluster - cluster name for grouping
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_source(flow, :sensor, label: "IoT Sensor")
iex> Choreo.Dataflow.nodes(flow)
[:sensor]
iex> Yog.node(flow.graph, :sensor).node_type
:source
iex> Yog.node(flow.graph, :sensor).label
"IoT Sensor"
@spec add_transform(t(), Yog.node_id(), keyword()) :: t()
Adds a transform node (stateless processing).
Options
:label - display label (defaults to the node id)
:description - tooltip text
:latency_ms - processing latency in milliseconds (used by simulation)
:cluster - cluster name for grouping
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = Choreo.Dataflow.add_transform(flow, :parse, label: "JSON Parser")
iex> Choreo.Dataflow.nodes(flow)
[:parse]
iex> Yog.node(flow.graph, :parse).node_type
:transform
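Source :rate and transform :latency_ms feed Analysis.simulate, whose model is not described on this page. As a rough, hypothetical illustration of how those two numbers can expose a bottleneck in a linear pipeline, the sketch assumes each stage handles one event at a time, so a stage with latency L ms sustains at most 1000 / L events per second. BottleneckSketch and this serial-processing model are assumptions, not Choreo's simulation; latencies must be positive and the stage list non-empty.

```elixir
defmodule BottleneckSketch do
  @moduledoc """
  Hypothetical back-of-the-envelope model, not Choreo's simulate:
  each stage processes events serially, so a stage with latency L ms
  can sustain at most 1000 / L events per second.
  """

  # source_rate: events/sec entering the pipeline.
  # stages: [{id, latency_ms}, ...] with positive latencies.
  def throughput(source_rate, stages) do
    {bottleneck, max_rate} =
      stages
      |> Enum.map(fn {id, latency_ms} -> {id, 1000 / latency_ms} end)
      |> Enum.min_by(fn {_id, rate} -> rate end)

    if max_rate < source_rate do
      # The slowest stage caps the whole pipeline.
      {:bottleneck, bottleneck, max_rate}
    else
      {:ok, source_rate}
    end
  end
end
```

Under this model, throughput(500, parse: 1, aggregate: 4) reports :aggregate as the bottleneck at 250.0 events/sec, while a source rate of 100 passes through unchanged.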
@spec connect(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()
Connects two stages with a data-flow edge.
Options
:data_type - type of data travelling the edge (rendered as label)
:label - override label (defaults to data_type)
:rate - throughput annotation (visual only)
:path_type - :normal (default), :error, :retry, or :dead_letter
:weight - numeric weight for critical-path analysis (default: 1)
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.connect(:a, :b, data_type: "event")
iex> Choreo.Dataflow.edges(flow)
[{:a, :b, 1}]
iex> flow.edge_meta[{:a, :b}].label
"event"
iex> flow.edge_meta[{:a, :b}].path_type
:normal
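The :weight option is what critical-path analysis sums, and Analysis.longest_path returns {:ok, path, latency}. A hypothetical sketch of the underlying idea: the heaviest path in a DAG over {from, to, weight} tuples (the shape edges returns), computed by memoised depth-first search. CriticalPathSketch is illustrative, not Choreo's implementation; it assumes the graph is acyclic (check with a topological sort first) and that only edge weights count, since whether Choreo also adds node :latency_ms is not stated here.

```elixir
defmodule CriticalPathSketch do
  @moduledoc """
  Hypothetical sketch, not Choreo's longest_path: heaviest path in a
  DAG given {from, to, weight} edges. Assumes the graph is acyclic.
  """

  def longest_path(edges) do
    succ =
      Enum.group_by(edges, fn {from, _to, _w} -> from end, fn {_f, to, w} -> {to, w} end)

    nodes = edges |> Enum.flat_map(fn {f, t, _w} -> [f, t] end) |> Enum.uniq()

    # Heaviest path starting at each node, sharing one memo table.
    {_memo, results} =
      Enum.reduce(nodes, {%{}, []}, fn node, {memo, acc} ->
        {memo, result} = best(node, succ, memo)
        {memo, [result | acc]}
      end)

    if results == [] do
      {:error, :empty}
    else
      {weight, path} = Enum.max_by(results, fn {w, _path} -> w end)
      {:ok, path, weight}
    end
  end

  # Returns {memo, {total_weight, path}} for the heaviest path from node.
  defp best(node, succ, memo) do
    case memo do
      %{^node => cached} ->
        {memo, cached}

      _ ->
        {memo, result} =
          Enum.reduce(Map.get(succ, node, []), {memo, {0, [node]}}, fn {next, w}, {memo, current} ->
            {memo, {sub_w, sub_path}} = best(next, succ, memo)
            candidate = {w + sub_w, [node | sub_path]}
            # Keep whichever extension is heavier.
            {memo, if(elem(candidate, 0) > elem(current, 0), do: candidate, else: current)}
          end)

        {Map.put(memo, node, result), result}
    end
  end
end
```

If the Quick Start edges were given weights 1, 2, 5 and 1, with an extra weight-1 shortcut from :parse to :aggregate, the sketch picks the route through :kafka with a total of 9.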
@spec edges(t()) :: [{Yog.node_id(), Yog.node_id(), number()}]
Returns all edges as {from, to, weight} tuples.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
...> |> Choreo.Dataflow.connect(:a, :b)
iex> Choreo.Dataflow.edges(flow)
[{:a, :b, 1}]
@spec new() :: t()
Creates a new empty dataflow graph.
Dataflow graphs are always directed.
Examples
iex> flow = Choreo.Dataflow.new()
iex> Choreo.Dataflow.nodes(flow)
[]
iex> Choreo.Dataflow.edges(flow)
[]
@spec nodes(t()) :: [Yog.node_id()]
Returns all node IDs in the dataflow.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_transform(:b)
iex> Enum.sort(Choreo.Dataflow.nodes(flow))
[:a, :b]
@spec nodes_of_type(t(), atom()) :: [Yog.node_id()]
Returns all nodes of a given type.
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:a)
...> |> Choreo.Dataflow.add_source(:b)
...> |> Choreo.Dataflow.add_sink(:c)
iex> Enum.sort(Choreo.Dataflow.nodes_of_type(flow, :source))
[:a, :b]
iex> Choreo.Dataflow.nodes_of_type(flow, :sink)
[:c]
iex> Choreo.Dataflow.nodes_of_type(flow, :transform)
[]
Renders the dataflow to DOT format.
Options
:theme - :default, :dark, or a Choreo.Theme struct
Examples
iex> flow = Choreo.Dataflow.new()
iex> flow = flow
...> |> Choreo.Dataflow.add_source(:in, label: "Input")
...> |> Choreo.Dataflow.add_transform(:proc, label: "Process")
...> |> Choreo.Dataflow.add_sink(:out, label: "Output")
...> |> Choreo.Dataflow.connect(:in, :proc, data_type: "raw")
...> |> Choreo.Dataflow.connect(:proc, :out, data_type: "result")
iex> dot = Choreo.Dataflow.to_dot(flow)
iex> String.contains?(dot, "digraph")
true
iex> String.contains?(dot, "Input")
true
iex> String.contains?(dot, "Output")
true
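The doctest only checks that the output contains "digraph" and the node labels. For readers new to Graphviz DOT, here is a hypothetical minimal renderer showing the general shape of such text: a digraph block with one statement per labelled node and per labelled edge. DotSketch, its input format, and the rankdir=LR setting are assumptions; labels are not escaped, and Choreo's real output is likely to differ (themes, clusters, path styles).

```elixir
defmodule DotSketch do
  @moduledoc """
  Hypothetical minimal DOT renderer, not Choreo's to_dot.
  nodes: [{id, label}]; edges: [{from, to, label_or_nil}].
  """

  def to_dot(nodes, edges) do
    node_lines =
      Enum.map(nodes, fn {id, label} -> ~s(  "#{id}" [label="#{label}"];) end)

    edge_lines =
      Enum.map(edges, fn
        {from, to, nil} -> ~s(  "#{from}" -> "#{to}";)
        {from, to, label} -> ~s(  "#{from}" -> "#{to}" [label="#{label}"];)
      end)

    # rankdir=LR lays the pipeline out left to right.
    Enum.join(["digraph dataflow {", "  rankdir=LR;"] ++ node_lines ++ edge_lines ++ ["}"], "\n")
  end
end
```

Feeding it the nodes and edges from the example above ({:in, "Input"}, {:proc, "Process"}, {:out, "Output"} and the "raw" and "result" edges) gives text that the dot command-line tool can lay out directly.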
Returns the raw Yog.Graph struct underpinning the dataflow.
Examples
iex> flow = Choreo.Dataflow.new()
iex> graph = Choreo.Dataflow.to_graph(flow)
iex> graph.kind
:directed