instrument_span_processor_batch (instrument v1.0.0)
View SourceBatch span processor that queues spans and exports in batches.
This processor collects spans and exports them in batches, providing better performance than the simple processor in production.
Configuration
- exporter: Exporter module (required) - exporter_config: Configuration for the exporter (default: #{}) - max_queue_size: Maximum number of spans in queue (default: 2048) - max_export_batch_size: Maximum spans per export (default: 512) - schedule_delay_millis: Delay between exports in ms (default: 5000) - export_timeout_millis: Export timeout in ms (default: 30000)
Example
instrument_span_processor:register(instrument_span_processor_batch, #{
exporter => instrument_exporter_otlp,
exporter_config => #{endpoint => "http://localhost:4318"},
max_queue_size => 4096,
schedule_delay_millis => 1000
}).
Summary
Functions
Forces an immediate export of all queued spans.
Forces an immediate export with state.
Initializes the batch processor.
Called when a span ends. Queues the span for batch export.
Called when a span starts. Returns the span unchanged.
Shuts down the processor.
Shuts down the processor with state.
Starts the batch processor as a gen_server.
Types
-type inflight() :: #{pid := pid(), monitor := reference(), ref := reference(), spans := [#span{name :: binary(), ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, parent_ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()} | undefined, tracer :: #tracer{name :: binary(), version :: binary() | undefined, schema_url :: binary() | undefined, resource :: #resource{attributes :: map(), schema_url :: binary() | undefined} | undefined} | undefined, kind :: client | server | producer | consumer | internal, start_time :: integer(), end_time :: integer() | undefined, attributes :: map(), events :: [#span_event{name :: binary(), timestamp :: integer(), attributes :: map(), dropped_attributes_count :: non_neg_integer()}], links :: [#span_link{ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, attributes :: map(), dropped_attributes_count :: non_neg_integer()}], status :: unset | ok | {error, binary()}, is_recording :: boolean(), dropped_attributes_count :: non_neg_integer(), dropped_events_count :: non_neg_integer(), dropped_links_count :: non_neg_integer()}], kill_timer := reference(), pending_froms := [gen_server:from()]}.
Functions
-spec force_flush() -> ok.
Forces an immediate export of all queued spans.
-spec force_flush(#state{exporter :: module(), exporter_state :: term(), max_queue_size :: pos_integer(), max_export_batch_size :: pos_integer(), schedule_delay :: pos_integer(), export_timeout :: pos_integer(), queue :: [#span{name :: binary(), ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, parent_ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()} | undefined, tracer :: #tracer{name :: binary(), version :: binary() | undefined, schema_url :: binary() | undefined, resource :: #resource{attributes :: map(), schema_url :: binary() | undefined} | undefined} | undefined, kind :: client | server | producer | consumer | internal, start_time :: integer(), end_time :: integer() | undefined, attributes :: map(), events :: [#span_event{name :: binary(), timestamp :: integer(), attributes :: map(), dropped_attributes_count :: non_neg_integer()}], links :: [#span_link{ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, attributes :: map(), dropped_attributes_count :: non_neg_integer()}], status :: unset | ok | {error, binary()}, is_recording :: boolean(), dropped_attributes_count :: non_neg_integer(), dropped_events_count :: non_neg_integer(), dropped_links_count :: non_neg_integer()}], queue_size :: non_neg_integer(), timer_ref :: reference() | undefined, dropped_spans :: non_neg_integer(), retry_spans :: [#span{name :: binary(), ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, parent_ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()} | undefined, tracer :: #tracer{name :: binary(), version :: binary() | undefined, schema_url :: binary() | undefined, resource :: #resource{attributes :: map(), schema_url :: binary() | undefined} | undefined} | undefined, kind :: client | server | producer | consumer | internal, start_time :: integer(), end_time :: integer() | undefined, attributes :: map(), events :: [#span_event{name :: binary(), timestamp :: integer(), attributes :: map(), dropped_attributes_count :: non_neg_integer()}], links :: [#span_link{ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, attributes :: map(), dropped_attributes_count :: non_neg_integer()}], status :: unset | ok | {error, binary()}, is_recording :: boolean(), dropped_attributes_count :: non_neg_integer(), dropped_events_count :: non_neg_integer(), dropped_links_count :: non_neg_integer()}], retry_attempts :: non_neg_integer(), max_batch_retries :: non_neg_integer(), export_inflight :: undefined | inflight()}) -> ok.
Forces an immediate export with state.
Initializes the batch processor.
-spec on_end(#span{name :: binary(), ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, parent_ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()} | undefined, tracer :: #tracer{name :: binary(), version :: binary() | undefined, schema_url :: binary() | undefined, resource :: #resource{attributes :: map(), schema_url :: binary() | undefined} | undefined} | undefined, kind :: client | server | producer | consumer | internal, start_time :: integer(), end_time :: integer() | undefined, attributes :: map(), events :: [#span_event{name :: binary(), timestamp :: integer(), attributes :: map(), dropped_attributes_count :: non_neg_integer()}], links :: [#span_link{ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, attributes :: map(), dropped_attributes_count :: non_neg_integer()}], status :: unset | ok | {error, binary()}, is_recording :: boolean(), dropped_attributes_count :: non_neg_integer(), dropped_events_count :: non_neg_integer(), dropped_links_count :: non_neg_integer()}) -> ok.
Called when a span ends. Queues the span for batch export.
-spec on_start(#span{name :: binary(), ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, parent_ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()} | undefined, tracer :: #tracer{name :: binary(), version :: binary() | undefined, schema_url :: binary() | undefined, resource :: #resource{attributes :: map(), schema_url :: binary() | undefined} | undefined} | undefined, kind :: client | server | producer | consumer | internal, start_time :: integer(), end_time :: integer() | undefined, attributes :: map(), events :: [#span_event{name :: binary(), timestamp :: integer(), attributes :: map(), dropped_attributes_count :: non_neg_integer()}], links :: [#span_link{ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, attributes :: map(), dropped_attributes_count :: non_neg_integer()}], status :: unset | ok | {error, binary()}, is_recording :: boolean(), dropped_attributes_count :: non_neg_integer(), dropped_events_count :: non_neg_integer(), dropped_links_count :: non_neg_integer()}, #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()} | undefined) -> #span{name :: binary(), ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, parent_ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()} | undefined, tracer :: #tracer{name :: binary(), version :: binary() | undefined, schema_url :: binary() | undefined, resource :: #resource{attributes :: map(), schema_url :: binary() | undefined} | undefined} | undefined, kind :: client | server | producer | consumer | internal, start_time :: integer(), end_time :: integer() | undefined, attributes :: map(), events :: [#span_event{name :: binary(), timestamp :: integer(), attributes :: map(), dropped_attributes_count :: non_neg_integer()}], links :: [#span_link{ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, attributes :: map(), dropped_attributes_count :: non_neg_integer()}], status :: unset | ok | {error, binary()}, is_recording :: boolean(), dropped_attributes_count :: non_neg_integer(), dropped_events_count :: non_neg_integer(), dropped_links_count :: non_neg_integer()}.
Called when a span starts. Returns the span unchanged.
-spec shutdown() -> ok.
Shuts down the processor.
-spec shutdown(#state{exporter :: module(), exporter_state :: term(), max_queue_size :: pos_integer(), max_export_batch_size :: pos_integer(), schedule_delay :: pos_integer(), export_timeout :: pos_integer(), queue :: [#span{name :: binary(), ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, parent_ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()} | undefined, tracer :: #tracer{name :: binary(), version :: binary() | undefined, schema_url :: binary() | undefined, resource :: #resource{attributes :: map(), schema_url :: binary() | undefined} | undefined} | undefined, kind :: client | server | producer | consumer | internal, start_time :: integer(), end_time :: integer() | undefined, attributes :: map(), events :: [#span_event{name :: binary(), timestamp :: integer(), attributes :: map(), dropped_attributes_count :: non_neg_integer()}], links :: [#span_link{ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, attributes :: map(), dropped_attributes_count :: non_neg_integer()}], status :: unset | ok | {error, binary()}, is_recording :: boolean(), dropped_attributes_count :: non_neg_integer(), dropped_events_count :: non_neg_integer(), dropped_links_count :: non_neg_integer()}], queue_size :: non_neg_integer(), timer_ref :: reference() | undefined, dropped_spans :: non_neg_integer(), retry_spans :: [#span{name :: binary(), ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, parent_ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()} | undefined, tracer :: #tracer{name :: binary(), version :: binary() | undefined, schema_url :: binary() | undefined, resource :: #resource{attributes :: map(), schema_url :: binary() | undefined} | undefined} | undefined, kind :: client | server | producer | consumer | internal, start_time :: integer(), end_time :: integer() | undefined, attributes :: map(), events :: [#span_event{name :: binary(), timestamp :: integer(), attributes :: map(), dropped_attributes_count :: non_neg_integer()}], links :: [#span_link{ctx :: #span_ctx{trace_id :: <<_:128>> | undefined, span_id :: <<_:64>> | undefined, trace_flags :: 0 | 1, trace_state :: [{binary(), binary()}], is_remote :: boolean()}, attributes :: map(), dropped_attributes_count :: non_neg_integer()}], status :: unset | ok | {error, binary()}, is_recording :: boolean(), dropped_attributes_count :: non_neg_integer(), dropped_events_count :: non_neg_integer(), dropped_links_count :: non_neg_integer()}], retry_attempts :: non_neg_integer(), max_batch_retries :: non_neg_integer(), export_inflight :: undefined | inflight()}) -> ok.
Shuts down the processor with state.
Starts the batch processor as a gen_server.