ExArrow.Stream (ex_arrow v0.7.0)

Opaque handle to a native Arrow record-batch stream.

Provides a unified iterator interface over four backing sources:

Backend       Created by
:ipc          ExArrow.IPC.Reader (Arrow IPC stream or file format)
:parquet      ExArrow.Parquet.Reader (lazy row-group Parquet reader)
:adbc         ExArrow.ADBC.Statement.execute/1 (SQL result streams)
:flight_sql   ExArrow.FlightSQL.Client.stream_query/2 (Flight SQL streams)

Plain Flight do_get results also use the :ipc backend (the Flight client returns an IPC stream resource).

All backends expose the same three functions:

  • schema/1 — inspect the Arrow schema without consuming any batches
  • next/1 — read the next batch on demand (nil when exhausted)
  • to_list/1 — collect all remaining batches into a list

Record batch data stays in native Arrow memory until consumed. Callers never set the backend field directly; it is assigned by the function that opens the stream.

Enumerable

ExArrow.Stream implements the Enumerable protocol, so all Enum and Stream functions work directly on a stream handle:

{:ok, stream} = ExArrow.FlightSQL.Client.stream_query(client, "SELECT * FROM t")

# The calls below are alternatives: reading advances the handle, so run each
# one against a freshly opened stream.

# Collect all batches into a list
batches = Enum.to_list(stream)

# Map over batches (Enum.map is eager, so every batch is read here)
row_counts = Enum.map(stream, fn batch -> ExArrow.RecordBatch.num_rows(batch) end)

# Take the first N batches then stop; the rest are not fetched
first_two = Enum.take(stream, 2)

Each element yielded by the enumerator is an ExArrow.RecordBatch.t(). The batch count is not known up front, so Enum.count/1 traverses the entire stream, and the value it returns is the number of batches, not rows. Prefer the :num_rows field on ExArrow.FlightSQL.Result when the result has already been materialised.
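As a rough illustration of lazy use of the enumerator, the sketch below sums per-batch row counts in a single pass. RowTotals and its injectable rows_fun argument are hypothetical names, not part of ex_arrow; the default assumes ExArrow.RecordBatch.num_rows/1 returns an integer row count, as the example above suggests.

```elixir
defmodule RowTotals do
  @moduledoc "Hypothetical helper, not part of ex_arrow."

  # Sums per-batch row counts in one pass. Stream.map/2 is lazy, so batches
  # are pulled one at a time while Enum.sum/1 drives the enumeration.
  # `rows_fun` is injectable only so the pipeline can be tried on plain lists.
  def total_rows(stream, rows_fun \\ &ExArrow.RecordBatch.num_rows/1) do
    stream
    |> Stream.map(rows_fun)
    |> Enum.sum()
  end
end
```

With a real handle this would be called as RowTotals.total_rows(stream). Like any Enum call on a stream handle, it raises on a transport or server error.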

Enumeration raises on a transport or server error. For recoverable error handling iterate manually with next/1.
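The manual iteration mentioned above could look roughly like the following sketch, which returns errors as values together with the batches read so far. BatchReader is a hypothetical module, not part of ex_arrow; the next_fun parameter is an assumption added so the loop can be exercised without a live stream.

```elixir
defmodule BatchReader do
  @moduledoc "Hypothetical helper, not part of ex_arrow."

  # Reads batches until next/1 returns nil. On {:error, reason} the loop stops
  # and hands back the batches collected before the failure.
  def drain(stream, next_fun \\ &ExArrow.Stream.next/1) do
    do_drain(stream, next_fun, [])
  end

  defp do_drain(stream, next_fun, acc) do
    case next_fun.(stream) do
      nil -> {:ok, Enum.reverse(acc)}
      {:error, reason} -> {:error, reason, Enum.reverse(acc)}
      batch -> do_drain(stream, next_fun, [batch | acc])
    end
  end
end
```

A caller can then match on {:ok, batches} or {:error, reason, partial} and decide whether to retry, log, or keep the partial result.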

Resource lifecycle

The underlying gRPC channel and batch buffer are held in a native resource. The resource is released when the stream handle is garbage-collected. Stopping enumeration early (e.g. Enum.take/2) is safe — the resource will be released when the stream variable goes out of scope.

Summary

Functions

from_adbc(statement), from_adbc(connection, sql)

Execute an ADBC statement and return its result as a stream of record batches.

from_flight(client, ticket)

Retrieve data for ticket from a Flight server as a stream of record batches.

from_flight_sql(client, sql)

Execute a SQL query against a Flight SQL server and return a lazy stream of record batches.

from_ipc(binary)

Open an Arrow IPC stream from an in-memory binary.

from_ipc_file(path)

Open an Arrow IPC stream from a file at path.

from_parquet(path)

Open a Parquet file at path for lazy row-group streaming.

from_parquet_binary(binary)

Open a Parquet stream from an in-memory binary.

next(stream)

Returns the next record batch from the stream, or nil when done. Returns {:error, message} on read error.

schema(stream)

Returns the schema of this stream (without consuming it). Returns {:error, message} if the stream is invalid (e.g. poisoned lock).

source(stream)

Returns the origin metadata attached to this stream by the from_* constructors. The value is backend-specific (e.g. {:parquet, path} or {:flight_sql, sql}) and is forwarded to telemetry events as :source.

to_list(stream)

Collects all remaining batches from the stream into a list.

Types

t()

@opaque t()

Functions

from_adbc(statement)

@spec from_adbc(ExArrow.ADBC.Statement.t()) :: {:ok, t()} | {:error, term()}

Execute an ADBC statement and return its result as a stream of record batches.

Accepts either a prepared ExArrow.ADBC.Statement.t() (built with ExArrow.ADBC.Statement.new/2 or new/3) or, in the two-argument form from_adbc/2, a connection and a SQL string, in which case a one-shot statement is created, executed, and discarded.

The stream is tagged with source: {:adbc, sql} (or :statement when a pre-built statement is passed).

Examples

{:ok, stream} = ExArrow.Stream.from_adbc(stmt)

{:ok, stream} = ExArrow.Stream.from_adbc(conn, "SELECT * FROM events")

from_adbc(connection, sql)

@spec from_adbc(ExArrow.ADBC.Connection.t(), String.t()) ::
  {:ok, t()} | {:error, term()}

from_flight(client, ticket)

@spec from_flight(ExArrow.Flight.Client.t(), term()) :: {:ok, t()} | {:error, term()}

Retrieve data for ticket from a Flight server as a stream of record batches.

Delegates to ExArrow.Flight.Client.do_get/2 and emits a [:ex_arrow, :flight, :query] telemetry event on success. The stream is tagged with source: {:flight, ticket}.

Example

{:ok, stream} = ExArrow.Stream.from_flight(client, "sales_2024")

from_flight_sql(client, sql)

@spec from_flight_sql(ExArrow.FlightSQL.Client.t(), String.t()) ::
  {:ok, t()} | {:error, term()}

Execute a SQL query against a Flight SQL server and return a lazy stream of record batches.

Delegates to ExArrow.FlightSQL.Client.stream_query/2, emits a [:ex_arrow, :flight_sql, :query] telemetry event on success, and tags the stream with source: {:flight_sql, sql}.

Example

{:ok, stream} = ExArrow.Stream.from_flight_sql(client, "SELECT * FROM events")

from_ipc(binary)

@spec from_ipc(binary()) :: {:ok, t()} | {:error, String.t()}

Open an Arrow IPC stream from an in-memory binary.

Delegates to ExArrow.IPC.Reader.from_binary/1 and tags the resulting stream with source: {:ipc, :binary} for telemetry. Returns {:ok, stream} or {:error, message}.

Example

{:ok, stream} = ExArrow.Stream.from_ipc(ipc_bytes)

from_ipc_file(path)

@spec from_ipc_file(Path.t()) :: {:ok, t()} | {:error, String.t()}

Open an Arrow IPC stream from a file at path.

Delegates to ExArrow.IPC.Reader.from_file/1 and tags the stream with source: {:ipc, path}. Returns {:ok, stream} or {:error, message}.

from_parquet(path)

@spec from_parquet(Path.t()) :: {:ok, t()} | {:error, String.t()}

Open a Parquet file at path for lazy row-group streaming.

Delegates to ExArrow.Parquet.Reader.from_file/1, tags the stream with source: {:parquet, path}, and emits a [:ex_arrow, :parquet, :read] telemetry event on successful open. Returns {:ok, stream} or {:error, message}.

Example

{:ok, stream} = ExArrow.Stream.from_parquet("/data/events.parquet")

from_parquet_binary(binary)

@spec from_parquet_binary(binary()) :: {:ok, t()} | {:error, String.t()}

Open a Parquet stream from an in-memory binary.

Delegates to ExArrow.Parquet.Reader.from_binary/1 and emits a [:ex_arrow, :parquet, :read] telemetry event on successful open with source: :binary.

next(stream)

@spec next(t()) ::
  ExArrow.RecordBatch.t()
  | nil
  | {:error, String.t()}
  | {:error, {atom(), non_neg_integer(), String.t()}}

Returns the next record batch from the stream, or nil when done. Returns {:error, message} on read error.

For :flight_sql streams, read errors carry a structured 3-tuple so callers can distinguish gRPC codes:

{:error, {code_atom, grpc_status_integer, message}} |
{:error, string_message}

Enum.* / Stream.* functions raise on any error shape. For recoverable error handling iterate with next/1 directly.
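One way to tell the documented return shapes of next/1 apart is a small set of pattern-matching clauses, sketched below. NextResult and the tags it returns are hypothetical, not part of ex_arrow, and the specific code atoms a server produces are not listed here, so none are assumed.

```elixir
defmodule NextResult do
  @moduledoc "Hypothetical helper, not part of ex_arrow."

  # End of stream.
  def classify(nil), do: :done

  # Structured :flight_sql error: {code_atom, grpc_status_integer, message}.
  def classify({:error, {code, grpc_status, message}})
      when is_atom(code) and is_integer(grpc_status) do
    {:grpc_error, code, grpc_status, message}
  end

  # Plain string error from any backend.
  def classify({:error, message}) when is_binary(message), do: {:read_error, message}

  # Defensive fallback for an error shape not described in the spec.
  def classify({:error, other}), do: {:unknown_error, other}

  # Anything else is treated as a record batch.
  def classify(batch), do: {:batch, batch}
end
```

In practice this would be used as stream |> ExArrow.Stream.next() |> NextResult.classify(), with the caller branching on the returned tag.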

schema(stream)

@spec schema(t()) :: {:ok, ExArrow.Schema.t()} | {:error, String.t()}

Returns the schema of this stream (without consuming it). Returns {:error, message} if the stream is invalid (e.g. poisoned lock).
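A minimal sketch of inspecting the schema before reading any data, assuming a Parquet file as the source. StreamPeek is a hypothetical module, not part of ex_arrow, and its stream_mod parameter exists only so the flow can be tried against a stand-in module.

```elixir
defmodule StreamPeek do
  @moduledoc "Hypothetical helper, not part of ex_arrow."

  # Opens a Parquet stream, reads its schema, then fetches the first batch.
  # Because schema/1 does not consume batches, the next/1 call that follows
  # still yields the first batch (or nil, or an {:error, _} tuple).
  def open_and_peek(path, stream_mod \\ ExArrow.Stream) do
    with {:ok, stream} <- stream_mod.from_parquet(path),
         {:ok, schema} <- stream_mod.schema(stream) do
      {:ok, schema, stream_mod.next(stream)}
    end
  end
end
```

If opening the file or reading the schema fails, the {:error, message} tuple falls through the with expression unchanged.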

source(stream)

@spec source(t()) :: term()

Returns the origin metadata attached to this stream by the from_* constructors. The value is backend-specific (e.g. {:parquet, path} or {:flight_sql, sql}) and is forwarded to telemetry events as :source.
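To show where the :source value might surface, here is a sketch of a telemetry handler for the [:ex_arrow, :parquet, :read] event. It assumes the :telemetry package is available and that :source arrives as a key in the event metadata map; SourceLogger and the handler id are hypothetical names, and the measurements are ignored because their contents are not documented here.

```elixir
defmodule SourceLogger do
  @moduledoc "Hypothetical telemetry handler, not part of ex_arrow."
  require Logger

  # Registers the handler for the Parquet read event named in these docs.
  def attach do
    :telemetry.attach(
      "ex-arrow-source-logger",
      [:ex_arrow, :parquet, :read],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Standard four-argument :telemetry handler callback.
  def handle_event(event, _measurements, metadata, _config) do
    Logger.info(describe(event, metadata))
  end

  # Formats the event name together with the :source metadata value
  # (nil when the key is absent).
  def describe(event, metadata) do
    "#{inspect(event)} source=#{inspect(Map.get(metadata, :source))}"
  end
end
```

Calling SourceLogger.attach() once at application start would be enough; the same handler shape should work for the other event names mentioned on this page.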

to_list(stream)

@spec to_list(t()) :: [ExArrow.RecordBatch.t()]

Collects all remaining batches from the stream into a list.

Stops at the first error and raises. Returns an empty list for an already-exhausted stream.
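Since to_list/1 raises rather than returning an error tuple, a caller that prefers tagged tuples could wrap it as sketched below. SafeCollect is a hypothetical module, not part of ex_arrow; the exception type raised is not documented here, so the sketch rescues any exception. The to_list_fun parameter is an assumption added for testability.

```elixir
defmodule SafeCollect do
  @moduledoc "Hypothetical helper, not part of ex_arrow."

  # Converts a raise from to_list/1 into {:error, message}. Batches read
  # before the failure are lost; use next/1 directly to keep partial results.
  def collect(stream, to_list_fun \\ &ExArrow.Stream.to_list/1) do
    {:ok, to_list_fun.(stream)}
  rescue
    exception -> {:error, Exception.message(exception)}
  end
end
```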