ExArrow.GenStage
(ex_arrow v0.7.0)
Arrow-native GenStage producers.
ExArrow provides three demand-driven producers that emit
ExArrow.RecordBatch values from the common ExArrow sources:
| Producer | Source |
|---|---|
| ExArrow.GenStage.ParquetProducer | Parquet file or binary |
| ExArrow.GenStage.FlightProducer | Flight do_get ticket |
| ExArrow.GenStage.ADBCProducer | ADBC statement, or {connection, sql} |
All three share the same lifecycle:
- Demand-driven: batches are only read when a consumer demands them, so slow consumers apply backpressure all the way to the source.
- Arrow batch delivery: each emitted event is an ExArrow.RecordBatch handle (an opaque reference to native memory), not a row map.
- Clean shutdown: when the underlying stream is exhausted, the producer drains the remaining batches, sends itself a stop message, and exits with reason :normal.
- Resource cleanup: the stream handle is released when the producer terminates; terminate/2 is implemented so the stream is closed even on non-normal shutdown.
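The lifecycle above can be illustrated with a minimal sketch. It is not ExArrow's implementation: the module name SketchProducer is hypothetical, a plain list of terms stands in for the Arrow stream, and a zero-arity close function stands in for releasing the native handle. The callbacks use GenStage's return shapes but are written as plain functions (no use GenStage), so they can be called directly; a real producer would add use GenStage and be started with GenStage.start_link/2.

```elixir
defmodule SketchProducer do
  # Hypothetical sketch of the producer lifecycle; not ExArrow's code.
  # Callbacks mirror GenStage's return shapes but are plain functions,
  # so no gen_stage dependency is needed to call them directly.

  # Assumption: the source is a list of batches, and `close` is a
  # zero-arity function standing in for releasing a native stream handle.
  def init({batches, close}) when is_list(batches) and is_function(close, 0) do
    {:producer, %{pending: batches, close: close}}
  end

  # Demand-driven: emit at most `demand` batches and read nothing ahead.
  def handle_demand(demand, %{pending: pending} = state) when demand > 0 do
    {events, rest} = Enum.split(pending, demand)

    # Source exhausted: ask ourselves to stop. The message is handled only
    # after the events returned below have been dispatched.
    if rest == [], do: send(self(), :stop)

    {:noreply, events, %{state | pending: rest}}
  end

  # Clean shutdown with reason :normal.
  def handle_info(:stop, state), do: {:stop, :normal, state}

  # Resource cleanup. In a GenStage process this runs when a callback
  # returns {:stop, ...} or raises; exit signals from a linked parent
  # reach it only if the process traps exits.
  def terminate(_reason, %{close: close}) do
    close.()
    :ok
  end
end
```

Because handle_demand/2 splits off at most demand items, nothing is pulled from the source ahead of a consumer's request. Sending :stop to self() rather than returning {:stop, ...} immediately lets the final batches reach consumers before the process exits.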
Requires {:gen_stage, "~> 1.2"} in your mix.exs dependencies.
Wiring examples
Producer + consumer
{:ok, prod} =
  ExArrow.GenStage.ParquetProducer.start_link(path: "/data/events.parquet")
# A minimal consumer that forwards each group of received batches to the calling process.
defmodule Collector do
  use GenStage
  def init(pid), do: {:consumer, pid}
  def handle_events(batches, _from, pid) do
    send(pid, {:batches, batches})
    # Consumers emit no events of their own.
    {:noreply, [], pid}
  end
end
{:ok, cons} = GenStage.start_link(Collector, self())
# Keep at most 4 batches outstanding; the producer reads only what is demanded.
GenStage.sync_subscribe(cons, to: prod, max_demand: 4)
# Batches now arrive in this process as {:batches, batches} messages.
Producer-consumer
Wrap a producer with a producer-consumer that transforms each batch (e.g.
via ExArrow.Batch.select/2) before forwarding it downstream.
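defmodule MyTransformer do
  use GenStage
  def init(state), do: {:producer_consumer, state}
  def handle_events(batches, _from, state) do
    transformed =
      Enum.map(batches, fn batch ->
        # Project each batch down to its "id" column; any result other than
        # {:ok, batch} raises a MatchError and crashes the stage.
        {:ok, slim} = ExArrow.Batch.select(batch, ["id"])
        slim
      end)
    # Forward the projected batches to downstream consumers.
    {:noreply, transformed, state}
  end
end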