ExArrow.Flow
(ex_arrow v0.7.1)
Arrow-native Flow execution.
Wraps Flow so ExArrow streams of ExArrow.RecordBatch values can be
processed concurrently while staying entirely in native Arrow memory. The
unit of work is a batch, never a row map.
Requires {:flow, "~> 1.2"} in your mix.exs dependencies. When Flow is
absent, every function returns {:error, "Flow is not available..."}.
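Here is a minimal sketch of the matching dependency list in mix.exs. The Flow requirement is the one stated above. The ex_arrow requirement is an assumption based on the v0.7.1 release shown at the top of this page.

```elixir
# mix.exs (fragment): goes inside your project module
defp deps do
  [
    # assumed requirement, matching the documented v0.7.1 release
    {:ex_arrow, "~> 0.7.1"},
    # needed by ExArrow.Flow; without it the functions return {:error, ...}
    {:flow, "~> 1.2"}
  ]
end
```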
Quick start
{:ok, stream} = ExArrow.Stream.from_parquet("/data/events.parquet")
stream
|> ExArrow.Flow.from_batches()
|> Flow.map(&ExArrow.RecordBatch.num_rows/1)
# one row count per batch; Flow does not preserve batch order
|> Enum.to_list()
How it works
from_batches/1 calls Flow.from_enumerable/2 on the ExArrow stream. The
stream's Enumerable implementation yields one ExArrow.RecordBatch per
step, so each Flow stage receives a batch handle. Because the handle is an
opaque reference to native memory, no column buffers are copied to the BEAM
heap when a batch moves between stages — only the small reference term is
sent over the mailbox.
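To make the mechanism concrete, here is a hypothetical re-implementation of the input handling described for from_batches (see also the from_batches/2 reference below). The module name MyApp.BatchFlow and the ArgumentError it raises are illustrative assumptions, because the real function's exception type is not documented here. Use ExArrow.Flow.from_batches/2 in practice.

```elixir
defmodule MyApp.BatchFlow do
  # Hypothetical sketch for illustration only; not ExArrow's actual code.
  def from_batches(input, opts \\ [])

  # {:ok, stream} from an ExArrow.Stream.from_*/1 constructor is unwrapped.
  def from_batches({:ok, enumerable}, opts), do: from_batches(enumerable, opts)

  # {:error, reason} raises so the failure surfaces immediately.
  # (ArgumentError is an assumption; the real exception type may differ.)
  def from_batches({:error, reason}, _opts) do
    raise ArgumentError, "cannot build a batch flow: #{inspect(reason)}"
  end

  # Anything else is treated as an enumerable of ExArrow.RecordBatch handles.
  # Flow enumerates it in a producer and dispatches each handle (a small
  # reference to native memory) to the downstream stages.
  def from_batches(enumerable, opts), do: Flow.from_enumerable(enumerable, opts)
end
```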
All standard Flow combinators work on the result:
- Flow.map/2: transform each batch
- Flow.flat_map/2: expand one batch into many
- Flow.partition/2: partition batches by key for shuffled reductions
- Flow.reduce/3: reduce batches within a window or partition
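To illustrate how these combinators fit together, the sketch below counts rows across all batches with Flow.map/2, Flow.partition/2 and Flow.reduce/3. MyApp.RowTotals is a hypothetical module name. Only ExArrow.Flow.from_batches/1 and ExArrow.RecordBatch.num_rows/1 come from this page. A plain sum does not need a partition step. It is included here only to show where a keyed shuffle would go.

```elixir
defmodule MyApp.RowTotals do
  # Hypothetical helper built on the documented ExArrow.Flow entry point.
  def total_rows(stream) do
    stream
    |> ExArrow.Flow.from_batches()
    # Only an integer leaves each batch; no row maps are materialised.
    |> Flow.map(&ExArrow.RecordBatch.num_rows/1)
    |> Flow.partition(stages: 2)
    # Each partition keeps a one-entry map. When the default (global) window
    # triggers at the end of input, Flow emits the map's entries as events.
    |> Flow.reduce(fn -> %{rows: 0} end, fn n, acc ->
      Map.update!(acc, :rows, &(&1 + n))
    end)
    # Combine the per-partition {:rows, n} entries into one total.
    |> Enum.reduce(0, fn {:rows, n}, sum -> sum + n end)
  end
end
```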
Performance implications
- Parallelism: Flow runs a configurable number of stages (:stages), and :max_demand and :min_demand control how many batches each stage requests at a time. Each stage processes the batches it receives independently, so CPU-bound per-batch work can use all available cores.
- Memory: only batch references cross process boundaries. The Arrow buffers stay in native memory until a stage explicitly extracts them. Peak memory is therefore bounded by the batches in flight, roughly stages × max_demand × batch size, rather than by the size of the whole dataset. With large batches, a small :max_demand keeps this bound low.
- Backpressure: GenStage demand is honoured end-to-end, so a slow consumer slows the producer instead of letting batches pile up.
- Not a row API: converting batches to row maps inside a Flow stage defeats the purpose. Keep transformations column-wise (e.g. via ExArrow.Batch or ExArrow.Compute).
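Here is a sketch of options suited to large batches. It assumes, as described for from_batches/2, that the options are passed straight to Flow.from_enumerable/2. The module name and the values are illustrative starting points, not ExArrow recommendations, so measure before settling on them.

```elixir
defmodule MyApp.TunedFlow do
  # Hypothetical helper; the option names are standard Flow/GenStage options.
  def opts do
    [
      # one stage per scheduler (Flow's default, spelled out here)
      stages: System.schedulers_online(),
      # each stage asks for at most 2 batches at a time, so only a few
      # batches per stage hold native memory while waiting to be processed
      max_demand: 2,
      # GenStage asks for more after (max_demand - min_demand) = 1 batch
      # has been processed; min_demand must stay below max_demand
      min_demand: 1
    ]
  end

  def from_batches(stream), do: ExArrow.Flow.from_batches(stream, opts())
end
```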
Telemetry
map_batches/2 and each_batch/2 emit [:ex_arrow, :pipeline, :batch] for
every batch processed, with rows, columns, and batch_count
measurements and %{source: :flow} metadata. Raw Flow.map/2 does not
emit telemetry; callers who need events from their own stages can emit
them with :telemetry.execute/3.
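Here is a minimal handler sketch. It assumes the measurement and metadata keys listed above, and it assumes the :telemetry package is available for attach/1. The module name, handler id, and counter layout are hypothetical. The handler counts events itself rather than summing batch_count, because this page does not spell out exactly what batch_count means per event.

```elixir
defmodule MyApp.FlowBatchStats do
  # Hypothetical aggregator for the [:ex_arrow, :pipeline, :batch] event.
  # Counter index 1 = events seen, index 2 = rows seen.
  @event [:ex_arrow, :pipeline, :batch]

  def new, do: :counters.new(2, [:write_concurrency])

  # Requires the :telemetry package at runtime.
  def attach(counters) do
    :telemetry.attach("my-app-flow-batch-stats", @event, &__MODULE__.handle_event/4, counters)
  end

  # Telemetry handlers run synchronously in the emitting process (here,
  # presumably the Flow stage), so the work must be cheap; :counters
  # updates are lock-free and safe across concurrent stages.
  def handle_event(@event, %{rows: rows}, %{source: :flow}, counters) do
    :counters.add(counters, 1, 1)
    :counters.add(counters, 2, rows)
  end

  # Ignore anything else (other sources or unexpected shapes).
  def handle_event(_event, _measurements, _metadata, _counters), do: :ok

  def snapshot(counters) do
    %{batches: :counters.get(counters, 1), rows: :counters.get(counters, 2)}
  end
end
```

Call attach/1 once at startup, run the pipeline, then read snapshot/1.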
Functions
@spec each_batch(Flow.t(), (ExArrow.RecordBatch.t() -> term())) :: Flow.t()
Run fun for its side effects on each batch in flow, emitting a
[:ex_arrow, :pipeline, :batch] telemetry event per batch.
The flow's elements are unchanged (the original batches pass through).
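For example, the hypothetical helper below logs each batch's size and then drains the flow with Flow.run/1, which runs a flow purely for its side effects. Because each_batch/2 passes the original batches through, you could add further stages after it instead of calling Flow.run/1.

```elixir
defmodule MyApp.BatchLog do
  require Logger

  # Hypothetical helper: logs every batch's row count, keeps no results.
  def log_rows(stream) do
    stream
    |> ExArrow.Flow.from_batches()
    |> ExArrow.Flow.each_batch(fn batch ->
      Logger.info("batch with #{ExArrow.RecordBatch.num_rows(batch)} rows")
    end)
    # Flow.run/1 consumes the flow and returns :ok
    |> Flow.run()
  end
end
```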
@spec from_batches( Enumerable.t() | {:ok, Enumerable.t()} | {:error, term()}, keyword() ) :: Flow.t()
Build a Flow from a stream (or list) of record batches.
Accepts:
- an ExArrow.Stream.t() or any Enumerable.t() of ExArrow.RecordBatch.t() values
- {:ok, enumerable}: unwrapped automatically, so the function composes with ExArrow.Stream.from_*/1 constructors in a pipe
- {:error, reason}: raises, so pipeline failures surface immediately (use a with chain if you prefer explicit error handling)
opts are forwarded to Flow.from_enumerable/2 (:stages, :window,
:max_demand, :min_demand, :buffer_size, ...).
Returns a Flow.t(). The flow's elements are ExArrow.RecordBatch
values.
Example
{:ok, stream} = ExArrow.Stream.from_parquet("/data/events.parquet")
flow = ExArrow.Flow.from_batches(stream, stages: 4)
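The hypothetical MyApp.Events module below sketches two ways to handle the result of an ExArrow.Stream.from_*/1 constructor. The first pipes the {:ok, stream} tuple directly, so an {:error, reason} makes from_batches/2 raise. The second unwraps the tuple in a with chain, so the error is returned instead. This assumes ExArrow.Stream.from_parquet/1 returns {:error, reason} on failure, as the from_batches/2 argument types suggest.

```elixir
defmodule MyApp.Events do
  # Pipe style: from_batches/2 unwraps {:ok, stream} and raises on {:error, _}.
  def row_counts!(path) do
    path
    |> ExArrow.Stream.from_parquet()
    |> ExArrow.Flow.from_batches(stages: 4)
    |> Flow.map(&ExArrow.RecordBatch.num_rows/1)
    |> Enum.to_list()
  end

  # with style: an {:error, reason} from the constructor is returned as-is.
  def row_counts(path) do
    with {:ok, stream} <- ExArrow.Stream.from_parquet(path) do
      counts =
        stream
        |> ExArrow.Flow.from_batches(stages: 4)
        |> Flow.map(&ExArrow.RecordBatch.num_rows/1)
        |> Enum.to_list()

      {:ok, counts}
    end
  end
end
```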
@spec map_batches(Flow.t(), (ExArrow.RecordBatch.t() -> term())) :: Flow.t()
Map a function over each batch in flow, emitting a
[:ex_arrow, :pipeline, :batch] telemetry event per batch.
fun receives an ExArrow.RecordBatch.t() and returns any term. The
returned flow's elements are whatever fun returns.
Example
flow
|> ExArrow.Flow.map_batches(fn batch ->
  {:ok, slim} = ExArrow.Batch.select(batch, ["id"])
  slim
end)
|> Enum.to_list()