ExArrow.GenStage (ex_arrow v0.7.1)

Arrow-native GenStage producers.

ExArrow provides three demand-driven producers that emit ExArrow.RecordBatch values from the common ExArrow sources:

Producer                            Source
ExArrow.GenStage.ParquetProducer    Parquet file or binary
ExArrow.GenStage.FlightProducer     Flight do_get ticket
ExArrow.GenStage.ADBCProducer       ADBC statement / {connection, sql}

All three share the same lifecycle:

  • Demand-driven: batches are only read when a consumer demands them, so slow consumers apply backpressure all the way to the source.
  • Arrow batch delivery: each emitted event is an ExArrow.RecordBatch handle (an opaque reference to native memory), not a row map.
  • Clean shutdown: when the underlying stream is exhausted, the producer drains the remaining batches, sends itself a stop message, and exits with reason :normal.
  • Resource cleanup: the stream handle is released when the producer terminates; terminate/2 is implemented so the stream is closed even on non-normal shutdown.
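The lifecycle above can be sketched with a plain GenStage producer. This is an illustration, not ExArrow's implementation: ListProducer is a hypothetical module, an in-memory list stands in for the native Arrow stream, and a {:closed, reason} message sent to an owner process stands in for releasing the stream handle. The sketch emits only as many items as were demanded, uses GenStage.async_info/2 to queue its stop message behind the events it has already emitted, and does its cleanup in terminate/2.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule ListProducer do
  # Hypothetical stand-in for an ExArrow producer: the list plays the role of
  # the native Arrow stream, and `owner` is told when the "stream" is closed.
  use GenStage

  def start_link(items, owner), do: GenStage.start_link(__MODULE__, {items, owner})

  @impl true
  def init({items, owner}), do: {:producer, %{items: items, owner: owner}}

  @impl true
  def handle_demand(demand, state) do
    # Read no more items than downstream asked for (backpressure).
    {events, rest} = Enum.split(state.items, demand)

    # Source exhausted: queue :stop so it is handled only after the events
    # returned below (and any buffered ones) have been dispatched.
    if rest == [], do: GenStage.async_info(self(), :stop)

    {:noreply, events, %{state | items: rest}}
  end

  @impl true
  def handle_info(:stop, state), do: {:stop, :normal, state}

  @impl true
  def terminate(reason, state) do
    # Stand-in for releasing the native stream handle.
    send(state.owner, {:closed, reason})
    :ok
  end
end
```

Any consumer drives it, for example the Collector in the wiring examples subscribed with a small max_demand. As with any gen_server callback, terminate/2 runs when the stage stops itself, but on a shutdown signal from a supervisor it runs only if the process traps exits.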

Requires {:gen_stage, "~> 1.2"} in your mix.exs dependencies.
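In a Mix project, that means listing gen_stage next to ex_arrow in deps/0, presumably because ExArrow does not pull it in on its own. A minimal excerpt; the "~> 0.7" requirement for ex_arrow is an assumption based on the version in this page's title.

```elixir
# mix.exs (excerpt)
defp deps do
  [
    # Assumed requirement; pin to the ExArrow version you actually use.
    {:ex_arrow, "~> 0.7"},
    # Needed for the ExArrow.GenStage.* producers.
    {:gen_stage, "~> 1.2"}
  ]
end
```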

Wiring examples

Producer + consumer

# A minimal consumer that forwards each list of batches to the process that started it.
defmodule Collector do
  use GenStage

  @impl true
  def init(pid), do: {:consumer, pid}

  @impl true
  def handle_events(batches, _from, pid) do
    send(pid, {:batches, batches})
    # Consumers never emit events, so the returned event list is always empty.
    {:noreply, [], pid}
  end
end

{:ok, prod} =
  ExArrow.GenStage.ParquetProducer.start_link(path: "/data/events.parquet")

{:ok, cons} = GenStage.start_link(Collector, self())

# Keep at most 4 batches in flight between the producer and the consumer.
{:ok, _subscription} = GenStage.sync_subscribe(cons, to: prod, max_demand: 4)
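Collector only forwards {:batches, batches} messages; the calling process still has to receive them. A minimal sketch using a hypothetical BatchDrain module: monitor the consumer before subscribing it (so it cannot finish unobserved), then gather messages until its :DOWN notification arrives. It assumes the subscription keeps GenStage's default cancel: :permanent, so the consumer exits once the producer stops, and that only this consumer sends {:batches, _} to the caller. Because messages from one process to another arrive in order, every batch message the consumer sent is already queued ahead of its :DOWN.

```elixir
defmodule BatchDrain do
  # Hypothetical helper: gathers the {:batches, batches} messages sent to the
  # current process until the monitored consumer exits. `timeout` is the
  # longest wait for the next message, not a total deadline.
  def collect(monitor_ref, timeout \\ 30_000), do: loop(monitor_ref, [], timeout)

  defp loop(ref, acc, timeout) do
    receive do
      {:batches, batches} ->
        loop(ref, [batches | acc], timeout)

      # Every {:batches, _} message from the consumer is queued before its :DOWN.
      {:DOWN, ^ref, :process, _pid, :normal} ->
        {:ok, in_order(acc)}

      {:DOWN, ^ref, :process, _pid, reason} ->
        {:error, reason, in_order(acc)}
    after
      timeout ->
        # Stop monitoring and drop a :DOWN that may have arrived meanwhile.
        Process.demonitor(ref, [:flush])
        {:error, :timeout, in_order(acc)}
    end
  end

  # acc holds event lists newest-first; restore arrival order and join them.
  defp in_order(acc), do: acc |> Enum.reverse() |> Enum.concat()
end
```

With the example above, that means calling ref = Process.monitor(cons) right after GenStage.start_link(Collector, self()), subscribing, and then matching {:ok, batches} = BatchDrain.collect(ref).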

Producer-consumer

Place a producer-consumer between the producer and the consumer to transform each batch (for example with ExArrow.Batch.select/2) before it is forwarded downstream.

defmodule MyTransformer do
  use GenStage

  @impl true
  def init(state), do: {:producer_consumer, state}

  @impl true
  def handle_events(batches, _from, state) do
    # Select the "id" column from each batch. A failed select raises a
    # MatchError, which crashes this stage.
    transformed =
      Enum.map(batches, fn batch ->
        {:ok, slim} = ExArrow.Batch.select(batch, ["id"])
        slim
      end)

    {:noreply, transformed, state}
  end
end
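Wiring a producer-consumer means subscribing it to the producer and then subscribing a consumer to it. The self-contained sketch below shows that topology with GenStage alone: GenStage.from_enumerable/2 stands in for an ExArrow producer, plain maps stand in for record batches, and Map.take/2 stands in for ExArrow.Batch.select/2. Slimmer and Sink are hypothetical modules; with ExArrow you would start a producer such as ExArrow.GenStage.ParquetProducer instead and use MyTransformer and Collector.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Slimmer do
  # Hypothetical producer-consumer: keeps only the "id" key of each event,
  # standing in for ExArrow.Batch.select(batch, ["id"]).
  use GenStage

  @impl true
  def init(state), do: {:producer_consumer, state}

  @impl true
  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, &Map.take(&1, ["id"])), state}
  end
end

defmodule Sink do
  # Hypothetical consumer that forwards events to the process that started it.
  use GenStage

  @impl true
  def init(pid), do: {:consumer, pid}

  @impl true
  def handle_events(events, _from, pid) do
    send(pid, {:events, events})
    {:noreply, [], pid}
  end
end

rows = for i <- 1..5, do: %{"id" => i, "payload" => "row #{i}"}

# Stand-in producer: stops with reason :normal once the list is exhausted.
{:ok, producer} = GenStage.from_enumerable(rows)
{:ok, slimmer} = GenStage.start_link(Slimmer, :ok)
{:ok, sink} = GenStage.start_link(Sink, self())
sink_ref = Process.monitor(sink)

# Subscribe along the data flow: producer-consumer to producer, then consumer to producer-consumer.
{:ok, _} = GenStage.sync_subscribe(slimmer, to: producer, max_demand: 2)
{:ok, _} = GenStage.sync_subscribe(sink, to: slimmer, max_demand: 2)

# Gather forwarded events until the sink exits (it stops after the upstream stages do).
collect = fn collect, acc ->
  receive do
    {:events, events} -> collect.(collect, acc ++ events)
    {:DOWN, ^sink_ref, :process, _pid, _reason} -> acc
  after
    5_000 -> acc
  end
end

ids = collect.(collect, [])
IO.inspect(ids, label: "ids")
```

Each subscription keeps at most max_demand events in flight, so a slow sink slows the stages upstream of it as well.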