Arrow Streams

ExArrow v0.7.0 introduces first-class streaming as the primary mechanism for working with large datasets. This guide explains the streaming abstraction, the available sources, and the tradeoffs of each.

The unit of streaming: RecordBatch

ExArrow streams yield ExArrow.RecordBatch values, not row maps. A RecordBatch is an opaque handle to a native Arrow batch — a collection of column arrays sharing a schema and row count. The column buffers stay in native (Rust) memory until you explicitly extract them; only the small handle crosses the BEAM heap.

This is the central architectural principle of v0.7.0:

Operate on Arrow RecordBatch values. Not list(map()). Not Explorer.DataFrame. Not Nx.Tensor.

Explorer and Nx remain downstream consumers. ExArrow is the Arrow layer.

The ExArrow.Stream constructors

Every common source has a from_* constructor on ExArrow.Stream. Each one returns {:ok, stream} | {:error, reason}:

Constructor                             Source
ExArrow.Stream.from_parquet/1           Parquet file at path
ExArrow.Stream.from_parquet_binary/1    In-memory Parquet bytes
ExArrow.Stream.from_ipc/1               Arrow IPC stream binary
ExArrow.Stream.from_ipc_file/1          Arrow IPC file at path
ExArrow.Stream.from_flight/2            Flight do_get ticket
ExArrow.Stream.from_flight_sql/2        Flight SQL query
ExArrow.Stream.from_adbc/1              Pre-built ADBC statement
ExArrow.Stream.from_adbc/2              {connection, sql} one-shot ADBC query

Each constructor tags the stream with a source term (e.g. {:parquet, path} or {:flight_sql, sql}) that is forwarded to telemetry events as :source metadata.

Consuming a stream

Three consumption patterns cover every use case.

1. Lazy iteration with next/1

ExArrow.Stream.next/1 pulls one batch at a time and returns nil when the stream is exhausted. This is the lowest-level interface and the one that gives you explicit control over error handling.

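defmodule BatchLoop do
  # An anonymous function cannot call itself by name, so the recursion
  # lives in a named function.
  def each(stream, fun) do
    case ExArrow.Stream.next(stream) do
      nil -> :done
      {:error, reason} -> {:error, reason}
      batch ->
        fun.(batch)
        each(stream, fun)
    end
  end
end

{:ok, stream} = ExArrow.Stream.from_parquet("/data/events.parquet")

BatchLoop.each(stream, fn batch ->
  IO.puts("rows: #{ExArrow.RecordBatch.num_rows(batch)}")
end)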

For recoverable error handling, pattern-match on the {:error, reason} return of next/1 rather than letting Enum raise.
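
The same loop can also be written as a fold that returns a tagged result, which keeps the error path explicit. The following is a minimal sketch rather than library code: BatchFold and its reduce/4 are hypothetical names, and next_fun defaults to ExArrow.Stream.next/1 on the assumption that it returns nil, {:error, reason}, or a batch, as described above.

```elixir
defmodule BatchFold do
  @moduledoc """
  Hypothetical helper (not part of ExArrow): folds over a stream by
  calling `next_fun` until it returns `nil` or `{:error, reason}`.
  """

  # `next_fun` defaults to ExArrow.Stream.next/1 and is assumed to return
  # nil | {:error, reason} | batch, as described in this guide.
  def reduce(stream, acc, fun, next_fun \\ &ExArrow.Stream.next/1) do
    case next_fun.(stream) do
      # Stream exhausted: hand back the accumulated value.
      nil -> {:ok, acc}
      # Stop at the first error and surface it to the caller.
      {:error, reason} -> {:error, reason}
      # Anything else is treated as a batch and folded into the accumulator.
      batch -> reduce(stream, fun.(batch, acc), fun, next_fun)
    end
  end
end
```

With a real stream, BatchFold.reduce(stream, 0, fn batch, n -> n + ExArrow.RecordBatch.num_rows(batch) end) would return {:ok, total_rows} or the first {:error, reason}, so the caller can decide whether to retry, log, or abort.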

2. Enumerable consumption

ExArrow.Stream implements Enumerable, so all Enum and Stream functions work directly on a stream handle:

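{:ok, stream} = ExArrow.Stream.from_flight_sql(client, "SELECT * FROM t")

# The three calls below are alternatives; each assumes a freshly opened
# stream, since reading advances the handle.
batches = Enum.to_list(stream)
row_counts = Enum.map(stream, &ExArrow.RecordBatch.num_rows/1)
first_two = Enum.take(stream, 2)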

Enum.take/2 stops consuming early — the remaining batches are never fetched. Enumeration raises on a transport or server error; for recoverable handling use next/1.

3. Pipeline consumption

ExArrow.Pipeline (see the Pipeline Patterns guide) wraps a stream with composable map_batches/2 and sink functions:

ExArrow.Stream.from_parquet("/data/events.parquet")
|> ExArrow.Pipeline.map_batches(fn batch ->
  {:ok, slim} = ExArrow.Batch.select(batch, ["id", "score"])
  slim
end)
|> ExArrow.Pipeline.write_parquet("/data/slim.parquet")

The pipeline is lazy: nothing runs until a sink consumes it.

Schema preservation

Every stream carries its Arrow schema. ExArrow.Stream.schema/1 returns the ExArrow.Schema handle without consuming any batches:

{:ok, stream} = ExArrow.Stream.from_parquet("/data/events.parquet")
{:ok, schema} = ExArrow.Stream.schema(stream)
ExArrow.Schema.field_names(schema)  # => ["id", "name", "score", ...]

Field names, types, and nullability are preserved end-to-end through IPC, Parquet, Flight, and ADBC.
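
Because the schema is available before any batch is read, a consumer can reject an unexpected file up front. The sketch below assumes ExArrow.Stream.schema/1 and ExArrow.Schema.field_names/1 behave as shown above; SchemaCheck, missing/2, and ensure_fields/2 are hypothetical names introduced for illustration.

```elixir
defmodule SchemaCheck do
  @moduledoc """
  Hypothetical helper (not part of ExArrow) that validates a stream's
  columns before any batch is consumed.
  """

  # Pure comparison: returns the required names absent from `field_names`,
  # in the order they were requested.
  def missing(field_names, required) do
    Enum.reject(required, &(&1 in field_names))
  end

  # Assumes ExArrow.Stream.schema/1 returns {:ok, schema} | {:error, reason}
  # and ExArrow.Schema.field_names/1 returns a list of strings.
  def ensure_fields(stream, required) do
    with {:ok, schema} <- ExArrow.Stream.schema(stream) do
      case missing(ExArrow.Schema.field_names(schema), required) do
        [] -> :ok
        absent -> {:error, {:missing_fields, absent}}
      end
    end
  end
end
```

Calling SchemaCheck.ensure_fields(stream, ["id", "score"]) before a long-running job would fail fast on a mismatched file instead of partway through the data.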

Backpressure and laziness

ExArrow streams are lazy by construction:

  • Parquet — the footer is scanned once on open (making the schema available); row groups are decoded on demand by next/1. Stopping early leaves the remaining row groups undecoded.
  • IPC — batches are decoded one at a time as next/1 is called.
  • Flight / Flight SQL — the gRPC stream stays open until exhausted or garbage-collected; each next/1 pulls one server-side batch.
  • ADBC — the driver's result iterator is consumed on demand.
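
The pull-on-demand behaviour described in the list above can be modelled with the standard library alone. The sketch below does not use ExArrow: LazyDemo is a hypothetical stand-in built on Stream.resource/3 that counts how many "batches" the source is asked for, to illustrate what stopping early means for a lazy source.

```elixir
defmodule LazyDemo do
  @moduledoc """
  Stand-in for a lazy batch source, built only on the standard library.
  It does not use ExArrow; it models pull-on-demand behaviour.
  """

  # Returns a stream of `total` fake batches plus a counter that records
  # how many times the source was actually asked to produce a batch.
  def source(total) do
    pulls = :counters.new(1, [])

    stream =
      Stream.resource(
        fn -> 1 end,
        fn
          # Past the last batch: signal the end of the stream.
          n when n > total ->
            {:halt, n}

          # Produce exactly one fake batch per pull and count the pull.
          n ->
            :counters.add(pulls, 1, 1)
            {[{:batch, n}], n + 1}
        end,
        fn _ -> :ok end
      )

    {stream, pulls}
  end
end

{stream, pulls} = LazyDemo.source(100)
first_two = Enum.take(stream, 2)
IO.inspect(first_two, label: "taken")
IO.inspect(:counters.get(pulls, 1), label: "batches pulled")
```

Although the source could produce 100 batches, Enum.take/2 halts after the second one, so the counter reads 2. A real ExArrow stream is expected to behave analogously according to the descriptions above, with the remaining row groups or server-side batches left untouched.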

For GenStage-style demand-driven backpressure, use the ExArrow.GenStage.*Producer modules (see the GenStage guide).

Telemetry

The from_* constructors for Parquet, Flight, and Flight SQL each emit a source-level telemetry event: [:ex_arrow, :parquet, :read], [:ex_arrow, :flight, :query], and [:ex_arrow, :flight_sql, :query], respectively. Each next/1 call that yields a batch emits [:ex_arrow, :stream, :batch] with rows, columns, and batch_count measurements.

See the Telemetry module docs for the full event list and a handler example.
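
As a rough illustration of consuming the per-batch event, the sketch below attaches a handler with :telemetry.attach/4. BatchLogger is a hypothetical module, and the atom keys :rows, :columns, :batch_count, and :source are assumptions based on the names used in this guide; check the Telemetry module docs for the authoritative shapes.

```elixir
defmodule BatchLogger do
  @moduledoc """
  Hypothetical handler (not part of ExArrow) for the per-batch stream event.
  """

  @event [:ex_arrow, :stream, :batch]

  # Requires the :telemetry package; call once, e.g. at application start.
  def attach do
    :telemetry.attach("batch-logger", @event, &__MODULE__.handle_event/4, nil)
  end

  # Standard :telemetry handler signature: event, measurements, metadata, config.
  def handle_event(@event, measurements, metadata, _config) do
    IO.puts(format(measurements, metadata))
  end

  # Assumes atom keys :rows, :columns, :batch_count (measurements) and
  # :source (metadata). Missing keys are rendered as nil.
  def format(measurements, metadata) do
    "source=#{inspect(Map.get(metadata, :source))} " <>
      "batch=#{inspect(Map.get(measurements, :batch_count))} " <>
      "rows=#{inspect(Map.get(measurements, :rows))} " <>
      "columns=#{inspect(Map.get(measurements, :columns))}"
  end
end
```

Keeping the formatting in a separate pure function makes the handler easy to check without a running stream, and a handler that only reads measurements and metadata stays cheap enough to run on every batch.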

Choosing a source

Need                                     Use
Read a columnar file from disk           from_parquet/1 or from_ipc_file/1
Read bytes already in memory             from_parquet_binary/1 or from_ipc/1
Query a Flight / Flight SQL server       from_flight/2 or from_flight_sql/2
Query via an ADBC driver                 from_adbc/1 (prepared) or from_adbc/2 (SQL)
Compose with Flow / GenStage / Broadway  ExArrow.Flow, ExArrow.GenStage, ExArrow.Broadway

Example: copy a Parquet file through Arrow

{:ok, stream} = ExArrow.Stream.from_parquet("/data/events.parquet")
{:ok, schema} = ExArrow.Stream.schema(stream)
batches = ExArrow.Stream.to_list(stream)
:ok = ExArrow.Parquet.Writer.to_file("/data/events_copy.parquet", schema, batches)

Or, using the Pipeline DSL:

ExArrow.Stream.from_parquet("/data/events.parquet")
|> ExArrow.Pipeline.write_parquet("/data/events_copy.parquet")