erldns_pipeline behaviour (erldns v10.5.4)

It declares a pipeline of sequential transformations to apply to an incoming query until a response is constructed.

This module is responsible for running the pipeline of pipes that is executed when a DNS message is received. Handlers in the pipeline run sequentially, each receiving the accumulated result of the previous handler and passing its own result to the next. This design provides a pluggable framework in which new behaviour can be injected as a new pipe handler at the right position in the order.
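The sequential accumulation described above can be sketched as a small recursive loop. This is an illustrative model only, not the erldns implementation: the module name pipeline_sketch is hypothetical, messages are treated as opaque terms, and suspension is left out to keep the sketch short.

```erlang
-module(pipeline_sketch).
-export([run/3]).

%% Illustrative model only, not the erldns implementation.
%% Each pipe is a fun(Msg, Opts) returning one of:
%%   halt | Msg1 | {Msg1, Opts1} | {stop, Msg1}
-spec run([fun((term(), map()) -> term())], term(), map()) -> halt | term().
run([], Msg, _Opts) ->
    %% No pipes left: the accumulated message is the result.
    Msg;
run([Pipe | Rest], Msg, Opts) ->
    case Pipe(Msg, Opts) of
        halt ->
            %% Nothing further runs and nothing is returned to the client.
            halt;
        {stop, Msg1} ->
            %% Skip the remaining pipes and return this message.
            Msg1;
        {Msg1, Opts1} when is_map(Opts1) ->
            %% Both the message and the options are carried forward.
            run(Rest, Msg1, Opts1);
        Msg1 ->
            %% Only the message changed; keep the current options.
            run(Rest, Msg1, Opts)
    end.
```

Each clause corresponds to one of the documented pipe return values, which is why the order of pipes in the configuration matters.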

Default pipes

The following are enabled by default, see their documentation for details:

Types of pipelines

There are two kinds of pipes: function pipes and module pipes.

Function pipes

A function pipe is any function that receives a dns:message/0 and a set of opts/0 options and returns a return/0 value, in the simplest case a dns:message/0. Function pipes must have the following type signature:

-type pipefun() :: fun((dns:message(), erldns_pipeline:opts()) -> erldns_pipeline:return()).
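As a minimal sketch of a function matching this signature, the following builds a pipe that leaves the message untouched and records a flag in the accumulated options. The module name function_pipe_sketch and the option key seen_by_example are hypothetical and exist only for illustration.

```erlang
-module(function_pipe_sketch).
-export([tagging_pipe/0]).

%% Returns a function pipe. The {Msg, Opts} return shape is one of the
%% documented return values; the key seen_by_example is hypothetical.
-spec tagging_pipe() -> fun((term(), map()) -> {term(), map()}).
tagging_pipe() ->
    fun(Msg, Opts) ->
        {Msg, Opts#{seen_by_example => true}}
    end.
```

Because pipe/0 accepts funs as well as modules, a fun like this could presumably be listed among the pipes passed to store_pipeline/2.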

Module pipes

The preferred mechanism, a module pipe is an extension of the function pipe.

It is a module that exports:

  • an optional deps/0 function which enumerates the pipes required to run before or after the current pipe.
  • an optional prepare/1 function which takes a set of options and initializes them, or disables the pipe.
  • a call/2 function with the same signature as a function pipe.

The API expected by a module pipe is defined as a behaviour by this module.

Suspending Pipes

A pipe can suspend execution to perform blocking work asynchronously. This is useful for operations like external DNS resolution that would block the worker.

To suspend, return {suspend, Msg1, Opts1, AsyncFun} from your pipe:

-spec call(dns:message(), erldns_pipeline:opts()) -> erldns_pipeline:return().
call(Msg, Opts) ->
    case needs_external_resolution(Msg) of
        false ->
            Msg;
        true ->
            AsyncFun = fun(M, _O) ->
                %% This runs in the async pool, blocking is OK here
                resolve_external(M)
            end,
            {suspend, Msg, Opts, AsyncFun}
    end.

The asynchronous function behaves like a regular pipe function, and it can even return more asynchronous work. Once the work is no longer asynchronous, the remainder of the pipeline is scheduled back on the regular worker.
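A sketch of an asynchronous function that itself returns more asynchronous work might look as follows. The module name, the option keys, and blocking_lookup/1 are hypothetical stand-ins, and the async functions are assumed to have the two-argument pipefun/0 shape given in return/0.

```erlang
-module(chained_suspend_sketch).
-export([call/2]).

%% Entry point: hand the first blocking step off for async execution.
call(Msg, Opts) ->
    {suspend, Msg, Opts, fun stage_one/2}.

%% Runs asynchronously: does blocking work, then requests a second
%% asynchronous step by suspending again.
stage_one(Msg, Opts) ->
    Opts1 = Opts#{stage_one_result => blocking_lookup(Msg)},
    {suspend, Msg, Opts1, fun stage_two/2}.

%% Final asynchronous step: a non-suspend return value lets the rest of
%% the pipeline continue.
stage_two(Msg, Opts) ->
    {Msg, Opts#{stage_two_done => true}}.

%% Hypothetical stand-in for blocking work such as an external lookup.
blocking_lookup(_Msg) ->
    timer:sleep(10),
    ok.
```

Chains should stay short: the telemetry section lists a pipeline_suspend_loop error for too many nested suspends.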

Async pool

When a pipe suspends, the blocking work is run in a bounded worker pool so that listener workers are not blocked.

Pool behaviour:

  • Uses a worker pool (wpool) with CoDel for queue management.
  • Default parallelism: four times the number of schedulers. Configurable via the pipeline options below.
  • Pool size and pending task counts are available for monitoring via the pool implementation.

Configuration

Packet pipeline (list of pipes)

{erldns, [
    {packet_pipeline, [
        erldns_questions,
        erldns_edns_max_payload_size,
        erldns_query_throttle,
        erldns_packet_cache,
        erldns_resolver_recursive,
        erldns_resolver,
        erldns_dnssec,
        erldns_sorter,
        erldns_section_counter,
        erldns_packet_cache,
        erldns_empty_verification
    ]}
]}

Pipeline async pool (size and CoDel)

{erldns, [
    {pipeline, #{
        async_pool => #{
            parallelism => 32,       % Worker count (default: 4 * schedulers)
            codel_interval => 500,   % CoDel interval in ms (default: 500)
            codel_target => 50       % CoDel target delay in ms (default: 50)
        }
    }}
]}

Telemetry events

[erldns, pipeline, error]

Emitted when pipeline execution hits an error: invalid pipe return, exception in a pipe, exception in async work, or suspend loop detection.

  • Measurements: #{count => 1}
  • Metadata (exception in sync or async pipe): #{kind => exit | error | throw, reason => term(), stacktrace => [term()]}
  • Metadata (invalid return from pipe): #{reason => term()}
  • Metadata (too many nested suspends): #{reason => pipeline_suspend_loop}
  • Metadata (exception in async pool worker): #{what => async_work_failed, class => ..., reason => ..., stacktrace => ...}
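A handler for this event could be attached roughly as follows. This is a sketch assuming the standard telemetry library's attach/4 and its four-argument handler convention; the module name and handler id are hypothetical.

```erlang
-module(pipeline_error_logger).
-export([attach/0, handle_event/4]).

%% Registers handle_event/4 for the pipeline error event.
%% The handler id (first argument) is a hypothetical name.
attach() ->
    telemetry:attach(
        pipeline_error_logger,
        [erldns, pipeline, error],
        fun ?MODULE:handle_event/4,
        #{}
    ).

%% Not every metadata variant is guaranteed to carry a reason key,
%% so it is read with a default.
handle_event([erldns, pipeline, error], #{count := Count}, Metadata, _Config) ->
    Reason = maps:get(reason, Metadata, undefined),
    logger:warning("pipeline error (count=~p): ~p", [Count, Reason]),
    ok.
```

An exported function reference is used rather than an anonymous fun, which is what the telemetry library recommends for handlers.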

Examples

Here's an example of a module pipe that arbitrarily sets the truncated bit on a message if the query is directed to the "example.com" domain:

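-module(erldns_packet_pipe_example_set_truncated).
-behaviour(erldns_pipeline).

%% Record definitions for #dns_message{} and #dns_query{}.
-include_lib("dns_erlang/include/dns.hrl").

-export([prepare/1, call/2]).

-spec prepare(erldns_pipeline:opts()) -> disabled | erldns_pipeline:opts().
prepare(Opts) ->
    case enabled() of
        false -> disabled;
        true -> Opts
    end.

-spec call(dns:message(), erldns_pipeline:opts()) -> erldns_pipeline:return().
call(#dns_message{questions = [#dns_query{name = ~"example.com"} | _]} = Msg, _Opts) ->
    Msg#dns_message{tc = true};
call(Msg, _Opts) ->
    Msg.

%% Hypothetical toggle; replace with your own configuration lookup.
enabled() ->
    application:get_env(erldns, example_set_truncated, false).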

Types

continuation()

-opaque continuation()

Opaque continuation for suspended pipeline execution.

deps()

-type deps() :: #{prerequisites => [module()], dependents => [module()]}.

The dependencies of a pipe module.

Contains the following keys:

  • prerequisites is a list of module pipes that must appear earlier in the pipeline
  • dependents is a list of module pipes that must appear later in the pipeline
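The ordering constraint expressed by these two keys can be illustrated with a small check over an ordered list of module pipes. This is a sketch only, not the validator erldns uses, and it assumes that every listed pipe must actually be present in the pipeline.

```erlang
-module(deps_order_sketch).
-export([satisfied/3]).

%% Illustrative check only, not the erldns validator.
%% Pipes is the ordered pipeline, Pipe is one of its members, and Deps
%% is that pipe's deps() map.
-spec satisfied(module(), [module()], map()) -> boolean().
satisfied(Pipe, Pipes, Deps) ->
    %% Split the pipeline into what runs before Pipe and what runs after.
    {Before, Rest} = lists:splitwith(fun(P) -> P =/= Pipe end, Pipes),
    After =
        case Rest of
            [_Self | Tail] -> Tail;
            [] -> []
        end,
    Prerequisites = maps:get(prerequisites, Deps, []),
    Dependents = maps:get(dependents, Deps, []),
    lists:all(fun(P) -> lists:member(P, Before) end, Prerequisites) andalso
        lists:all(fun(P) -> lists:member(P, After) end, Dependents).
```

For a pipeline [a, b, c, d], the pipe c with prerequisites [a, b] and dependents [d] satisfies the check, while listing d as a prerequisite does not.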

host()

-type host() :: inet:ip_address() | inet:hostname().

The host that originated the request.

opts()

-type opts() ::
          #{query_name := dns:dname(),
            query_labels := dns:labels(),
            query_type := dns:type(),
            monotonic_time := integer(),
            resolved := boolean(),
            transport := transport(),
            port := inet:port_number(),
            host := host(),
            socket := gen_tcp:socket() | {gen_udp:socket(), inet:port_number()},
            inet_socket := gen_tcp:socket() | gen_udp:socket(),
            atom() => dynamic()}.

Options that can be passed and accumulated to the pipeline.

Warning

socket is deprecated in favour of inet_socket.

pipe()

-type pipe() :: module() | fun((dns:message(), opts()) -> return()).

A pipe in the pipeline, either a module or a function.

See Module pipes and Function pipes for details.

pipefun()

-type pipefun() :: fun((dns:message(), opts()) -> return()).

A ready function in the pipeline.

It is either the function from pipe/0 or the function fun Module:call/2.

pipeline()

-type pipeline() :: [pipefun()].

A list of ready pipefun/0.

result()

-type result() :: halt | dns:message() | {suspend, continuation()}.

The result of a pipeline.

It can return halt, a new dns:message/0, or suspend for async operations. See return/0.

return()

-type return() ::
          halt |
          dns:message() |
          {dns:message(), opts()} |
          {stop, dns:message()} |
          {suspend, dns:message(), opts(), pipefun()}.

The return type of a pipe.

Valid returns are:

  • a possibly new dns:message/0;
  • a {Msg, Opts} tuple containing a new dns:message/0 and a new set of opts/0;
  • a {stop, Msg} tuple containing a possibly new dns:message/0, which stops the pipeline execution altogether and returns the given message;
  • a halt atom, in which case the pipeline will be halted and no further pipes will be executed. The socket workers won't respond nor trigger any events, and it is entirely the handler's responsibility to deal with all the edge cases. This could be useful for either dropping the request entirely, or for stealing the request from a given worker to answer separately. Note that the pipe options will contain the UDP or TCP socket to answer to, so in the case of UDP the client can be answered using gen_udp:send/4 with the socket, host and port; and in the case of TCP it would be required to first steal the socket using gen_tcp:controlling_process/2 so that the connection is not closed;
  • a {suspend, Msg, Opts, AsyncFun} tuple, which allows a pipe to pause execution and perform blocking work asynchronously. This is used for operations like external DNS resolution that would otherwise block the worker.
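The UDP case of halt can be sketched as below. This assumes that the inet_socket, host and port options identify the socket and client to reply to, and that dns:encode_message/1 from dns_erlang returns the wire-format binary; the module name is hypothetical, and a real handler would build a proper response before sending.

```erlang
-module(udp_reply_sketch).
-export([call/2]).

%% For UDP requests, answer the client directly and halt the pipeline so
%% that the socket worker sends nothing further.
call(Msg, #{transport := udp, inet_socket := Socket, host := Host, port := Port}) ->
    %% Assumption: dns:encode_message/1 yields the binary to send.
    Packet = dns:encode_message(Msg),
    _ = gen_udp:send(Socket, Host, Port, Packet),
    halt;
%% Anything else (for example TCP) passes through unchanged. Taking over
%% a TCP connection would first need gen_tcp:controlling_process/2.
call(Msg, _Opts) ->
    Msg.
```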

transport()

-type transport() :: tcp | udp.

The underlying request transport protocol. All requests come either through UDP or TCP.

Callbacks

call/2

-callback call(dns:message(), opts()) -> return().

Trigger the pipeline at run-time. See return/0 for details.

deps()

(optional)
-callback deps() -> deps().

Declare dependencies on other pipes.

This pipe will only work correctly if the listed prerequisites appear earlier, and the listed dependents later, in the pipeline configuration. The pipeline will fail to start if dependencies are not satisfied.

Example:

-module(erldns_dnssec).
-behaviour(erldns_pipeline).
-export([deps/0, prepare/1, call/2]).

-spec deps() -> erldns_pipeline:deps().
deps() ->
    #{prerequisites => [erldns_questions, erldns_resolver], dependents => []}.

prepare/1

(optional)
-callback prepare(opts()) -> disabled | opts().

Initialise the pipe handler, triggering side-effects and preparing any necessary metadata.

This will be called during the pipeline initialisation phase, which should happen at application startup provided you added the pipeline to your application's supervision tree. This will be called only once at boot and therefore it is an opportunity to do any necessary preparations that can reduce the amount of work at runtime and therefore improve performance.

This callback can return disabled, and then the call/2 callback won't be added to the pipeline.
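A sketch of a pipe that does its expensive setup once in prepare/1 is shown below. It assumes that the options returned by prepare/1 are the ones later handed to call/2; the module name, the example_blocklist environment key and the option key of the same name are hypothetical.

```erlang
-module(prepare_sketch).
-behaviour(erldns_pipeline).
-export([prepare/1, call/2]).

%% Runs once at pipeline initialisation. Builds a lookup set from a
%% hypothetical application environment key, or disables the pipe when
%% there is nothing to block.
prepare(Opts) ->
    case application:get_env(erldns, example_blocklist, []) of
        [] ->
            disabled;
        Names ->
            Opts#{example_blocklist => sets:from_list(Names, [{version, 2}])}
    end.

%% Runs per query: only a set membership test is left to do at runtime.
call(Msg, #{query_name := Name, example_blocklist := Blocked}) ->
    case sets:is_element(Name, Blocked) of
        true -> halt;
        false -> Msg
    end.
```

Moving the set construction into prepare/1 keeps call/2 cheap, which is the performance benefit described above.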

Functions

call/2

-spec call(dns:message(), #{atom() => dynamic()}) -> result().

Call the main application packet pipeline with the pipes configured in the system configuration.

call_custom/3

-spec call_custom(dns:message(), #{atom() => dynamic()}, dynamic()) -> result().

Call a custom pipeline by name.

The pipeline should have been verified and stored previously with store_pipeline/2.

delete_pipeline(PipelineName)

-spec delete_pipeline(term()) -> boolean().

Remove a custom pipeline from storage.

Should be used to clean up a custom pipeline stored with store_pipeline/2.

get_pipeline()

-spec get_pipeline() -> {pipeline(), opts()}.

Get the main pipeline.

get_pipeline(PipelineName)

-spec get_pipeline(term()) -> {pipeline(), opts()}.

Get the custom pipeline stored under the given name.

is_pipe_configured(Pipe)

-spec is_pipe_configured(pipe()) -> boolean().

Check if a pipe is configured in the main pipeline.

is_pipe_configured/2

-spec is_pipe_configured(pipe(), term()) -> boolean().

Check if a pipe is configured in a specific pipeline. Returns false if the pipeline doesn't exist.

store_pipeline(PipelineName, Pipes)

-spec store_pipeline(term(), [pipe()]) -> ok.

Verify and store a custom pipeline.

Can be used to prepare a custom pipeline that can be triggered using call_custom/3.

Validates that pipe dependencies (declared via deps/0) are satisfied by the given order. Note that custom pipelines are not garbage collected, that is, it is the responsibility of the registrant to clean the pipeline up with delete_pipeline/1 when it is no longer needed.

Note

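The underlying storage is a persistent_term, so all of its caveats apply. In particular, updating or erasing a stored term triggers a global garbage-collection pass over all processes, so pipelines should be stored and deleted rarely.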
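The lifecycle of a custom pipeline, using only the functions documented on this page, might be sketched as follows. The module name, the pipeline name and the custom option key are hypothetical; storing is done once in setup/0 rather than per request because of the persistent_term cost noted above.

```erlang
-module(custom_pipeline_sketch).
-export([setup/0, handle/2, teardown/0]).

%% Hypothetical pipeline name.
-define(NAME, example_custom_pipeline).

%% Store the pipeline once, for example at application start.
setup() ->
    Tag = fun(Msg, Opts) -> {Msg, Opts#{custom => true}} end,
    erldns_pipeline:store_pipeline(?NAME, [Tag]).

%% Run a message through the stored pipeline.
handle(Msg, Opts) ->
    erldns_pipeline:call_custom(Msg, Opts, ?NAME).

%% Remove the pipeline when it is no longer needed.
teardown() ->
    erldns_pipeline:delete_pipeline(?NAME).
```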