Batching LogRecordProcessor
(logs/sdk.md §Batching processor L528-L548).
Spec L530-L532 — "creates batches of LogRecords and passes
the export-friendly ReadableLogRecord representations to the
configured LogRecordExporter." Exports are triggered by:
- Queue size threshold (max_export_batch_size): a cast that
  pushes the queue past the threshold transitions immediately
  to :exporting.
- Scheduled timer (scheduled_delay_ms): a periodic
  :export_timer info event while in :idle triggers an export
  if the queue is non-empty.
- force_flush/2: synchronous drain of the entire queue
  followed by the exporter's force_flush/1 (spec
  §LogRecordProcessor L484-L486 MUST).
- shutdown/2: synchronous drain plus the exporter's
  force_flush/1 then shutdown/1 (spec L469: "Shutdown MUST
  include the effects of ForceFlush"). The gen_statem then
  exits via {:stop_and_reply, :normal, ...}, mirroring the
  Simple processor pattern (PR #292).
Spec L534-L535 — "The processor MUST synchronize calls to
LogRecordExporter's Export to make sure that they are
not invoked concurrently." — the gen_statem's mailbox
serialises events, and only the :exporting state owns a
runner process. The force_flush / shutdown paths drain
the queue from inside the gen_statem after the runner has
cleared, so no two export/2 calls overlap.
State model
Two :gen_statem states with :state_enter callbacks:
- :idle accepts cast {:add_record, _}, the periodic
  :export_timer, and synchronous {:force_flush, deadline} /
  {:shutdown, deadline} calls. It transitions to :exporting
  when the queue threshold is met or the periodic timer fires
  with a non-empty queue.
- In :exporting, the :enter callback spawns a runner process
  (spawn_monitor) that calls the exporter's export/2. A
  :state_timeout of export_timeout_ms bounds the export; on
  expiry, the runner is killed (spec L544-L545 "how long the
  export can run before it is cancelled"). Cast
  {:add_record, _} continues to enqueue during export (no
  postpone; back-pressure stays at max_queue_size). The first
  {:force_flush, deadline} / {:shutdown, deadline} is saved as
  pending_call and a :pending_deadline generic timeout is
  armed. When the runner finishes (or that timeout fires) we
  either run the drain inside the caller's remaining budget or
  abort and reply {:error, :timeout} (spec §LogRecordProcessor
  L487-L491: "MUST prioritize honoring the timeout over
  finishing all calls. It MAY skip or abort some or all Export
  or ForceFlush calls"). Subsequent force_flush / shutdown
  calls postpone, are replayed in :idle, and each carries its
  own absolute deadline.
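The two-state shape described above can be sketched as a small :gen_statem. This is a minimal illustration, not the processor's actual source: the module name BatchSketch, the :export_fun option, and the tiny default batch size are assumptions made for the example, and the periodic timer and the pending force_flush / shutdown handling are left out.

```elixir
defmodule BatchSketch do
  # Hypothetical stand-in for the real processor. Only the
  # idle -> exporting -> idle cycle is modelled here.
  @behaviour :gen_statem

  def start_link(opts), do: :gen_statem.start_link(__MODULE__, opts, [])
  def add(pid, record), do: :gen_statem.cast(pid, {:add_record, record})

  @impl true
  def callback_mode, do: [:state_functions, :state_enter]

  @impl true
  def init(opts) do
    data = %{
      queue: [],
      batch_size: Keyword.get(opts, :max_export_batch_size, 2),
      timeout_ms: Keyword.get(opts, :export_timeout_ms, 1_000),
      # Illustrative option: a 1-arity function standing in for export/2.
      export_fun: Keyword.fetch!(opts, :export_fun),
      runner: nil
    }

    {:ok, :idle, data}
  end

  def idle(:enter, _old_state, _data), do: :keep_state_and_data

  def idle(:cast, {:add_record, record}, data) do
    data = %{data | queue: [record | data.queue]}

    # Reaching the batch size triggers an immediate export.
    if length(data.queue) >= data.batch_size do
      {:next_state, :exporting, data}
    else
      {:keep_state, data}
    end
  end

  def idle(:info, _late_message, _data), do: :keep_state_and_data

  def exporting(:enter, _old_state, data) do
    # The queue is stored newest-first, so reverse before taking a batch.
    {batch, rest} = data.queue |> Enum.reverse() |> Enum.split(data.batch_size)
    export_fun = data.export_fun
    {pid, ref} = spawn_monitor(fn -> export_fun.(batch) end)
    data = %{data | queue: Enum.reverse(rest), runner: {pid, ref}}
    # The state timeout bounds how long the runner may take.
    {:keep_state, data, [{:state_timeout, data.timeout_ms, :export_timeout}]}
  end

  # Records keep being enqueued while an export is in flight.
  def exporting(:cast, {:add_record, record}, data) do
    {:keep_state, %{data | queue: [record | data.queue]}}
  end

  # The runner finished (normally or not): go back to :idle.
  def exporting(:info, {:DOWN, ref, :process, pid, _reason}, %{runner: {pid, ref}} = data) do
    {:next_state, :idle, %{data | runner: nil}}
  end

  # The export took too long: kill the runner and move on.
  def exporting(:state_timeout, :export_timeout, %{runner: {pid, ref}} = data) do
    Process.demonitor(ref, [:flush])
    Process.exit(pid, :kill)
    {:next_state, :idle, %{data | runner: nil}}
  end
end
```

Because only the :exporting state ever owns a runner, and the state machine handles one event at a time, at most one export function call is in flight in this sketch.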
No child_spec/1 is exposed — the LoggerProvider is the
only supervisor for this processor and it calls
start_link/1 directly. Users who want to put the processor
under their own Supervisor can write a one-line spec inline.
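As a rough sketch, such an inline spec is just a child-spec map. The processor module name below is inferred from the type names used on this page, and MyApp.LogProcessorSpec and MyApp.StdoutExporter are hypothetical placeholders.

```elixir
defmodule MyApp.LogProcessorSpec do
  # Hypothetical helper that builds the inline child spec.
  # Otel.SDK.Logs.LogRecordProcessor.Batch is assumed to be the
  # processor module; adjust if the real name differs.
  def child_spec_for(config) do
    %{
      id: Otel.SDK.Logs.LogRecordProcessor.Batch,
      start: {Otel.SDK.Logs.LogRecordProcessor.Batch, :start_link, [config]}
    }
  end
end

# Usage (commented out because it needs the real processor and
# an exporter module to be loaded):
#
#   children = [
#     MyApp.LogProcessorSpec.child_spec_for(%{exporter: {MyApp.StdoutExporter, []}})
#   ]
#   Supervisor.start_link(children, strategy: :one_for_one)
```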
Drop reporting
When the queue is full at emit time the new record is
dropped (spec L540-L541 "After the size is reached logs
are dropped"). Drops are silently allowed by spec, but to
give operators visibility into sustained back-pressure we
count them on the state and surface a throttled
Logger.warning with the running total on every
:export_timer tick (i.e. once per scheduled_delay_ms,
default 1000ms). terminate/3 flushes the final tally so
no drops are lost across shutdown.
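One way to picture the counting and throttling is with two counters on the state: drops seen so far and drops already reported. The sketch below assumes that shape; DropSketch, its field names, and the log message are illustrative and not taken from the implementation.

```elixir
defmodule DropSketch do
  require Logger

  # Hypothetical state: a bounded queue plus two drop counters.
  def new(max_queue_size) do
    %{queue: :queue.new(), size: 0, max: max_queue_size, dropped: 0, reported: 0}
  end

  # A full queue drops the new record and only bumps the counter.
  def add(%{size: size, max: max} = state, _record) when size >= max do
    %{state | dropped: state.dropped + 1}
  end

  def add(state, record) do
    %{state | queue: :queue.in(record, state.queue), size: state.size + 1}
  end

  # Called once per timer tick (and once more on terminate):
  # warn only when new drops happened since the last report.
  def report(%{dropped: dropped, reported: reported} = state) when dropped > reported do
    Logger.warning("log records dropped so far: #{dropped}")
    %{state | reported: dropped}
  end

  def report(state), do: state
end
```

Reporting from the tick rather than from add keeps the hot path free of logging, so a burst of drops produces at most one warning per tick.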
Design notes
force_flush/2 and shutdown/2 use :gen_statem.call (not
:cast) so that the result reaches the caller. Spec
§LogRecordProcessor L466-L467 / L492-L493 says the processor
SHOULD provide a way to let the caller know whether the call
succeeded, failed, or timed out.
opentelemetry-erlang does not have a separate "batch log
processor". The closest counterpart is erlang's span batch
processor
(apps/opentelemetry/src/otel_batch_processor.erl), which
uses gen_statem:cast for force_flush and silently drops
the result. That is a span-side observation, not a logs
reference. Our logs implementation follows the Logs SDK
spec MUST/SHOULDs directly.
Public API
| Function | Role |
|---|---|
| on_emit/3, enabled?/3, shutdown/2, force_flush/2 | SDK (Batch implementation) |
| start_link/1 | SDK (lifecycle) |
References
- OTel Logs SDK Batching processor:
  opentelemetry-specification/specification/logs/sdk.md §Batching processor
- Parent behaviour: Otel.SDK.Logs.LogRecordProcessor
Types
@type start_link_config() :: %{
  :exporter => {module(), term()},
  optional(:max_queue_size) => pos_integer(),
  optional(:scheduled_delay_ms) => non_neg_integer(),
  optional(:export_timeout_ms) => non_neg_integer(),
  optional(:max_export_batch_size) => pos_integer()
}
start_link/1 configuration map. All keys except :exporter
default to the spec-defined values from §Batching processor
L540-L547.
- :exporter (required): {module, opts}; opts is passed to
  module.init/1 once at startup.
- :max_queue_size (optional): spec L540-L541, default 2048.
- :scheduled_delay_ms (optional): spec L542-L543, default 1000.
- :export_timeout_ms (optional): spec L544-L545, default 30000.
- :max_export_batch_size (optional): spec L546-L547, default
  512. MUST be ≤ max_queue_size.
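A sketch of how the defaults and the batch-size constraint could be applied to a caller's map follows. The default values are the ones listed above; ConfigSketch, normalize/1, and the error atom are made up for illustration, and the real start_link/1 may validate differently.

```elixir
defmodule ConfigSketch do
  # Defaults as listed above for the optional keys.
  @defaults %{
    max_queue_size: 2048,
    scheduled_delay_ms: 1000,
    export_timeout_ms: 30_000,
    max_export_batch_size: 512
  }

  # Requires :exporter, fills in missing optional keys, and checks
  # that a batch can never be larger than the queue.
  def normalize(%{exporter: {module, _opts}} = config) when is_atom(module) do
    merged = Map.merge(@defaults, config)

    if merged.max_export_batch_size <= merged.max_queue_size do
      {:ok, merged}
    else
      # Hypothetical error shape for the sketch.
      {:error, :batch_size_exceeds_queue_size}
    end
  end
end
```

For example, passing only %{exporter: {SomeExporter, []}} (SomeExporter being a placeholder) yields a map with all four defaults filled in, while max_queue_size: 10 together with max_export_batch_size: 20 is rejected.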
Functions
@spec enabled?(
  ctx :: Otel.API.Ctx.t(),
  scope :: Otel.API.InstrumentationScope.t(),
  opts :: Otel.SDK.Logs.LogRecordProcessor.enabled_opts(),
  config :: Otel.SDK.Logs.LogRecordProcessor.config()
) :: boolean()
SDK (Batch implementation) — Always returns true; the
Batch processor has no filtering policy of its own
(logs/sdk.md §LogRecordProcessor L420 "MAY implement").
@spec exporting(
  event_type :: :gen_statem.event_type(),
  event_content ::
    :idle
    | :exporting
    | {:add_record, Otel.SDK.Logs.LogRecord.t()}
    | {:force_flush | :shutdown, integer() | :infinity}
    | :export_timer
    | :export_timeout
    | :pending_deadline
    | {:export_done, pid()}
    | {:DOWN, reference(), :process, pid(), term()},
  state :: Otel.SDK.Logs.LogRecordProcessor.Batch.State.t()
) :: :gen_statem.event_handler_result(Otel.SDK.Logs.LogRecordProcessor.Batch.State.t())
@spec force_flush(
  config :: Otel.SDK.Logs.LogRecordProcessor.config(),
  timeout :: timeout()
) :: :ok | {:error, term()}
SDK (Batch implementation) — Drain the queue and invoke
the exporter's force_flush/1 (spec §LogRecordProcessor
L484-L486 MUST). When the gen_statem is currently in
:exporting, the call waits for the runner to clear; if the
caller's deadline elapses first the runner is aborted (spec
L487-L491 MAY).
timeout (default 30_000ms) is converted to an absolute
deadline. Drain and exporter force_flush/1 are gated on it
per spec L487-L491; on expiry, returns {:error, :timeout}
per L492-L493. Returns {:error, :already_shutdown} when
the gen_statem has already terminated — spec L492-L493
classifies this as failed.
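The timeout-to-deadline conversion and the "already terminated" case can be sketched as below. It assumes the deadline is an absolute System.monotonic_time/1 value and that the server, not the call itself, enforces it; DeadlineSketch and its function names are illustrative only.

```elixir
defmodule DeadlineSketch do
  # Convert a relative timeout into an absolute monotonic deadline.
  def from_timeout(:infinity), do: :infinity

  def from_timeout(ms) when is_integer(ms) and ms >= 0 do
    System.monotonic_time(:millisecond) + ms
  end

  # Milliseconds left before the deadline, never negative.
  def remaining(:infinity), do: :infinity

  def remaining(deadline) do
    max(deadline - System.monotonic_time(:millisecond), 0)
  end

  # Send {request, deadline} as a synchronous call. The call itself
  # waits indefinitely because, in this sketch, the server is the
  # one that replies {:error, :timeout} when the deadline passes.
  def call(server, request, timeout \\ 30_000) do
    :gen_statem.call(server, {request, from_timeout(timeout)}, :infinity)
  catch
    # The process is gone: report a failure instead of crashing the caller.
    :exit, {:noproc, _} -> {:error, :already_shutdown}
    :exit, {:normal, _} -> {:error, :already_shutdown}
  end
end
```

Carrying an absolute deadline rather than a relative timeout means a call that was postponed while an export was running still sees only the time it has left when it is finally handled.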
@spec idle(
  event_type :: :gen_statem.event_type(),
  event_content ::
    :idle
    | :exporting
    | {:add_record, Otel.SDK.Logs.LogRecord.t()}
    | {:force_flush | :shutdown, integer() | :infinity}
    | :export_timer
    | :pending_deadline
    | {:export_done, pid()}
    | {:DOWN, reference(), :process, pid(), term()},
  state :: Otel.SDK.Logs.LogRecordProcessor.Batch.State.t()
) :: :gen_statem.event_handler_result(Otel.SDK.Logs.LogRecordProcessor.Batch.State.t())
@spec on_emit(
  log_record :: Otel.SDK.Logs.LogRecord.t(),
  ctx :: Otel.API.Ctx.t(),
  config :: Otel.SDK.Logs.LogRecordProcessor.config()
) :: :ok
SDK (Batch implementation) — Enqueue the record via
:gen_statem.cast/2 (non-blocking, per spec
§LogRecordProcessor L394-L396 "called synchronously on the
thread that emitted the LogRecord, therefore it SHOULD NOT
block or throw exceptions"). Triggers an immediate
transition to :exporting when the queue reaches
max_export_batch_size.
@spec shutdown(
  config :: Otel.SDK.Logs.LogRecordProcessor.config(),
  timeout :: timeout()
) :: :ok | {:error, term()}
SDK (Batch implementation) — Drain the queue, invoke the
exporter's force_flush/1 then shutdown/1, and exit the
gen_statem.
timeout (default 30_000ms, matching spec
OTEL_BLRP_EXPORT_TIMEOUT) is converted to an absolute
deadline and forwarded to the gen_statem. Per spec
§LogRecordProcessor L487-L491 the processor MUST honor that
deadline over finishing all calls — drain, exporter
force_flush/1, and exporter shutdown/1 are each gated on
the deadline; if any step would exceed it, the rest are
skipped and {:error, :timeout} is returned (per L466-L467).
Returns {:error, :already_shutdown} when the gen_statem has
already terminated — spec L466-L467 classifies this as failed
rather than silently succeeded.
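The step-by-step gating can be illustrated with a small helper that runs steps in order and stops once the deadline has passed. This is a simplified sketch: GatedStepsSketch is a made-up name, the deadline is assumed to be an absolute System.monotonic_time(:millisecond) value or :infinity, and the check happens only before each step, so a step already in progress is not interrupted here.

```elixir
defmodule GatedStepsSketch do
  # Run zero-arity steps in order. Before each step, check the
  # deadline; once it has passed, skip the remaining steps.
  def run(steps, deadline) do
    Enum.reduce_while(steps, :ok, fn step, :ok ->
      if expired?(deadline) do
        {:halt, {:error, :timeout}}
      else
        step.()
        {:cont, :ok}
      end
    end)
  end

  defp expired?(:infinity), do: false
  defp expired?(deadline), do: System.monotonic_time(:millisecond) >= deadline
end
```

In this picture, shutdown would pass three steps (drain, exporter force_flush, exporter shutdown) and return whatever run/2 returns.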
@spec start_link(config :: start_link_config()) :: :gen_statem.start_ret()