instrument_span_processor_batch (instrument v1.0.0)

View Source

Batch span processor that queues spans and exports in batches.

This processor collects spans and exports them in batches, providing better performance than the simple processor in production.

Configuration

- exporter: Exporter module (required) - exporter_config: Configuration for the exporter (default: #{}) - max_queue_size: Maximum number of spans in queue (default: 2048) - max_export_batch_size: Maximum spans per export (default: 512) - schedule_delay_millis: Delay between exports in ms (default: 5000) - export_timeout_millis: Export timeout in ms (default: 30000)

Example

  instrument_span_processor:register(instrument_span_processor_batch, #{
    exporter => instrument_exporter_otlp,
    exporter_config => #{endpoint => "http://localhost:4318"},
    max_queue_size => 4096,
    schedule_delay_millis => 1000
  }).

Summary

Functions

Forces an immediate export of all queued spans.

Forces an immediate export with state.

Initializes the batch processor.

Called when a span ends. Queues the span for batch export.

Called when a span starts. Returns the span unchanged.

Shuts down the processor.

Shuts down the processor with state.

Starts the batch processor as a gen_server.

Types

inflight/0

-type inflight() ::
          #{pid := pid(),
            monitor := reference(),
            ref := reference(),
            spans :=
                [#span{name :: binary(),
                       ctx ::
                           #span_ctx{trace_id :: <<_:128>> | undefined,
                                     span_id :: <<_:64>> | undefined,
                                     trace_flags :: 0 | 1,
                                     trace_state :: [{binary(), binary()}],
                                     is_remote :: boolean()},
                       parent_ctx ::
                           #span_ctx{trace_id :: <<_:128>> | undefined,
                                     span_id :: <<_:64>> | undefined,
                                     trace_flags :: 0 | 1,
                                     trace_state :: [{binary(), binary()}],
                                     is_remote :: boolean()} |
                           undefined,
                       tracer ::
                           #tracer{name :: binary(),
                                   version :: binary() | undefined,
                                   schema_url :: binary() | undefined,
                                   resource ::
                                       #resource{attributes :: map(),
                                                 schema_url :: binary() | undefined} |
                                       undefined} |
                           undefined,
                       kind :: client | server | producer | consumer | internal,
                       start_time :: integer(),
                       end_time :: integer() | undefined,
                       attributes :: map(),
                       events ::
                           [#span_event{name :: binary(),
                                        timestamp :: integer(),
                                        attributes :: map(),
                                        dropped_attributes_count :: non_neg_integer()}],
                       links ::
                           [#span_link{ctx ::
                                           #span_ctx{trace_id :: <<_:128>> | undefined,
                                                     span_id :: <<_:64>> | undefined,
                                                     trace_flags :: 0 | 1,
                                                     trace_state :: [{binary(), binary()}],
                                                     is_remote :: boolean()},
                                       attributes :: map(),
                                       dropped_attributes_count :: non_neg_integer()}],
                       status :: unset | ok | {error, binary()},
                       is_recording :: boolean(),
                       dropped_attributes_count :: non_neg_integer(),
                       dropped_events_count :: non_neg_integer(),
                       dropped_links_count :: non_neg_integer()}],
            kill_timer := reference(),
            pending_froms := [gen_server:from()]}.

Functions

code_change(OldVsn, State, Extra)

force_flush()

-spec force_flush() -> ok.

Forces an immediate export of all queued spans.

force_flush(State)

-spec force_flush(#state{exporter :: module(),
                         exporter_state :: term(),
                         max_queue_size :: pos_integer(),
                         max_export_batch_size :: pos_integer(),
                         schedule_delay :: pos_integer(),
                         export_timeout :: pos_integer(),
                         queue ::
                             [#span{name :: binary(),
                                    ctx ::
                                        #span_ctx{trace_id :: <<_:128>> | undefined,
                                                  span_id :: <<_:64>> | undefined,
                                                  trace_flags :: 0 | 1,
                                                  trace_state :: [{binary(), binary()}],
                                                  is_remote :: boolean()},
                                    parent_ctx ::
                                        #span_ctx{trace_id :: <<_:128>> | undefined,
                                                  span_id :: <<_:64>> | undefined,
                                                  trace_flags :: 0 | 1,
                                                  trace_state :: [{binary(), binary()}],
                                                  is_remote :: boolean()} |
                                        undefined,
                                    tracer ::
                                        #tracer{name :: binary(),
                                                version :: binary() | undefined,
                                                schema_url :: binary() | undefined,
                                                resource ::
                                                    #resource{attributes :: map(),
                                                              schema_url :: binary() | undefined} |
                                                    undefined} |
                                        undefined,
                                    kind :: client | server | producer | consumer | internal,
                                    start_time :: integer(),
                                    end_time :: integer() | undefined,
                                    attributes :: map(),
                                    events ::
                                        [#span_event{name :: binary(),
                                                     timestamp :: integer(),
                                                     attributes :: map(),
                                                     dropped_attributes_count :: non_neg_integer()}],
                                    links ::
                                        [#span_link{ctx ::
                                                        #span_ctx{trace_id :: <<_:128>> | undefined,
                                                                  span_id :: <<_:64>> | undefined,
                                                                  trace_flags :: 0 | 1,
                                                                  trace_state :: [{binary(), binary()}],
                                                                  is_remote :: boolean()},
                                                    attributes :: map(),
                                                    dropped_attributes_count :: non_neg_integer()}],
                                    status :: unset | ok | {error, binary()},
                                    is_recording :: boolean(),
                                    dropped_attributes_count :: non_neg_integer(),
                                    dropped_events_count :: non_neg_integer(),
                                    dropped_links_count :: non_neg_integer()}],
                         queue_size :: non_neg_integer(),
                         timer_ref :: reference() | undefined,
                         dropped_spans :: non_neg_integer(),
                         retry_spans ::
                             [#span{name :: binary(),
                                    ctx ::
                                        #span_ctx{trace_id :: <<_:128>> | undefined,
                                                  span_id :: <<_:64>> | undefined,
                                                  trace_flags :: 0 | 1,
                                                  trace_state :: [{binary(), binary()}],
                                                  is_remote :: boolean()},
                                    parent_ctx ::
                                        #span_ctx{trace_id :: <<_:128>> | undefined,
                                                  span_id :: <<_:64>> | undefined,
                                                  trace_flags :: 0 | 1,
                                                  trace_state :: [{binary(), binary()}],
                                                  is_remote :: boolean()} |
                                        undefined,
                                    tracer ::
                                        #tracer{name :: binary(),
                                                version :: binary() | undefined,
                                                schema_url :: binary() | undefined,
                                                resource ::
                                                    #resource{attributes :: map(),
                                                              schema_url :: binary() | undefined} |
                                                    undefined} |
                                        undefined,
                                    kind :: client | server | producer | consumer | internal,
                                    start_time :: integer(),
                                    end_time :: integer() | undefined,
                                    attributes :: map(),
                                    events ::
                                        [#span_event{name :: binary(),
                                                     timestamp :: integer(),
                                                     attributes :: map(),
                                                     dropped_attributes_count :: non_neg_integer()}],
                                    links ::
                                        [#span_link{ctx ::
                                                        #span_ctx{trace_id :: <<_:128>> | undefined,
                                                                  span_id :: <<_:64>> | undefined,
                                                                  trace_flags :: 0 | 1,
                                                                  trace_state :: [{binary(), binary()}],
                                                                  is_remote :: boolean()},
                                                    attributes :: map(),
                                                    dropped_attributes_count :: non_neg_integer()}],
                                    status :: unset | ok | {error, binary()},
                                    is_recording :: boolean(),
                                    dropped_attributes_count :: non_neg_integer(),
                                    dropped_events_count :: non_neg_integer(),
                                    dropped_links_count :: non_neg_integer()}],
                         retry_attempts :: non_neg_integer(),
                         max_batch_retries :: non_neg_integer(),
                         export_inflight :: undefined | inflight()}) ->
                     ok.

Forces an immediate export with state.

handle_call(Request, From, State)

handle_cast(Msg, State)

handle_info(Info, State)

init(Config)

-spec init(map()) -> {ok, term()} | {error, term()}.

Initializes the batch processor.

on_end(Span)

-spec on_end(#span{name :: binary(),
                   ctx ::
                       #span_ctx{trace_id :: <<_:128>> | undefined,
                                 span_id :: <<_:64>> | undefined,
                                 trace_flags :: 0 | 1,
                                 trace_state :: [{binary(), binary()}],
                                 is_remote :: boolean()},
                   parent_ctx ::
                       #span_ctx{trace_id :: <<_:128>> | undefined,
                                 span_id :: <<_:64>> | undefined,
                                 trace_flags :: 0 | 1,
                                 trace_state :: [{binary(), binary()}],
                                 is_remote :: boolean()} |
                       undefined,
                   tracer ::
                       #tracer{name :: binary(),
                               version :: binary() | undefined,
                               schema_url :: binary() | undefined,
                               resource ::
                                   #resource{attributes :: map(), schema_url :: binary() | undefined} |
                                   undefined} |
                       undefined,
                   kind :: client | server | producer | consumer | internal,
                   start_time :: integer(),
                   end_time :: integer() | undefined,
                   attributes :: map(),
                   events ::
                       [#span_event{name :: binary(),
                                    timestamp :: integer(),
                                    attributes :: map(),
                                    dropped_attributes_count :: non_neg_integer()}],
                   links ::
                       [#span_link{ctx ::
                                       #span_ctx{trace_id :: <<_:128>> | undefined,
                                                 span_id :: <<_:64>> | undefined,
                                                 trace_flags :: 0 | 1,
                                                 trace_state :: [{binary(), binary()}],
                                                 is_remote :: boolean()},
                                   attributes :: map(),
                                   dropped_attributes_count :: non_neg_integer()}],
                   status :: unset | ok | {error, binary()},
                   is_recording :: boolean(),
                   dropped_attributes_count :: non_neg_integer(),
                   dropped_events_count :: non_neg_integer(),
                   dropped_links_count :: non_neg_integer()}) ->
                ok.

Called when a span ends. Queues the span for batch export.

on_start(Span, ParentCtx)

-spec on_start(#span{name :: binary(),
                     ctx ::
                         #span_ctx{trace_id :: <<_:128>> | undefined,
                                   span_id :: <<_:64>> | undefined,
                                   trace_flags :: 0 | 1,
                                   trace_state :: [{binary(), binary()}],
                                   is_remote :: boolean()},
                     parent_ctx ::
                         #span_ctx{trace_id :: <<_:128>> | undefined,
                                   span_id :: <<_:64>> | undefined,
                                   trace_flags :: 0 | 1,
                                   trace_state :: [{binary(), binary()}],
                                   is_remote :: boolean()} |
                         undefined,
                     tracer ::
                         #tracer{name :: binary(),
                                 version :: binary() | undefined,
                                 schema_url :: binary() | undefined,
                                 resource ::
                                     #resource{attributes :: map(), schema_url :: binary() | undefined} |
                                     undefined} |
                         undefined,
                     kind :: client | server | producer | consumer | internal,
                     start_time :: integer(),
                     end_time :: integer() | undefined,
                     attributes :: map(),
                     events ::
                         [#span_event{name :: binary(),
                                      timestamp :: integer(),
                                      attributes :: map(),
                                      dropped_attributes_count :: non_neg_integer()}],
                     links ::
                         [#span_link{ctx ::
                                         #span_ctx{trace_id :: <<_:128>> | undefined,
                                                   span_id :: <<_:64>> | undefined,
                                                   trace_flags :: 0 | 1,
                                                   trace_state :: [{binary(), binary()}],
                                                   is_remote :: boolean()},
                                     attributes :: map(),
                                     dropped_attributes_count :: non_neg_integer()}],
                     status :: unset | ok | {error, binary()},
                     is_recording :: boolean(),
                     dropped_attributes_count :: non_neg_integer(),
                     dropped_events_count :: non_neg_integer(),
                     dropped_links_count :: non_neg_integer()},
               #span_ctx{trace_id :: <<_:128>> | undefined,
                         span_id :: <<_:64>> | undefined,
                         trace_flags :: 0 | 1,
                         trace_state :: [{binary(), binary()}],
                         is_remote :: boolean()} |
               undefined) ->
                  #span{name :: binary(),
                        ctx ::
                            #span_ctx{trace_id :: <<_:128>> | undefined,
                                      span_id :: <<_:64>> | undefined,
                                      trace_flags :: 0 | 1,
                                      trace_state :: [{binary(), binary()}],
                                      is_remote :: boolean()},
                        parent_ctx ::
                            #span_ctx{trace_id :: <<_:128>> | undefined,
                                      span_id :: <<_:64>> | undefined,
                                      trace_flags :: 0 | 1,
                                      trace_state :: [{binary(), binary()}],
                                      is_remote :: boolean()} |
                            undefined,
                        tracer ::
                            #tracer{name :: binary(),
                                    version :: binary() | undefined,
                                    schema_url :: binary() | undefined,
                                    resource ::
                                        #resource{attributes :: map(),
                                                  schema_url :: binary() | undefined} |
                                        undefined} |
                            undefined,
                        kind :: client | server | producer | consumer | internal,
                        start_time :: integer(),
                        end_time :: integer() | undefined,
                        attributes :: map(),
                        events ::
                            [#span_event{name :: binary(),
                                         timestamp :: integer(),
                                         attributes :: map(),
                                         dropped_attributes_count :: non_neg_integer()}],
                        links ::
                            [#span_link{ctx ::
                                            #span_ctx{trace_id :: <<_:128>> | undefined,
                                                      span_id :: <<_:64>> | undefined,
                                                      trace_flags :: 0 | 1,
                                                      trace_state :: [{binary(), binary()}],
                                                      is_remote :: boolean()},
                                        attributes :: map(),
                                        dropped_attributes_count :: non_neg_integer()}],
                        status :: unset | ok | {error, binary()},
                        is_recording :: boolean(),
                        dropped_attributes_count :: non_neg_integer(),
                        dropped_events_count :: non_neg_integer(),
                        dropped_links_count :: non_neg_integer()}.

Called when a span starts. Returns the span unchanged.

shutdown()

-spec shutdown() -> ok.

Shuts down the processor.

shutdown(State)

-spec shutdown(#state{exporter :: module(),
                      exporter_state :: term(),
                      max_queue_size :: pos_integer(),
                      max_export_batch_size :: pos_integer(),
                      schedule_delay :: pos_integer(),
                      export_timeout :: pos_integer(),
                      queue ::
                          [#span{name :: binary(),
                                 ctx ::
                                     #span_ctx{trace_id :: <<_:128>> | undefined,
                                               span_id :: <<_:64>> | undefined,
                                               trace_flags :: 0 | 1,
                                               trace_state :: [{binary(), binary()}],
                                               is_remote :: boolean()},
                                 parent_ctx ::
                                     #span_ctx{trace_id :: <<_:128>> | undefined,
                                               span_id :: <<_:64>> | undefined,
                                               trace_flags :: 0 | 1,
                                               trace_state :: [{binary(), binary()}],
                                               is_remote :: boolean()} |
                                     undefined,
                                 tracer ::
                                     #tracer{name :: binary(),
                                             version :: binary() | undefined,
                                             schema_url :: binary() | undefined,
                                             resource ::
                                                 #resource{attributes :: map(),
                                                           schema_url :: binary() | undefined} |
                                                 undefined} |
                                     undefined,
                                 kind :: client | server | producer | consumer | internal,
                                 start_time :: integer(),
                                 end_time :: integer() | undefined,
                                 attributes :: map(),
                                 events ::
                                     [#span_event{name :: binary(),
                                                  timestamp :: integer(),
                                                  attributes :: map(),
                                                  dropped_attributes_count :: non_neg_integer()}],
                                 links ::
                                     [#span_link{ctx ::
                                                     #span_ctx{trace_id :: <<_:128>> | undefined,
                                                               span_id :: <<_:64>> | undefined,
                                                               trace_flags :: 0 | 1,
                                                               trace_state :: [{binary(), binary()}],
                                                               is_remote :: boolean()},
                                                 attributes :: map(),
                                                 dropped_attributes_count :: non_neg_integer()}],
                                 status :: unset | ok | {error, binary()},
                                 is_recording :: boolean(),
                                 dropped_attributes_count :: non_neg_integer(),
                                 dropped_events_count :: non_neg_integer(),
                                 dropped_links_count :: non_neg_integer()}],
                      queue_size :: non_neg_integer(),
                      timer_ref :: reference() | undefined,
                      dropped_spans :: non_neg_integer(),
                      retry_spans ::
                          [#span{name :: binary(),
                                 ctx ::
                                     #span_ctx{trace_id :: <<_:128>> | undefined,
                                               span_id :: <<_:64>> | undefined,
                                               trace_flags :: 0 | 1,
                                               trace_state :: [{binary(), binary()}],
                                               is_remote :: boolean()},
                                 parent_ctx ::
                                     #span_ctx{trace_id :: <<_:128>> | undefined,
                                               span_id :: <<_:64>> | undefined,
                                               trace_flags :: 0 | 1,
                                               trace_state :: [{binary(), binary()}],
                                               is_remote :: boolean()} |
                                     undefined,
                                 tracer ::
                                     #tracer{name :: binary(),
                                             version :: binary() | undefined,
                                             schema_url :: binary() | undefined,
                                             resource ::
                                                 #resource{attributes :: map(),
                                                           schema_url :: binary() | undefined} |
                                                 undefined} |
                                     undefined,
                                 kind :: client | server | producer | consumer | internal,
                                 start_time :: integer(),
                                 end_time :: integer() | undefined,
                                 attributes :: map(),
                                 events ::
                                     [#span_event{name :: binary(),
                                                  timestamp :: integer(),
                                                  attributes :: map(),
                                                  dropped_attributes_count :: non_neg_integer()}],
                                 links ::
                                     [#span_link{ctx ::
                                                     #span_ctx{trace_id :: <<_:128>> | undefined,
                                                               span_id :: <<_:64>> | undefined,
                                                               trace_flags :: 0 | 1,
                                                               trace_state :: [{binary(), binary()}],
                                                               is_remote :: boolean()},
                                                 attributes :: map(),
                                                 dropped_attributes_count :: non_neg_integer()}],
                                 status :: unset | ok | {error, binary()},
                                 is_recording :: boolean(),
                                 dropped_attributes_count :: non_neg_integer(),
                                 dropped_events_count :: non_neg_integer(),
                                 dropped_links_count :: non_neg_integer()}],
                      retry_attempts :: non_neg_integer(),
                      max_batch_retries :: non_neg_integer(),
                      export_inflight :: undefined | inflight()}) ->
                  ok.

Shuts down the processor with state.

start_link(Config)

-spec start_link(map()) -> {ok, pid()} | ignore | {error, term()}.

Starts the batch processor as a gen_server.

terminate(Reason, State)