ExArrow.Broadway (ex_arrow v0.7.1)


Arrow-native Broadway ingestion pipelines.

Broadway is the de facto standard Elixir library for building data-ingestion pipelines (Kafka, SQS, S3, ...). ExArrow.Broadway provides the pieces needed to keep such pipelines Arrow-native: a batch builder that assembles ExArrow.RecordBatch values from incoming messages, and Parquet and Flight sinks that write the assembled batches downstream.

Requires {:broadway, "~> 1.0"} in your mix.exs dependencies.
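A minimal sketch of the corresponding deps/0 excerpt from a project's mix.exs. The "~> 0.7" requirement for ex_arrow is an assumption based on the version shown in this page's title; pin whichever release you actually use.

```elixir
# mix.exs (excerpt): Broadway must be listed explicitly next to ExArrow
# for ExArrow.Broadway to be usable.
defp deps do
  [
    # Assumed requirement, derived from the v0.7.1 docs this page belongs to.
    {:ex_arrow, "~> 0.7"},
    # Required by ExArrow.Broadway.
    {:broadway, "~> 1.0"}
  ]
end
```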

Architecture

Kafka / SQS / S3
     │  (messages carry Arrow columnar payloads)
     ▼
Broadway producer
     │
     ▼
Broadway processor  ──  ExArrow.Broadway.BatchBuilder
     │                    (groups messages into RecordBatch values)
     ▼
Broadway batcher    ──  batch_size / batch_timeout config
     │
     ▼
handle_batch/4      ──  ExArrow.Broadway.ParquetSink / FlightSink

The unit that flows through the pipeline is an ExArrow.RecordBatch handle, not a row map. Producers are expected to deliver messages whose data is either an ExArrow.RecordBatch handle (e.g. from an Arrow-aware Kafka deserialiser) or a {names, binaries, dtypes, length} tuple describing raw Arrow columns (see BatchBuilder.from_messages/2).
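To make the two accepted payload shapes concrete, here is a hypothetical MyApp.PayloadShape helper (not part of ExArrow) that classifies a message's data. It assumes the tuple's names, binaries and dtypes are parallel lists with one entry per column and that each column's data is a single binary. ExArrow's real column encoding may differ (Arrow columns can span several buffers), so treat this as a sketch and check BatchBuilder.from_messages/2 before relying on it.

```elixir
defmodule MyApp.PayloadShape do
  @moduledoc """
  Hypothetical helper (not part of ExArrow): classifies a Broadway message's
  data into the payload shapes ExArrow.Broadway expects.
  """

  @doc """
  Returns :record_batch for an ExArrow.RecordBatch struct, :raw_columns for a
  well-formed {names, binaries, dtypes, num_rows} tuple, :unsupported otherwise.
  """
  def classify(data) when is_struct(data, ExArrow.RecordBatch), do: :record_batch

  def classify({names, binaries, dtypes, num_rows})
      when is_list(names) and is_list(binaries) and is_list(dtypes) and
             is_integer(num_rows) and num_rows >= 0 do
    # Assumption: one name, one binary and one dtype per column.
    same_width = length(names) == length(binaries) and length(binaries) == length(dtypes)

    if same_width and Enum.all?(binaries, &is_binary/1),
      do: :raw_columns,
      else: :unsupported
  end

  def classify(_other), do: :unsupported
end
```

A processor could call classify/1 in handle_message/3 and mark :unsupported messages with Broadway.Message.failed/2 before they reach a batcher.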

Tuning

Batch sizing and flush intervals are controlled by the Broadway batcher configuration (:batch_size, :batch_timeout), not by ExArrow. ExArrow's BatchBuilder honours those boundaries and can additionally split an incoming batch into smaller Arrow batches via the :rows_per_batch option.
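The arithmetic behind a :rows_per_batch split can be sketched as below. MyApp.RowSplit is a hypothetical helper, not ExArrow's implementation: it only computes the {offset, length} slices that would cover a batch of total_rows rows, with the last slice taking whatever rows remain.

```elixir
defmodule MyApp.RowSplit do
  @moduledoc """
  Hypothetical helper (not ExArrow's implementation): cuts total_rows rows
  into slices of at most rows_per_batch rows.
  """

  @doc "Returns a list of {offset, length} slices covering rows 0..total_rows-1."
  def ranges(0, _rows_per_batch), do: []

  def ranges(total_rows, rows_per_batch)
      when is_integer(total_rows) and total_rows > 0 and
             is_integer(rows_per_batch) and rows_per_batch > 0 do
    # Walk the row index in strides of rows_per_batch; the final slice is
    # shortened to the rows that remain.
    0
    |> Stream.iterate(&(&1 + rows_per_batch))
    |> Enum.take_while(&(&1 < total_rows))
    |> Enum.map(fn offset -> {offset, min(rows_per_batch, total_rows - offset)} end)
  end
end
```

For example, 7 rows with a limit of 3 give [{0, 3}, {3, 3}, {6, 1}]; under this sketch, each slice would become its own Arrow batch.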

Example pipeline

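defmodule MyPipeline do
  use Broadway

  alias Broadway.Message

  # MyKafkaProducer stands in for whatever Broadway producer you use.
  def start_link(opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: {MyKafkaProducer, opts}],
      processors: [default: [concurrency: 4]],
      batchers: [
        parquet: [concurrency: 2, batch_size: 100, batch_timeout: 1000]
      ]
    )
  end

  # Messages that already carry an ExArrow.RecordBatch handle.
  @impl true
  def handle_message(:default, %Message{data: batch} = msg, _ctx)
      when is_struct(batch, ExArrow.RecordBatch) do
    Message.put_batcher(msg, :parquet)
  end

  # Messages carrying raw columns as {names, binaries, dtypes, length}.
  # Any other payload raises FunctionClauseError, and Broadway marks that
  # message as failed.
  def handle_message(:default, %Message{data: {_names, _bins, _dtypes, _len}} = msg, _ctx) do
    Message.put_batcher(msg, :parquet)
  end

  @impl true
  def handle_batch(:parquet, messages, _batch_info, _ctx) do
    {:ok, schema, batches} =
      ExArrow.Broadway.BatchBuilder.from_messages(messages)

    # Two batcher processes run concurrently, so give each batch its own
    # file rather than having them all write to the same path.
    path = "/data/events-#{System.unique_integer([:positive])}.parquet"
    ExArrow.Broadway.ParquetSink.write(path, schema, batches)

    # handle_batch/4 must return the list of messages. If anything above
    # raises (e.g. a MatchError), Broadway marks the whole batch as failed.
    messages
  end
end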

Summary

Functions

broadway_available?()

Returns true if the :broadway dependency is loaded.

Functions

broadway_available?()

@spec broadway_available?() :: boolean()

Returns true if the :broadway dependency is loaded.
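broadway_available?/0 is useful when Broadway is an optional dependency of your own application. The sketch below uses a hypothetical MyApp.Pipelines module that returns the pipeline children to start given the availability flag; in an application you would pass it ExArrow.Broadway.broadway_available?(). It also shows Code.ensure_loaded?/1, the usual way to probe for an optional dependency; whether ExArrow implements broadway_available?/0 that way is an assumption, not something this page states.

```elixir
defmodule MyApp.Pipelines do
  @moduledoc """
  Hypothetical helper: chooses which pipeline children to start depending on
  whether the optional :broadway dependency is loaded.
  """

  @doc """
  Returns child specs for a supervisor. In an application, call it as
  children(ExArrow.Broadway.broadway_available?()).
  """
  def children(available) when is_boolean(available) do
    if available do
      # MyPipeline is the example pipeline module; `use Broadway` gives it a
      # child_spec/1 that calls MyPipeline.start_link/1.
      [{MyPipeline, []}]
    else
      []
    end
  end

  @doc "Generic optional-dependency probe based on Code.ensure_loaded?/1."
  def module_loaded?(module) when is_atom(module), do: Code.ensure_loaded?(module)
end
```

The resulting list can be passed to Supervisor.start_link/2 from your Application.start/2 callback, so the pipeline is simply skipped when Broadway is absent.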