Otel.SDK.Logs.LogRecordProcessor.Batch (otel v0.2.0)

Batching LogRecordProcessor (logs/sdk.md §Batching processor L528-L548).

Spec L530-L532 — "creates batches of LogRecords and passes the export-friendly ReadableLogRecord representations to the configured LogRecordExporter." Exports are triggered by:

  • Queue size threshold (max_export_batch_size): a cast that brings the queue to the threshold transitions immediately to :exporting.
  • Scheduled timer (scheduled_delay_ms): a periodic :export_timer info message received in :idle triggers an export if the queue is non-empty.
  • force_flush/2: synchronous drain of the entire queue followed by the exporter's force_flush/1 (spec §LogRecordProcessor L484-L486 MUST).
  • shutdown/2: synchronous drain, then the exporter's force_flush/1 followed by shutdown/1 (spec L469: "Shutdown MUST include the effects of ForceFlush"). The gen_statem then exits via {:stop_and_reply, :normal, ...}, mirroring the Simple processor pattern (PR #292).

Spec L534-L535: "The processor MUST synchronize calls to LogRecordExporter's Export to make sure that they are not invoked concurrently." The gen_statem's mailbox serialises events, and only the :exporting state owns a runner process. The force_flush / shutdown paths drain the queue from inside the gen_statem after the runner has cleared, so no two export/2 calls overlap.

State model

Two :gen_statem states with :state_enter callbacks:

  • :idle: accepts the {:add_record, _} cast, the periodic :export_timer, and the synchronous {:force_flush, deadline} / {:shutdown, deadline} calls. Transitions to :exporting when the queue threshold is met or the periodic timer fires with a non-empty queue.
  • :exporting: the :enter callback spawns a runner process (spawn_monitor) that calls the exporter's export/2. A :state_timeout of export_timeout_ms bounds the export; on expiry the runner is killed (spec L544-L545 "how long the export can run before it is cancelled"). {:add_record, _} casts continue to enqueue during export (no postpone; back-pressure stays at max_queue_size). The first {:force_flush, deadline} / {:shutdown, deadline} call is saved as pending_call and a :pending_deadline generic timeout is armed. When the runner finishes (or that timeout fires), the processor either runs the drain within the caller's remaining budget or aborts and replies {:error, :timeout} (spec §LogRecordProcessor L487-L491 "MUST prioritize honoring the timeout over finishing all calls. It MAY skip or abort some or all Export or ForceFlush calls"). Subsequent force_flush / shutdown calls are postponed and replayed in :idle, each carrying its own absolute deadline.
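The two states described above can be sketched as a small :gen_statem. This is a minimal illustration under stated assumptions, not the module's source: BatchSketch, its start_link/3 arguments, and the data map are hypothetical; the periodic timer, the force_flush / shutdown handling, and the max_queue_size limit are left out; and the exporter is reduced to a one-argument function.

```elixir
defmodule BatchSketch do
  # Hypothetical two-state machine illustrating :idle / :exporting only.
  @behaviour :gen_statem

  def start_link(export_fun, batch_size, export_timeout_ms) do
    :gen_statem.start_link(__MODULE__, {export_fun, batch_size, export_timeout_ms}, [])
  end

  def add(pid, record), do: :gen_statem.cast(pid, {:add_record, record})

  @impl true
  def callback_mode, do: [:state_functions, :state_enter]

  @impl true
  def init({export_fun, batch_size, export_timeout_ms}) do
    data = %{
      queue: [],
      export: export_fun,
      batch_size: batch_size,
      export_timeout_ms: export_timeout_ms,
      runner: nil
    }

    {:ok, :idle, data}
  end

  # :idle only accumulates records until the batch threshold is reached.
  def idle(:enter, _old_state, _data), do: :keep_state_and_data

  def idle(:cast, {:add_record, record}, data) do
    data = %{data | queue: [record | data.queue]}

    if length(data.queue) >= data.batch_size do
      {:next_state, :exporting, data}
    else
      {:keep_state, data}
    end
  end

  # Entering :exporting hands the batch to a monitored runner and arms the timeout.
  def exporting(:enter, _old_state, data) do
    batch = Enum.reverse(data.queue)
    export = data.export
    {pid, ref} = spawn_monitor(fn -> export.(batch) end)

    {:keep_state, %{data | queue: [], runner: {pid, ref}},
     [{:state_timeout, data.export_timeout_ms, :export_timeout}]}
  end

  # Records keep queueing while an export is in flight.
  def exporting(:cast, {:add_record, record}, data) do
    {:keep_state, %{data | queue: [record | data.queue]}}
  end

  # The runner exiting (normally or killed) is the only way back to :idle.
  def exporting(:info, {:DOWN, ref, :process, pid, _reason}, %{runner: {pid, ref}} = data) do
    {:next_state, :idle, %{data | runner: nil}}
  end

  # Export ran too long: kill the runner and wait for its :DOWN message.
  def exporting(:state_timeout, :export_timeout, %{runner: {pid, _ref}}) do
    Process.exit(pid, :kill)
    :keep_state_and_data
  end
end
```

Because at most one runner exists at a time and a new one is spawned only on entering :exporting, two exports cannot overlap in this sketch.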

No child_spec/1 is exposed — the LoggerProvider is the only supervisor for this processor and it calls start_link/1 directly. Users who want to put the processor under their own Supervisor can write a one-line spec inline.
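An inline child spec of the kind mentioned above might look like the following sketch. MyApp.LogProcessorSpec is a hypothetical helper, and the contents of the config map (including the shape of the :exporter value) are left to the caller; only start_link/1 is taken from the documented API.

```elixir
defmodule MyApp.LogProcessorSpec do
  # Hypothetical helper; the processor itself exposes no child_spec/1.
  @processor Otel.SDK.Logs.LogRecordProcessor.Batch

  # Builds a plain child-spec map that a Supervisor accepts as-is.
  def child(config) do
    %{id: @processor, start: {@processor, :start_link, [config]}}
  end
end

# Usage sketch (my_exporter is a placeholder for whatever the exporter option expects):
#   children = [MyApp.LogProcessorSpec.child(%{exporter: my_exporter})]
#   Supervisor.start_link(children, strategy: :one_for_one)
```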

Drop reporting

When the queue is full at emit time the new record is dropped (spec L540-L541 "After the size is reached logs are dropped"). The spec allows drops to be silent, but to give operators visibility into sustained back-pressure the processor counts them in its state and emits a throttled Logger.warning with the running total on every :export_timer tick (i.e. once per scheduled_delay_ms, default 1000ms). terminate/3 reports the final tally, so drops counted since the last tick are still reported at shutdown.
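The drop accounting can be illustrated with two pure functions. This is a sketch only: DropSketch, the state map, and the warning text are hypothetical, and the real processor logs through Logger.warning rather than returning a string.

```elixir
defmodule DropSketch do
  # Hypothetical state: %{queue: list, max_queue_size: pos_integer, dropped: non_neg_integer}

  # Enqueue unless the queue is full; when full, the NEW record is dropped and counted.
  def enqueue(%{queue: queue, max_queue_size: max} = state, record) do
    if length(queue) >= max do
      %{state | dropped: state.dropped + 1}
    else
      %{state | queue: [record | queue]}
    end
  end

  # Meant to run once per timer tick: returns the warning text, or nil if nothing was dropped.
  def drop_warning(%{dropped: 0}), do: nil
  def drop_warning(%{dropped: n}), do: "log records dropped (queue full), running total: #{n}"
end
```

Emitting at most one message per tick keeps a sustained overload from flooding the log with one warning per dropped record.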

Design notes

force_flush/2 and shutdown/2 use :gen_statem.call (not :cast) so that the result is surfaced back to the caller. Spec §LogRecordProcessor L466-L467 / L492-L493 says both operations SHOULD provide a way to let the caller know whether the call succeeded, failed, or timed out.
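One way a synchronous call can report "failed" for an already-terminated process is to catch the :noproc exit that :gen_statem.call raises. The sketch below assumes that approach; CallSketch.safe_call/3 is a hypothetical helper and not necessarily how this module does it.

```elixir
defmodule CallSketch do
  # Hypothetical wrapper: turns "server is gone" into an error tuple instead of an exit.
  def safe_call(server, request, timeout) do
    :gen_statem.call(server, request, timeout)
  catch
    # :gen_statem.call exits with {:noproc, _} when the target process does not exist.
    :exit, {:noproc, _} -> {:error, :already_shutdown}
  end
end
```

A cast would return :ok in the same situation, which is why it cannot carry the succeeded / failed / timed out distinction.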

opentelemetry-erlang does not have a separate "batch log processor". The spec gap noted above refers to Erlang's span batch processor (apps/opentelemetry/src/otel_batch_processor.erl), which uses gen_statem:cast for force_flush and silently drops the result. That is a span-side observation, not a logs reference; this logs implementation follows the Logs SDK spec MUST/SHOULDs directly.

Public API

| Function | Role |
| --- | --- |
| on_emit/3, enabled?/4, shutdown/2, force_flush/2 | SDK (Batch implementation) |
| start_link/1 | SDK (lifecycle) |

Types

start_link_config(): the start_link/1 configuration map. All keys except :exporter default to the spec-defined values from §Batching processor L540-L547.
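As an illustration of the defaulting, the sketch below merges a caller's map over the defaults the OpenTelemetry specification lists for the batching LogRecordProcessor. ConfigSketch is hypothetical, and the key names are assumed from the option names used elsewhere on this page; the module's actual merging code is not shown here.

```elixir
defmodule ConfigSketch do
  # Spec defaults for the batching LogRecordProcessor (key names are an assumption).
  @defaults %{
    max_queue_size: 2048,
    scheduled_delay_ms: 1_000,
    export_timeout_ms: 30_000,
    max_export_batch_size: 512
  }

  # :exporter has no default, so the pattern match requires it.
  def build(%{exporter: _} = user_config), do: Map.merge(@defaults, user_config)
end
```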

Functions

enabled?(ctx, scope, opts, config)

SDK (Batch implementation) — Always returns true; the Batch processor has no filtering policy of its own (logs/sdk.md §LogRecordProcessor L420 "MAY implement").

exporting(arg1, arg2, state)

@spec exporting(
  event_type :: :gen_statem.event_type(),
  event_content ::
    :idle
    | :exporting
    | {:add_record, Otel.SDK.Logs.LogRecord.t()}
    | {:force_flush | :shutdown, integer() | :infinity}
    | :export_timer
    | :export_timeout
    | :pending_deadline
    | {:export_done, pid()}
    | {:DOWN, reference(), :process, pid(), term()},
  state :: Otel.SDK.Logs.LogRecordProcessor.Batch.State.t()
) ::
  :gen_statem.event_handler_result(
    Otel.SDK.Logs.LogRecordProcessor.Batch.State.t()
  )

force_flush(map, timeout \\ 30000)

@spec force_flush(
  config :: Otel.SDK.Logs.LogRecordProcessor.config(),
  timeout :: timeout()
) :: :ok | {:error, term()}

SDK (Batch implementation) — Drain the queue and invoke the exporter's force_flush/1 (spec §LogRecordProcessor L484-L486 MUST). When the gen_statem is currently in :exporting, the call waits for the runner to clear; if the caller's deadline elapses first the runner is aborted (spec L487-L491 MAY).

timeout (default 30_000ms) is converted to an absolute deadline. Drain and exporter force_flush/1 are gated on it per spec L487-L491; on expiry, returns {:error, :timeout} per L492-L493. Returns {:error, :already_shutdown} when the gen_statem has already terminated — spec L492-L493 classifies this as failed.
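Converting a relative timeout into an absolute deadline, and later asking how much budget remains, can be sketched as follows. DeadlineSketch is hypothetical, and the use of the monotonic clock is an assumption; the page only states that the deadline is an integer or :infinity.

```elixir
defmodule DeadlineSketch do
  # Relative timeout (ms) -> absolute deadline on the monotonic clock.
  def to_deadline(:infinity), do: :infinity

  def to_deadline(timeout_ms) when is_integer(timeout_ms) and timeout_ms >= 0 do
    System.monotonic_time(:millisecond) + timeout_ms
  end

  # Milliseconds left before the deadline, never negative.
  def remaining(:infinity), do: :infinity

  def remaining(deadline) when is_integer(deadline) do
    max(deadline - System.monotonic_time(:millisecond), 0)
  end
end
```

An absolute deadline keeps its meaning while a call sits postponed in the mailbox, whereas a relative timeout would silently restart when the call is replayed.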

idle(arg1, arg2, state)

@spec idle(
  event_type :: :gen_statem.event_type(),
  event_content ::
    :idle
    | :exporting
    | {:add_record, Otel.SDK.Logs.LogRecord.t()}
    | {:force_flush | :shutdown, integer() | :infinity}
    | :export_timer
    | :pending_deadline
    | {:export_done, pid()}
    | {:DOWN, reference(), :process, pid(), term()},
  state :: Otel.SDK.Logs.LogRecordProcessor.Batch.State.t()
) ::
  :gen_statem.event_handler_result(
    Otel.SDK.Logs.LogRecordProcessor.Batch.State.t()
  )

on_emit(log_record, ctx, map)

@spec on_emit(
  log_record :: Otel.SDK.Logs.LogRecord.t(),
  ctx :: Otel.API.Ctx.t(),
  config :: Otel.SDK.Logs.LogRecordProcessor.config()
) :: :ok

SDK (Batch implementation) — Enqueue the record via :gen_statem.cast/2 (non-blocking, per spec §LogRecordProcessor L394-L396 "called synchronously on the thread that emitted the LogRecord, therefore it SHOULD NOT block or throw exceptions"). Triggers an immediate transition to :exporting when the queue reaches max_export_batch_size.

shutdown(map, timeout \\ 30000)

@spec shutdown(
  config :: Otel.SDK.Logs.LogRecordProcessor.config(),
  timeout :: timeout()
) :: :ok | {:error, term()}

SDK (Batch implementation) — Drain the queue, invoke the exporter's force_flush/1 then shutdown/1, and exit the gen_statem.

timeout (default 30_000ms, matching spec OTEL_BLRP_EXPORT_TIMEOUT) is converted to an absolute deadline and forwarded to the gen_statem. Per spec §LogRecordProcessor L487-L491 the processor MUST honor that deadline over finishing all calls — drain, exporter force_flush/1, and exporter shutdown/1 are each gated on the deadline; if any step would exceed it, the rest are skipped and {:error, :timeout} is returned (per L466-L467). Returns {:error, :already_shutdown} when the gen_statem has already terminated — spec L466-L467 classifies this as failed rather than silently succeeded.
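The deadline gating of successive steps can be sketched with Enum.reduce_while/3. GatedSketch is hypothetical; it checks the deadline only before each step (it does not interrupt a step already running), and halting on the first {:error, _} is an assumption rather than documented behaviour.

```elixir
defmodule GatedSketch do
  # steps: zero-arity functions returning :ok | {:error, term}
  # deadline: absolute monotonic time in milliseconds, or :infinity
  def run(steps, deadline) do
    Enum.reduce_while(steps, :ok, fn step, :ok ->
      if expired?(deadline) do
        # Skip this step and all remaining ones.
        {:halt, {:error, :timeout}}
      else
        case step.() do
          :ok -> {:cont, :ok}
          {:error, _reason} = error -> {:halt, error}
        end
      end
    end)
  end

  defp expired?(:infinity), do: false
  defp expired?(deadline), do: System.monotonic_time(:millisecond) >= deadline
end
```

For shutdown the step list would correspond to drain, exporter force_flush/1, and exporter shutdown/1, in that order.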

start_link(config)

@spec start_link(config :: start_link_config()) :: :gen_statem.start_ret()