ExArrow.Stream (ex_arrow v0.6.0)

Opaque handle to a native Arrow record-batch stream.

Provides a unified iterator interface over four backing sources:

Backend       Created by
:ipc          ExArrow.IPC.Reader (Arrow IPC stream or file format)
:parquet      ExArrow.Parquet.Reader (lazy, row-group-based Parquet reader)
:adbc         ExArrow.ADBC.Statement.execute/1 (SQL result streams)
:flight_sql   ExArrow.FlightSQL.Client.stream_query/2 (Flight SQL streams)

Plain Flight do_get results also use the :ipc backend (the Flight client returns an IPC stream resource).

All backends expose the same three functions:

  • schema/1 — inspect the Arrow schema without consuming any batches
  • next/1 — read the next batch on demand (nil when exhausted)
  • to_list/1 — collect all remaining batches into a list

Record batch data stays in native Arrow memory until consumed. Callers never set the backend field directly; it is assigned by the function that opens the stream.

Enumerable

ExArrow.Stream implements the Enumerable protocol, so all Enum and Stream functions work directly on a stream handle:

{:ok, stream} = ExArrow.FlightSQL.Client.stream_query(client, "SELECT * FROM t")

# Each example below consumes the stream, so each assumes a freshly opened one.

# Collect all batches into a list
batches = Enum.to_list(stream)

# Map over batches (Enum.map/2 is eager: every batch is read here)
row_counts = Enum.map(stream, fn batch -> ExArrow.RecordBatch.num_rows(batch) end)

# Take the first two batches, then stop; the rest are not fetched
first_two = Enum.take(stream, 2)

Each element yielded by the enumerator is an ExArrow.RecordBatch.t(). Enum.count/1 counts batches, not rows, and because the batch count is not known up front it has to read, and thereby consume, the entire stream. When you need a row count and the result has already been materialised, prefer the :num_rows field on ExArrow.FlightSQL.Result.
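When only a live stream is available, a row total has to be folded from the per-batch counts instead of taken from Enum.count/1. A minimal sketch, assuming ExArrow.RecordBatch.num_rows/1 returns a non-negative integer (as the num_rows call in the example above suggests). The module name RowTotals is hypothetical, and the counting function is a parameter only so the fold can be exercised without a native stream:

```elixir
defmodule RowTotals do
  # Hypothetical helper, not part of ex_arrow.
  # Sums row counts across all remaining batches in a single pass.
  # Like Enum.count/1, this reads (and consumes) the whole stream.
  # `num_rows` defaults to ExArrow.RecordBatch.num_rows/1, assumed to
  # return a non-negative integer; override it only for testing.
  def total_rows(stream, num_rows \\ &ExArrow.RecordBatch.num_rows/1) do
    Enum.reduce(stream, 0, fn batch, acc -> acc + num_rows.(batch) end)
  end
end
```

Because ExArrow.Stream implements Enumerable, RowTotals.total_rows(stream) works on a stream handle directly; the same function also accepts a list of already-collected batches.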

Enumeration raises on a transport or server error. For recoverable error handling, iterate manually with next/1.
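A manual loop over next/1 can stand in for Enum.to_list/1 when errors should be returned rather than raised. The sketch below relies only on the return shapes documented for next/1 (a batch, nil, or {:error, reason}); the module name StreamDrain is hypothetical, and the next_fun parameter exists only so the loop can be tested without a native stream:

```elixir
defmodule StreamDrain do
  # Hypothetical helper, not part of ex_arrow: a non-raising alternative
  # to Enum.to_list/1 built on next/1.
  #
  # Returns {:ok, batches} once next/1 yields nil, or the first
  # {:error, reason} unchanged (plain string or Flight SQL 3-tuple).
  # In this sketch, batches read before the error are discarded.
  def drain(stream, next_fun \\ &ExArrow.Stream.next/1) do
    do_drain(stream, next_fun, [])
  end

  defp do_drain(stream, next_fun, acc) do
    case next_fun.(stream) do
      # nil: the stream is exhausted
      nil -> {:ok, Enum.reverse(acc)}
      # either error shape is passed back to the caller as-is
      {:error, _reason} = error -> error
      # anything else is a record batch; keep it and read the next one
      batch -> do_drain(stream, next_fun, [batch | acc])
    end
  end
end
```

Usage would be `case StreamDrain.drain(stream) do {:ok, batches} -> ...; {:error, reason} -> ... end`. Accumulating in reverse and calling Enum.reverse/1 once keeps the loop tail-recursive and linear.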

Resource lifecycle

The stream's native state (for network-backed streams, the gRPC channel and batch buffer) is held in a native resource, which is released when the stream handle is garbage-collected. Stopping enumeration early (e.g. with Enum.take/2) is therefore safe: the resource is released once nothing references the stream handle any more and it has been collected.

Summary

Functions

next(stream)

Returns the next record batch from the stream, or nil when done. Returns {:error, message} on read error.

schema(stream)

Returns the schema of this stream (without consuming it). Returns {:error, message} if the stream is invalid (e.g. poisoned lock).

to_list(stream)

Collects all remaining batches from the stream into a list.

Types

t()

@opaque t()

Functions

next(stream)

@spec next(t()) ::
  ExArrow.RecordBatch.t()
  | nil
  | {:error, String.t()}
  | {:error, {atom(), non_neg_integer(), String.t()}}

Returns the next record batch from the stream, or nil when done. Returns {:error, message} on read error.

For :flight_sql streams, read errors carry a structured 3-tuple so callers can distinguish gRPC codes:

{:error, {code_atom, grpc_status_integer, message}} |
{:error, string_message}

Enum.* functions, and Stream.* pipelines once they are run, raise on any error shape. For recoverable error handling, call next/1 directly.
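The structured 3-tuple makes it possible to branch on the gRPC status, for example to decide whether re-running stream_query/2 is worthwhile. A sketch under stated assumptions: it matches on the integer status (numbers from the gRPC status-code specification) because the code atoms ex_arrow uses are not listed here, and the choice of which statuses count as transient is an illustrative policy, not library behaviour. The module name FlightErrorPolicy is hypothetical:

```elixir
defmodule FlightErrorPolicy do
  # Hypothetical helper, not part of ex_arrow.
  # gRPC status numbers: 4 = DEADLINE_EXCEEDED, 14 = UNAVAILABLE.
  # Treating exactly these as transient is an assumed policy.
  @transient_statuses [4, 14]

  # Structured Flight SQL error with a transient gRPC status
  def classify({:error, {_code, status, _message}}) when status in @transient_statuses,
    do: :transient

  # Any other structured Flight SQL error
  def classify({:error, {_code, _status, _message}}), do: :permanent

  # Plain string errors carry no status code, so treat them as permanent
  def classify({:error, message}) when is_binary(message), do: :permanent
end
```

Matching on the status integer rather than the atom keeps the policy tied to the wire-level gRPC code, which stays the same regardless of how the atom is named.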

schema(stream)

@spec schema(t()) :: {:ok, ExArrow.Schema.t()} | {:error, String.t()}

Returns the schema of this stream without consuming any batches. Returns {:error, message} if the stream is invalid (e.g. its internal native lock has been poisoned).

to_list(stream)

@spec to_list(t()) :: [ExArrow.RecordBatch.t()]

Collects all remaining batches from the stream into a list.

Stops at the first error and raises. Returns an empty list for an already-exhausted stream.