ExArrow.Telemetry (ex_arrow v0.7.1)


Telemetry integration for ExArrow.

ExArrow emits structured telemetry events for every transport and pipeline operation. Events follow the [:ex_arrow, component, action] naming convention and carry a consistent set of measurements and metadata so a single handler can observe the whole system.

This module is the single emission point used by ExArrow.Stream, ExArrow.Pipeline, ExArrow.Flow, and the GenStage / Broadway integrations. Application code should attach handlers with :telemetry.attach/4 (or telemetry_metrics) rather than calling execute/3 directly.

Optional dependency

Telemetry is an optional dependency. Add {:telemetry, "~> 1.0"} to the deps in your mix.exs to receive events. When the library is absent, execute/3 becomes a no-op and span/3 runs fun without emitting anything, so ExArrow itself never crashes because the dependency is missing.
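The degradation described above can be sketched as follows. MyApp.SafeTelemetry is a hypothetical module, not ExArrow's actual implementation; it assumes the check happens at runtime with Code.ensure_loaded?/1, and it shows that a degraded span still has to run fun and unwrap its {result, stop_metadata} tuple.

```elixir
defmodule MyApp.SafeTelemetry do
  # Hypothetical wrapper: emit events only when :telemetry is available.
  def execute(event_name, measurements, metadata) do
    if telemetry_available?() do
      :telemetry.execute(event_name, measurements, metadata)
    else
      :ok
    end
  end

  # Without :telemetry the work still runs; only the events are skipped.
  def span(event_name, start_metadata, fun) do
    if telemetry_available?() do
      :telemetry.span(event_name, start_metadata, fun)
    else
      {result, _stop_metadata} = fun.()
      result
    end
  end

  defp telemetry_available?, do: Code.ensure_loaded?(:telemetry)
end
```

Either branch returns the same value to the caller, which is what lets instrumented code stay unaware of whether telemetry is installed.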

Events

| Event | When it is emitted |
|---|---|
| [:ex_arrow, :flight, :query] | A Flight do_get stream is opened |
| [:ex_arrow, :flight_sql, :query] | A Flight SQL query stream is opened |
| [:ex_arrow, :parquet, :read] | A Parquet reader stream is opened |
| [:ex_arrow, :parquet, :write] | Batches are written to Parquet |
| [:ex_arrow, :stream, :batch] | A single batch is yielded from a stream |
| [:ex_arrow, :pipeline, :batch] | A pipeline stage processes a batch |
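Because every name shares the [:ex_arrow, component, action] prefix, one handler function can cover the whole table. The sketch below is hypothetical (MyApp.ArrowEvents and its message format are illustrative): it matches the prefix and also accepts a trailing :start, :stop, or :exception segment, on the assumption that span-based operations emit suffixed names as described under span/3. In an application you would pass the names, plus any suffixed span names you care about, together with &MyApp.ArrowEvents.handle_event/4 to :telemetry.attach_many/4.

```elixir
defmodule MyApp.ArrowEvents do
  # Event names from the table above.
  @events [
    [:ex_arrow, :flight, :query],
    [:ex_arrow, :flight_sql, :query],
    [:ex_arrow, :parquet, :read],
    [:ex_arrow, :parquet, :write],
    [:ex_arrow, :stream, :batch],
    [:ex_arrow, :pipeline, :batch]
  ]

  def events, do: @events

  # One clause handles every ExArrow event. `phase` is [] for plain events and
  # [:start], [:stop] or [:exception] for span events (assumption).
  def handle_event([:ex_arrow, component, action | phase], measurements, metadata, %{pid: pid}) do
    send(pid, {:arrow_event, component, action, phase, measurements, metadata})
    :ok
  end
end
```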

Measurements

Each event carries only the measurements that are meaningful for its operation, drawn from the set below, so handlers should treat every key as optional.

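| Measurement | Type | Meaning |
|---|---|---|
| :rows | non_neg_integer() | Rows in the batch |
| :columns | non_neg_integer() | Columns in the batch |
| :duration | non_neg_integer() | Elapsed monotonic time in :native time units (convert with System.convert_time_unit/3) |
| :batch_count | non_neg_integer() | Number of batches in a batched operation |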
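Handlers usually convert :duration before reporting it. The hypothetical helpers below assume :duration is expressed in :native time units, as :telemetry.span/3 reports it, and return nil whenever a measurement they need is missing, since each event carries only a subset.

```elixir
defmodule MyApp.ArrowMeasurements do
  # Hypothetical helpers; every measurement is optional, so missing keys yield nil.

  # :duration is assumed to be in :native units (the :telemetry.span/3 convention).
  def duration_ms(%{duration: native}) when is_integer(native),
    do: System.convert_time_unit(native, :native, :millisecond)

  def duration_ms(_measurements), do: nil

  # Rows per second, or nil when :rows or a positive :duration is missing.
  def rows_per_second(%{rows: rows, duration: native})
      when is_integer(rows) and is_integer(native) and native > 0 do
    rows * System.convert_time_unit(1, :second, :native) / native
  end

  def rows_per_second(_measurements), do: nil
end
```

Converting only at the edge (in the handler) keeps the emitted integers exact, which is the reason :telemetry reports native units in the first place.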

Metadata

| Field | Type | Meaning |
|---|---|---|
| :source | term() | Origin of the data (path, URI, SQL) |
| :destination | term() | Target of a write (path, ticket, host) |
| :schema | term() | ExArrow.Schema handle when available |
| :driver | String.t() | Driver name (e.g. "adbc_driver_sqlite") |
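When metadata feeds metric tags or log fields, it helps to reduce it to printable values. The hypothetical MyApp.ArrowTags below keeps :source, :destination, and :driver, renders non-string values with inspect/1, and drops :schema, on the assumption that a schema handle makes a poor tag value.

```elixir
defmodule MyApp.ArrowTags do
  # Hypothetical: turn ExArrow metadata into printable tag values.
  # :schema is dropped because a handle makes a poor tag value (assumption).
  @tag_keys [:source, :destination, :driver]

  def tags(metadata) when is_map(metadata) do
    metadata
    |> Map.take(@tag_keys)
    |> Map.new(fn {key, value} -> {key, to_label(value)} end)
  end

  defp to_label(value) when is_binary(value), do: value
  defp to_label(value), do: inspect(value)
end
```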

Attaching a handler

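defmodule MyApp.ArrowLogger do
  # A named module function avoids the warning :telemetry logs for
  # anonymous-function handlers.
  def handle_batch(_event, measurements, metadata, _config) do
    # :rows may be absent; Integer.to_string(nil) would raise, and
    # :telemetry detaches a handler that raises.
    rows = Map.get(measurements, :rows, "unknown")
    IO.puts("batch: #{rows} rows from #{inspect(metadata[:source])}")
  end
end

:ok =
  :telemetry.attach(
    "ex-arrow-logger",
    [:ex_arrow, :stream, :batch],
    &MyApp.ArrowLogger.handle_batch/4,
    nil
  )

{:ok, stream} = ExArrow.Stream.from_parquet("/data/events.parquet")
# Events fire as batches are consumed, so the stream must be run.
Stream.run(stream)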

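Summary

Functions

batch_measurements(batch, extra \\ []): Build a measurements map for a single batch.

execute(event_name, measurements, metadata): Emit a telemetry event.

span(event_name, start_metadata, fun): Wrap fun in a telemetry span under the given event name.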

Types

event_name()

@type event_name() :: [atom(), ...]

measurements()

@type measurements() :: map()

metadata()

@type metadata() :: map()

Functions

batch_measurements(batch, extra \\ [])

@spec batch_measurements(
  ExArrow.RecordBatch.t(),
  keyword()
) :: measurements()

Build a measurements map for a single batch.

Convenience used by ExArrow.Stream and ExArrow.Pipeline so every emitter reports the same shape of measurements for a batch.
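A shared builder like this keeps every emitter's measurements identical in shape. The sketch below is hypothetical: it uses a plain map with :num_rows and :num_columns as a stand-in for ExArrow.RecordBatch (whose accessors are not documented here) and assumes extra is merged into the result last, so its keys win.

```elixir
defmodule MyApp.BatchMeasurements do
  # Hypothetical builder. A plain map with :num_rows and :num_columns stands in for
  # ExArrow.RecordBatch; `extra` (e.g. duration: ...) is merged last, so it wins.
  @spec build(%{num_rows: non_neg_integer(), num_columns: non_neg_integer()}, keyword()) :: map()
  def build(%{num_rows: rows, num_columns: columns}, extra \\ []) when is_list(extra) do
    Map.merge(%{rows: rows, columns: columns}, Map.new(extra))
  end
end
```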

execute(event_name, measurements, metadata)

@spec execute(event_name(), measurements(), metadata()) :: :ok

Emit a telemetry event.

Does nothing and returns :ok when no handler is attached, when the :telemetry dependency is missing, or when the :telemetry application is not running. measurements and metadata are passed straight through to the handler.

Example

ExArrow.Telemetry.execute(
  [:ex_arrow, :stream, :batch],
  %{rows: 100, columns: 3},
  %{source: "/data/events.parquet"}
)
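Emitters in the style of ExArrow.Stream fire one [:ex_arrow, :stream, :batch] event per yielded batch. The hypothetical sketch below shows that pattern with Stream.each/2; emit is any 3-arity function with execute/3's shape (for example &ExArrow.Telemetry.execute/3, or a test double), and batches are plain maps with a :num_rows key standing in for record batches.

```elixir
defmodule MyApp.InstrumentedStream do
  # Hypothetical: wrap an enumerable of batches so each yielded batch emits one
  # [:ex_arrow, :stream, :batch] event. `emit` has the shape of execute/3.
  def instrument(batches, source, emit) when is_function(emit, 3) do
    Stream.each(batches, fn batch ->
      emit.([:ex_arrow, :stream, :batch], %{rows: batch.num_rows}, %{source: source})
    end)
  end
end
```

Because Stream.each/2 is lazy, no event fires until the stream is consumed, so a stream that is never run never reports anything.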

span(event_name, start_metadata, fun)

@spec span(event_name(), metadata(), (-> {term(), metadata()})) :: term()

Wrap fun in a telemetry span under the given event name.

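Emits event_name ++ [:start] before calling fun and event_name ++ [:stop] after it returns, or event_name ++ [:exception] if it raises, throws, or exits, matching the convention used by :telemetry.span/3. fun must return a {result, stop_metadata} tuple; span/3 returns result. The start metadata is merged into the stop metadata.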

Example

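# client and ticket are assumed to be bound already.
ExArrow.Telemetry.span([:ex_arrow, :flight, :query], %{source: ticket}, fn ->
  result = ExArrow.Flight.Client.do_get(client, ticket)
  # fun must return {result, stop_metadata}; span/3 returns result.
  {result, %{}}
end)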
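To make the start/stop/exception contract concrete, here is a hypothetical pure-Elixir sketch of the sequence described above. It is neither ExArrow's nor :telemetry's code: emit stands in for :telemetry.execute/3, the measurement keys (:system_time, :duration) follow :telemetry.span/3, and the exception metadata keys (:kind, :reason, :stacktrace) are modelled on the same convention.

```elixir
defmodule MyApp.SpanSketch do
  # Hypothetical sketch of span semantics; `emit` stands in for :telemetry.execute/3.
  def span(event_name, start_metadata, fun, emit) do
    start = System.monotonic_time()
    emit.(event_name ++ [:start], %{system_time: System.system_time()}, start_metadata)

    try do
      fun.()
    catch
      kind, reason ->
        stacktrace = __STACKTRACE__
        duration = System.monotonic_time() - start
        metadata = Map.merge(start_metadata, %{kind: kind, reason: reason, stacktrace: stacktrace})
        emit.(event_name ++ [:exception], %{duration: duration}, metadata)
        # Re-raise with the original stacktrace so callers see the real failure.
        :erlang.raise(kind, reason, stacktrace)
    else
      # fun must return {result, stop_metadata}; the start metadata is merged in.
      {result, stop_metadata} ->
        duration = System.monotonic_time() - start
        emit.(event_name ++ [:stop], %{duration: duration}, Map.merge(start_metadata, stop_metadata))
        result
    end
  end
end
```

The stop event is emitted from the else branch, outside the protected body, so a failure in a stop handler is not misreported as an exception in fun.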