ExArrow.Pipeline (ex_arrow v0.7.1)

A thin pipeline abstraction over ExArrow streams.

ExArrow.Pipeline provides a stable, composable API for transforming and sinking Arrow streams without exposing the underlying execution mechanism. Internally it threads an Elixir Stream of ExArrow.RecordBatch values alongside the stream's schema, so transformations stay lazy and batches are never converted to row maps.

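Every function accepts {:ok, pipeline} | {:error, reason} as its first argument, so pipelines compose with |>/2 directly from the ExArrow.Stream.from_* constructors (which return {:ok, stream} | {:error, _}). The transformation stages (map_batches/2 and each_batch/2) return the same tagged shape; each sink returns its own result, as given in its spec. Errors short-circuit through every stage.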
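The threading convention can be illustrated with plain Elixir streams. The sketch below is not ExArrow's implementation: ThreadedSketch, map_stage/2, and sink/1 are hypothetical stand-ins that only mirror the tagged-tuple shape described above, using integers in place of record batches.

```elixir
defmodule ThreadedSketch do
  # Hypothetical stand-in for a transformation stage: wraps the enumerable
  # lazily on success and passes an error tuple through untouched.
  def map_stage({:ok, enum}, fun), do: {:ok, Stream.map(enum, fun)}
  def map_stage({:error, _reason} = error, _fun), do: error

  # Hypothetical stand-in for a sink: forces evaluation on success and
  # returns the upstream error unchanged otherwise.
  def sink({:ok, enum}), do: {:ok, Enum.to_list(enum)}
  def sink({:error, _reason} = error), do: error
end

# A successful source flows through both stages.
{:ok, [1, 2, 3]}
|> ThreadedSketch.map_stage(&(&1 * 2))
|> ThreadedSketch.sink()
# => {:ok, [2, 4, 6]}

# A failed source skips the mapper and reaches the caller unchanged.
{:error, :connection_refused}
|> ThreadedSketch.map_stage(&(&1 * 2))
|> ThreadedSketch.sink()
# => {:error, :connection_refused}
```

Because each clause matches on the tag first, a caller only needs to inspect the final result of the pipe rather than checking after every stage.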

Example

ExArrow.Stream.from_flight_sql(client, "SELECT * FROM events")
|> ExArrow.Pipeline.map_batches(fn batch ->
  {:ok, slim} = ExArrow.Batch.select(batch, ["id", "score"])
  slim
end)
|> ExArrow.Pipeline.write_parquet("/data/events.parquet")

Laziness

map_batches/2 and each_batch/2 return a lazy pipeline: no batches are consumed until a sink (write_parquet/2, write_flight/3, or write_dataframe/1) runs. A pipeline that is built but never passed to a sink does no work.
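The same behaviour can be observed with a plain Elixir Stream, which is what the module description says the pipeline threads internally. This is a minimal sketch under that assumption; LazinessSketch and its functions are hypothetical, and atoms stand in for record batches.

```elixir
defmodule LazinessSketch do
  # Builds a lazy stage over `batches`. The mapper reports every element it
  # processes to `observer` (a pid), so evaluation can be observed.
  def build(batches, observer) do
    Stream.map(batches, fn batch ->
      send(observer, {:processed, batch})
      batch
    end)
  end

  # Drains all pending {:processed, _} messages from the caller's mailbox
  # and returns how many there were.
  def drain(count \\ 0) do
    receive do
      {:processed, _batch} -> drain(count + 1)
    after
      0 -> count
    end
  end
end

stage = LazinessSketch.build([:batch_a, :batch_b], self())
LazinessSketch.drain()   # => 0, building the stage processed nothing
Enum.to_list(stage)      # stand-in for a sink; forces evaluation
LazinessSketch.drain()   # => 2, the mapper ran once per element
```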

Telemetry

map_batches/2 and each_batch/2 emit [:ex_arrow, :pipeline, :batch] for every batch that flows through the stage, carrying rows, columns, and batch_count measurements and %{source: :pipeline} metadata. The write_parquet/2 and write_flight/3 sinks emit [:ex_arrow, :parquet, :write] and [:ex_arrow, :flight, :query] events, respectively.
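A handler for the per-batch event might look like the sketch below. The event name, measurement names, and metadata come from the paragraph above; the module name, the handler id, and the assumption that measurements arrive as atom keys with integer values are illustrative only. The attach call is left as a comment because it needs the :telemetry dependency at runtime.

```elixir
defmodule PipelineTelemetrySketch do
  # Event name as documented for map_batches/2 and each_batch/2.
  @batch_event [:ex_arrow, :pipeline, :batch]

  def batch_event, do: @batch_event

  # Handler in the four-argument shape that :telemetry.attach/4 expects.
  def handle_event(@batch_event, measurements, metadata, _config) do
    IO.puts(format(measurements, metadata))
  end

  # Assumption: measurements use the atom keys :rows, :columns, and
  # :batch_count. Map.get/2 returns nil for a missing key instead of raising.
  def format(measurements, metadata) do
    "source=#{inspect(Map.get(metadata, :source))} " <>
      "batch=#{inspect(Map.get(measurements, :batch_count))} " <>
      "rows=#{inspect(Map.get(measurements, :rows))} " <>
      "columns=#{inspect(Map.get(measurements, :columns))}"
  end
end

# With :telemetry available, the handler could be attached once at startup:
#
#   :telemetry.attach(
#     "pipeline-batch-logger",
#     PipelineTelemetrySketch.batch_event(),
#     &PipelineTelemetrySketch.handle_event/4,
#     nil
#   )
```

Since the stages are lazy, a handler like this would only fire while a sink is consuming the pipeline.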

Summary

Functions

each_batch(threaded, fun)

Run fun for its side effects on each batch without changing the pipeline.

map_batches(threaded, fun)

Wrap an ExArrow.Stream.t() (or thread an existing pipeline) and apply fun to each batch.

write_dataframe(threaded)

Consume the pipeline and convert it into an Explorer.DataFrame.

write_flight(threaded, client, opts \\ [])

Consume the pipeline and upload every batch to a Flight server via client.

write_parquet(threaded, path)

Consume the pipeline and write every batch to a Parquet file at path.

Types

input()

@type input() :: ExArrow.Stream.t() | t()

t()

@type t() :: %ExArrow.Pipeline{enum: Enumerable.t(), schema: ExArrow.Schema.t() | nil}

threaded()

@type threaded() :: {:ok, input()} | {:error, term()}

Functions

each_batch(threaded, fun)

@spec each_batch(threaded(), (ExArrow.RecordBatch.t() -> term())) :: threaded()

Run fun for its side effects on each batch without changing the pipeline.

Lazy: the side effect runs only when a sink consumes the pipeline. The pipeline's batches pass through unchanged.
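The pass-through behaviour resembles Stream.each/2 from the Elixir standard library, which calls a function per element and discards its return value. The sketch below shows that pattern with a hypothetical EachBatchSketch.tap/2; it is not taken from ExArrow's source, and atoms stand in for record batches.

```elixir
defmodule EachBatchSketch do
  # Hypothetical stand-in for a side-effect stage. Stream.each/2 invokes
  # `fun` for every element and ignores what it returns, so the elements
  # reach the next stage unchanged.
  def tap({:ok, enum}, fun), do: {:ok, Stream.each(enum, fun)}
  def tap({:error, _reason} = error, _fun), do: error
end

{:ok, counter} = Agent.start_link(fn -> 0 end)

{:ok, stage} =
  EachBatchSketch.tap({:ok, [:batch_a, :batch_b]}, fn _batch ->
    Agent.update(counter, &(&1 + 1))
    :ignored
  end)

Agent.get(counter, & &1)  # => 0, the side effect has not run yet
Enum.to_list(stage)       # => [:batch_a, :batch_b], elements are unchanged
Agent.get(counter, & &1)  # => 2, `fun` ran once per element
```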

map_batches(threaded, fun)

@spec map_batches(threaded(), (ExArrow.RecordBatch.t() -> term())) :: threaded()

Wrap an ExArrow.Stream.t() (or thread an existing pipeline) and apply fun to each batch.

The resulting pipeline is lazy: fun runs only when a sink consumes the pipeline. fun receives an ExArrow.RecordBatch.t() and should return an ExArrow.RecordBatch.t() (or any term if the downstream sink expects it).

Example

ExArrow.Stream.from_parquet("/data/events.parquet")
|> ExArrow.Pipeline.map_batches(&ExArrow.Batch.schema/1)

write_dataframe(threaded)

@spec write_dataframe(threaded()) ::
  {:ok, Explorer.DataFrame.t()} | {:error, String.t()}

Consume the pipeline and convert it into an Explorer.DataFrame.

Requires the optional {:explorer, "~> 0.11"} dependency. Returns {:ok, dataframe} or {:error, message}.

Memory

This sink materialises all batches into a BEAM list before conversion. See write_parquet/2 for details and alternatives.

write_flight(threaded, client, opts \\ [])

@spec write_flight(threaded(), ExArrow.Flight.Client.t(), keyword()) ::
  :ok | {:error, term()}

Consume the pipeline and upload every batch to a Flight server via client.

opts are forwarded to ExArrow.Flight.Client.do_put/4. Emits a [:ex_arrow, :flight, :query] telemetry event. Returns :ok or {:error, reason}.

Memory

This sink materialises all batches into a BEAM list before uploading. See write_parquet/2 for details and alternatives.

write_parquet(threaded, path)

@spec write_parquet(threaded(), Path.t()) :: :ok | {:error, String.t()}

Consume the pipeline and write every batch to a Parquet file at path.

Triggers evaluation of all upstream stages. Emits a [:ex_arrow, :parquet, :write] telemetry event. Returns :ok or {:error, message}.

Memory

This sink materialises all batches into a BEAM list before writing. Because every batch is held at once, memory use grows with the total size of the stream, which can be a problem for very large inputs. The underlying ExArrow.Parquet.Writer.to_file/3 currently requires a list; a streaming write path may be added in a future release. For datasets that do not fit in memory, iterate with ExArrow.Stream.next/1 and write batches incrementally.
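One general way to bound memory with a list-based writer is to consume the batches in fixed-size groups and write each group to its own part file. The sketch below shows only that grouping logic in plain Elixir. ChunkedSinkSketch, the part-file naming, and the write_fun callback are all hypothetical; whether splitting output across several files suits a given dataset, and which ExArrow call the callback would wrap, are left as assumptions.

```elixir
defmodule ChunkedSinkSketch do
  # Consumes `batches` lazily in groups of `chunk_size` and hands each group
  # to `write_fun` along with a numbered part path. Only one group is
  # materialised at a time. `write_fun` is a hypothetical callback that
  # returns :ok or {:error, reason}; the first error stops the run.
  def write_parts(batches, base_path, chunk_size, write_fun) do
    batches
    |> Stream.chunk_every(chunk_size)
    |> Stream.with_index()
    |> Enum.reduce_while(:ok, fn {chunk, index}, :ok ->
      case write_fun.(part_path(base_path, index), chunk) do
        :ok -> {:cont, :ok}
        {:error, _reason} = error -> {:halt, error}
      end
    end)
  end

  # Builds paths such as "/data/events.part-00000".
  defp part_path(base_path, index) do
    suffix = index |> Integer.to_string() |> String.pad_leading(5, "0")
    "#{base_path}.part-#{suffix}"
  end
end

# Example with a fake writer that only prints what it would write.
ChunkedSinkSketch.write_parts(1..5, "/data/events", 2, fn path, chunk ->
  IO.puts("#{path}: #{length(chunk)} item(s)")
  :ok
end)
```

Peak memory in this arrangement depends on chunk_size rather than on the length of the stream, at the cost of producing several output files instead of one.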