ExArrow.Pipeline
(ex_arrow v0.7.0)
A thin pipeline abstraction over ExArrow streams.
ExArrow.Pipeline provides a stable, composable API for transforming and
sinking Arrow streams without exposing the underlying execution mechanism.
Internally it threads an Elixir Stream of ExArrow.RecordBatch values
alongside the stream's schema, so transformations stay lazy and batches are
never converted to row maps.
Every function accepts and returns {:ok, pipeline} | {:error, reason}, so
pipelines compose with |>/2 directly from ExArrow.Stream.from_*/1
constructors (which return {:ok, stream} | {:error, _}). Errors short-
circuit through every stage.
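The threading convention above can be illustrated with a minimal, standard-library-only sketch. This is not ExArrow's implementation: the module `ThreadedSketch` and its functions are hypothetical stand-ins, and plain integers take the place of record batches. It only shows how a stage that pattern-matches on `{:ok, _}` and `{:error, _}` lets an error pass through untouched.

```elixir
defmodule ThreadedSketch do
  # Hypothetical stand-in for a pipeline stage; not part of ExArrow.
  # An {:ok, enumerable} input gets a lazy transformation appended.
  def map_stage({:ok, enum}, fun), do: {:ok, Stream.map(enum, fun)}

  # An {:error, reason} input is returned as-is, so the error
  # short-circuits past this stage and fun is never called.
  def map_stage({:error, _reason} = error, _fun), do: error

  # Hypothetical sink: forces evaluation on :ok, passes errors through.
  def to_list({:ok, enum}), do: {:ok, Enum.to_list(enum)}
  def to_list({:error, _reason} = error), do: error
end

# A successful source flows through every stage.
ok_result =
  {:ok, [1, 2, 3]}
  |> ThreadedSketch.map_stage(&(&1 * 2))
  |> ThreadedSketch.to_list()

# A failed source skips every stage and surfaces the original error.
error_result =
  {:error, :connection_refused}
  |> ThreadedSketch.map_stage(&(&1 * 2))
  |> ThreadedSketch.to_list()
```

Because every stage accepts the tagged tuple directly, no `with` or `case` is needed between stages; the caller inspects the result once, after the sink.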
Example
    ExArrow.Stream.from_flight_sql(client, "SELECT * FROM events")
    |> ExArrow.Pipeline.map_batches(fn batch ->
      {:ok, slim} = ExArrow.Batch.select(batch, ["id", "score"])
      slim
    end)
    |> ExArrow.Pipeline.write_parquet("/data/events.parquet")
Laziness
map_batches/2 and each_batch/2 return a lazy pipeline: no batches are
consumed until a sink (write_parquet/2, write_flight/3, or
write_dataframe/1) runs. A pipeline that is built but never sunk
therefore does no work.
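The same behaviour can be observed with Elixir's own `Stream` module, which the pipeline is described as wrapping. The sketch below is an analogy under that assumption, not ExArrow code: integers stand in for batches, `Enum.to_list/1` stands in for a sink, and an `Agent` counts how often the mapping function has run.

```elixir
# Counter used to observe when the mapping function actually runs.
{:ok, counter} = Agent.start_link(fn -> 0 end)

# Building the stage only records the transformation; nothing runs yet.
lazy =
  Stream.map([1, 2, 3], fn x ->
    Agent.update(counter, &(&1 + 1))
    x * 10
  end)

# Still zero calls: no consumer has pulled from the stream.
calls_before = Agent.get(counter, & &1)

# Enum.to_list/1 plays the role of a sink and forces evaluation.
result = Enum.to_list(lazy)

# The function has now run once per element.
calls_after = Agent.get(counter, & &1)
```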
Telemetry
map_batches/2 and each_batch/2 emit [:ex_arrow, :pipeline, :batch] for
every batch that flows through the stage, carrying rows, columns, and
batch_count measurements and %{source: :pipeline} metadata. The sinks
emit their respective [:ex_arrow, :parquet, :write] and
[:ex_arrow, :flight, :query] events.
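A handler for the per-batch event might look like the sketch below. The event name and the `rows`, `columns`, `batch_count`, and `source` names come from the description above; that they appear as atom keys in the measurement and metadata maps is an assumption, and the `BatchLogger` module and the `"batch-logger"` handler id are hypothetical. `:telemetry.attach/4` is the standard attach call from the `:telemetry` library.

```elixir
defmodule BatchLogger do
  # Hypothetical handler module; the name is not part of ExArrow.
  # Telemetry handlers receive (event, measurements, metadata, config).
  def handle_event([:ex_arrow, :pipeline, :batch], measurements, metadata, _config) do
    IO.puts(format(measurements, metadata))
  end

  # Builds a one-line summary. Assumes atom keys in both maps.
  def format(%{rows: rows, columns: columns, batch_count: count}, %{source: source}) do
    "source=#{source} batch=#{count} rows=#{rows} columns=#{columns}"
  end
end

# Attach only when :telemetry is loaded, so the sketch also runs standalone.
if Code.ensure_loaded?(:telemetry) do
  :telemetry.attach(
    "batch-logger",
    [:ex_arrow, :pipeline, :batch],
    &BatchLogger.handle_event/4,
    nil
  )
end
```

Passing a captured module function (`&BatchLogger.handle_event/4`) rather than an anonymous function follows the `:telemetry` library's own recommendation for handlers.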
Summary
Functions
each_batch/2: Run fun for its side effects on each batch without changing the pipeline.
map_batches/2: Wrap an ExArrow.Stream.t() (or thread an existing pipeline) and apply fun to each batch.
write_dataframe/1: Consume the pipeline and convert it into an Explorer.DataFrame.
write_flight/3: Consume the pipeline and upload every batch to a Flight server via client.
write_parquet/2: Consume the pipeline and write every batch to a Parquet file at path.
Types
@type input() :: ExArrow.Stream.t() | t()
@type t() :: %ExArrow.Pipeline{enum: Enumerable.t(), schema: ExArrow.Schema.t() | nil}
Functions
@spec each_batch(threaded(), (ExArrow.RecordBatch.t() -> term())) :: threaded()
Run fun for its side effects on each batch without changing the pipeline.
Lazy: the side effect runs only when a sink consumes the pipeline. The pipeline's batches pass through unchanged.
@spec map_batches(threaded(), (ExArrow.RecordBatch.t() -> term())) :: threaded()
Wrap an ExArrow.Stream.t() (or thread an existing pipeline) and apply
fun to each batch.
The resulting pipeline is lazy: fun runs only when a sink consumes the
pipeline. fun receives an ExArrow.RecordBatch.t() and should return an
ExArrow.RecordBatch.t() (or any term if the downstream sink expects it).
Example
    ExArrow.Stream.from_parquet("/data/events.parquet")
    |> ExArrow.Pipeline.map_batches(&ExArrow.Batch.schema/1)
@spec write_dataframe(threaded()) :: {:ok, Explorer.DataFrame.t()} | {:error, String.t()}
Consume the pipeline and convert it into an Explorer.DataFrame.
Requires the optional {:explorer, "~> 0.11"} dependency. Returns
{:ok, dataframe} or {:error, message}.
Memory
This sink materialises all batches into a BEAM list before conversion.
See write_parquet/2 for details and alternatives.
@spec write_flight(threaded(), ExArrow.Flight.Client.t(), keyword()) :: :ok | {:error, term()}
Consume the pipeline and upload every batch to a Flight server via client.
opts are forwarded to ExArrow.Flight.Client.do_put/4. Emits a
[:ex_arrow, :flight, :query] telemetry event. Returns :ok or
{:error, reason}.
Memory
This sink materialises all batches into a BEAM list before uploading.
See write_parquet/2 for details and alternatives.
Consume the pipeline and write every batch to a Parquet file at path.
Triggers evaluation of all upstream stages. Emits a
[:ex_arrow, :parquet, :write] telemetry event. Returns :ok or
{:error, message}.
Memory
This sink materialises all batches into a BEAM list before writing.
For very large streams this can be a memory concern. The underlying
ExArrow.Parquet.Writer.to_file/3 currently requires a list; a
streaming write path may be added in a future release. For datasets
that do not fit in memory, iterate with ExArrow.Stream.next/1 and
write batches incrementally.