ExArrow.Broadway
(ex_arrow v0.7.1)
Arrow-native Broadway ingestion pipelines.
Broadway is the standard Elixir library for ingestion (Kafka, SQS, S3, ...).
ExArrow.Broadway provides the pieces needed to keep Broadway pipelines
Arrow-native: a batch builder that assembles ExArrow.RecordBatch values
from incoming messages, and Parquet/Flight sinks that write assembled
batches downstream.
Requires {:broadway, "~> 1.0"} in your mix.exs dependencies.
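As a minimal sketch, the dependency entry might sit in mix.exs as shown here. The project name (MyApp, :my_app) and project version are placeholders, and the :ex_arrow requirement is only inferred from the v0.7.1 shown on this page; adjust both to your project.

```elixir
# mix.exs (sketch; MyApp / :my_app are hypothetical placeholders)
defmodule MyApp.MixProject do
  use Mix.Project

  def project do
    [app: :my_app, version: "0.1.0", deps: deps()]
  end

  defp deps do
    [
      # Assumed requirement, derived from the ex_arrow v0.7.1 shown on this page.
      {:ex_arrow, "~> 0.7"},
      # Required by ExArrow.Broadway, as stated above.
      {:broadway, "~> 1.0"}
    ]
  end
end
```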
Architecture
Kafka / SQS / S3
      │  (messages carry Arrow columnar payloads)
      ▼
Broadway producer
      │
      ▼
Broadway processor  ──►  ExArrow.Broadway.BatchBuilder
      │                  (groups messages into RecordBatch values)
      ▼
Broadway batcher    ──►  batch_size / batch_timeout config
      │
      ▼
handle_batch/4      ──►  ExArrow.Broadway.ParquetSink / FlightSink

The unit that flows through the pipeline is an ExArrow.RecordBatch handle,
not a row map. Producers are expected to deliver messages whose data is
either an ExArrow.RecordBatch handle (e.g. from an Arrow-aware Kafka
deserialiser) or a {names, binaries, dtypes, length} tuple describing raw
Arrow columns (see BatchBuilder.from_messages/2).
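The two payload shapes described above can be told apart with pattern matching before a message is routed to a batcher. The sketch below is a hypothetical helper, not part of ExArrow; it only checks the outer shape (struct name, tuple arity, non-negative length) and makes no assumption about how names, binaries, or dtypes are encoded.

```elixir
defmodule PayloadShape do
  @moduledoc false
  # Hypothetical helper for illustration; not part of ExArrow.

  # An ExArrow.RecordBatch handle. is_struct/2 only compares the struct
  # name, so this clause compiles even when ex_arrow is not loaded.
  def classify(data) when is_struct(data, ExArrow.RecordBatch), do: :record_batch

  # A {names, binaries, dtypes, length} tuple describing raw Arrow columns.
  # Only the arity and the length field are validated here.
  def classify({_names, _binaries, _dtypes, length})
      when is_integer(length) and length >= 0,
      do: :raw_columns

  # Anything else is not a payload the pipeline can batch.
  def classify(_other), do: :unsupported
end
```

Inside handle_message/3, a result of :unsupported could be turned into a failed message with Broadway.Message.failed/2 instead of letting an unmatched clause crash the processor.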
Tuning
Batch sizing and flush intervals are controlled by the Broadway batcher
configuration (:batch_size, :batch_timeout), not by ExArrow. ExArrow's
BatchBuilder honours those boundaries and can additionally split an
incoming batch into smaller Arrow batches via the :rows_per_batch option.
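To make the effect of :rows_per_batch concrete, the sketch below computes the {offset, length} slices that a split into chunks of at most N rows would produce. It illustrates the arithmetic only; SplitSketch is a hypothetical module, and how ExArrow performs the split internally is not specified here. The option is presumably passed in the second argument of BatchBuilder.from_messages/2, which is also an assumption.

```elixir
defmodule SplitSketch do
  @moduledoc false
  # Hypothetical helper for illustration; not ExArrow's implementation.

  # Returns {offset, length} pairs covering `total_rows` rows in chunks
  # of at most `rows_per_batch` rows. The last chunk may be shorter.
  def slices(total_rows, rows_per_batch)
      when is_integer(total_rows) and total_rows >= 0 and
             is_integer(rows_per_batch) and rows_per_batch > 0 do
    0
    |> Stream.iterate(&(&1 + rows_per_batch))
    |> Stream.take_while(&(&1 < total_rows))
    |> Enum.map(fn offset ->
      {offset, min(rows_per_batch, total_rows - offset)}
    end)
  end
end
```

For example, SplitSketch.slices(250, 100) yields three slices of 100, 100, and 50 rows, so a 250-row Broadway batch would become three Arrow batches under this scheme.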
Example pipeline
defmodule MyPipeline do
  use Broadway

  def start_link(opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: {MyKafkaProducer, opts}],
      processors: [default: [concurrency: 4]],
      batchers: [
        parquet: [concurrency: 2, batch_size: 100, batch_timeout: 1000]
      ]
    )
  end

  # Route every message that carries a RecordBatch handle to the :parquet batcher.
  def handle_message(:default, %Broadway.Message{data: batch} = msg, _ctx)
      when is_struct(batch, ExArrow.RecordBatch) do
    Broadway.Message.put_batcher(msg, :parquet)
  end

  def handle_batch(:parquet, messages, _batch_info, _ctx) do
    {:ok, schema, batches} =
      ExArrow.Broadway.BatchBuilder.from_messages(messages)

    # The sink's return value is not inspected here; check it and mark
    # messages as failed if your pipeline needs to react to write errors.
    ExArrow.Broadway.ParquetSink.write(
      "/data/events.parquet",
      schema,
      batches
    )

    # Broadway requires handle_batch/4 to return the list of messages.
    messages
  end
end
Summary
Functions
broadway_available?()
Returns true if the :broadway dependency is loaded.
Functions
@spec broadway_available?() :: boolean()
Returns true if the :broadway dependency is loaded.
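A check of this kind is commonly written with Code.ensure_loaded?/1 from the Elixir standard library. The sketch below shows that general technique in a hypothetical OptionalDep module; whether broadway_available?/0 is implemented this way is an assumption, not something this page states.

```elixir
defmodule OptionalDep do
  @moduledoc false
  # Hypothetical stand-in showing how an optional dependency can be detected.

  # Returns true when `module` is loaded or can be loaded, false otherwise.
  def available?(module) when is_atom(module) do
    Code.ensure_loaded?(module)
  end
end
```

Callers would typically branch on ExArrow.Broadway.broadway_available?() before starting a Broadway pipeline, so that applications without the optional dependency can fall back to another ingestion path.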