erldns_pipeline behaviour (erldns v10.5.3)
View SourceIt declares a pipeline of sequential transformations to apply to an incoming query until a response is constructed.
This module is responsible for handling the pipeline of pipes that will be executed when a DNS message is received. Handlers in this pipeline will be executed sequentially, accumulating the result of each handler and passing it to the next. This designs a pluggable framework where new behaviour can be injected as a new pipe handler in the right order.
Default pipes
The following are enabled by default, see their documentation for details:
erldns_questionserldns_edns_max_payload_sizeerldns_query_throttleerldns_packet_cacheerldns_resolver_recursiveerldns_resolvererldns_dnssecerldns_sortererldns_section_countererldns_empty_verification
Types of pipelines
There are two kind of pipes: function pipes and module pipes.
Function pipes
A function pipe is by definition any function that receives a dns:message/0
and a set of opts/0 options and returns a dns:message/0. Function pipes
must have the following type signature:
-type pipefun() :: fun((dns:message(), erldns_pipeline:opts()) -> erldns_pipeline:return())Module pipes
The preferred mechanism, a module pipe is an extension of the function pipe.
It is a module that exports:
- a
deps/0function which enumerates pipes required to run before or after the current pipe. - a
prepare/1function which takes a set of options and initializes it, or disables the pipe. - a
call/2function with the signature defined as in the function pipe.
The API expected by a module pipe is defined as a behaviour by this module.
Suspending Pipes
A pipe can suspend execution to perform blocking work asynchronously. This is useful for operations like external DNS resolution that would block the worker.
To suspend, return {suspend, Msg1, Opts1, AsyncFun} from your pipe:
-spec call(dns:message(), erldns_pipeline:opts()) -> erldns_pipeline:return().
call(Msg, Opts) ->
case needs_external_resolution(Msg) of
false ->
Msg;
true ->
AsyncFun = fun(M, _O, _Ctx) ->
%% This runs in the async pool, blocking is OK here
resolve_external(M)
end,
{suspend, Msg, Opts, AsyncFun}
end.The asynchronous function behaves like a regular pipe function, and it can even return more asynchronous work. Once work is not asynchronous anymore, the remaining of the pipeline will be scheduled back in the regular worker.
Async pool
When a pipe suspends, the blocking work is run in a bounded worker pool so that listener workers are not blocked.
Pool behaviour:
- Uses a worker pool (wpool) with CoDel for queue management.
- Default parallelism: 4x. Configurable via
pipelineoptions below. - Pool size and pending task counts are available for monitoring via the pool implementation.
Configuration
Packet pipeline (list of pipes)
{erldns, [
{packet_pipeline, [
erldns_questions,
erldns_edns_max_payload_size,
erldns_query_throttle,
erldns_packet_cache,
erldns_resolver_recursive,
erldns_resolver,
erldns_dnssec,
erldns_sorter,
erldns_section_counter,
erldns_packet_cache,
erldns_empty_verification
]},
]}Pipeline async pool (size and CoDel)
{erldns, [
{pipeline, #{
async_pool => #{
parallelism => 32, % Worker count (default: 4 * schedulers)
codel_interval => 500, % CoDel interval in ms (default: 500)
codel_target => 50 % CoDel target delay in ms (default: 50)
}
}
}
]}Telemetry events
[erldns, pipeline, error]
Emitted when pipeline execution hits an error: invalid pipe return, exception in a pipe, exception in async work, or suspend loop detection.
Measurements:
#{count => 1}Metadata (exception in sync or async pipe):
kind => exit | error | throw reason => term() stacktrace => [term()]Metadata (invalid return from pipe):
#{reason => term()}Metadata (too many nested suspends):
#{reason => pipeline_suspend_loop}Metadata (exception in async pool worker):
#{what => async_work_failed, class => ..., reason => ..., stacktrace => ...}
Examples
Here's an example of a function pipe that arbitrarily sets the truncated bit on a message if the query is directed to the "example.com" domain:
-module(erldns_packet_pipe_example_set_truncated).
-behaviour(erldns_pipeline).
-export([prepare/1, call/2]).
-spec prepare(erldns_pipeline:opts()) -> disabled | erldns_pipeline:opts().
prepare(Opts) ->
case enabled() of
false -> disabled;
true -> Opts
end.
-spec call(dns:message(), erldns_pipeline:opts()) -> erldns_pipeline:return().
call(#dns_message{questions = [#dns_query{name = ~"example.com"} | _]} = Msg, _Opts) ->
Msg#dns_message{tc = true}.
call(Msg, _Opts) ->
Msg.
Summary
Types
Opaque continuation for suspended pipeline execution.
The dependencies of a pipe module.
The host that originated the request.
Options that can be passed and accumulated to the pipeline.
A pipe in the pipeline, either a module or a function.
A ready function in the pipeline.
A list of ready pipefun/0.
The result of a pipeline.
The return type of a pipe.
The underlying request transport protocol. All requests come either through UDP or TCP.
Callbacks
Declare dependencies on other pipes.
Initialise the pipe handler, triggering side-effects and preparing any necessary metadata.
Functions
Call the main application packet pipeline with the pipes configured in the system configuration.
Call a custom pipeline by name.
Remove a custom pipeline from storage.
Get main pipeline
Get main pipeline
Check if a pipe is configured in the main pipeline.
Check if a pipe is configured in a specific pipeline. Returns false if the pipeline doesn't exist.
Verify and store a custom pipeline.
Types
-opaque continuation()
Opaque continuation for suspended pipeline execution.
The dependencies of a pipe module.
Contains the following keys:
prerequisitesis a list of module pipes that must appear earlier in the pipelinedependentsis a list of module pipes that must appear later in the pipeline
-type host() :: inet:ip_address() | inet:hostname().
The host that originated the request.
-type opts() :: #{query_name := dns:dname(), query_labels := dns:labels(), query_type := dns:type(), monotonic_time := integer(), resolved := boolean(), transport := transport(), port := inet:port_number(), host := host(), socket := gen_tcp:socket() | {gen_udp:socket(), inet:port_number()}, inet_socket := gen_tcp:socket() | gen_udp:socket(), atom() => dynamic()}.
Options that can be passed and accumulated to the pipeline.
Warning
socket is deprecated in favour of inet_socket.
-type pipe() :: module() | fun((dns:message(), opts()) -> return()).
A pipe in the pipeline, either a module or a function.
See Module pipes and Function pipes for details.
-type pipefun() :: fun((dns:message(), opts()) -> return()).
A ready function in the pipeline.
It is either the function from pipe/0 or the function fun Module:call/2.
-type pipeline() :: [pipefun()].
A list of ready pipefun/0.
-type result() :: halt | dns:message() | {suspend, continuation()}.
The result of a pipeline.
It can return halt, a new dns:message/0, or suspend for async operations. See return/0.
-type return() :: halt | dns:message() | {dns:message(), opts()} | {stop, dns:message()} | {suspend, dns:message(), opts(), pipefun()}.
The return type of a pipe.
Valid returns are:
- a possibly new
dns:message/0; - a
{Msg, Opts}tuple containing a newdns:message/0and a new set ofopts/0; - a
{stop, Msg}tuple containing a possibly newdns:message/0to stop the pipeline execution altogether and return the given message. - a
haltatom, in which case the pipeline will be halted and no further pipes will be executed. The socket workers won't respond nor trigger any events, and it's fully the responsibility of a handler to deal with all the edge cases. This could be useful for either dropping the request entirely, or for stealing the request from a given worker to answer separately. Note that the pipe options will contain the UDP or TCP socket to answer to, so in the case of UDP the client can be answered usinggen_udp:send/4with the socket, host and port; and in the case of TCP it would be required to first steal the socket usinggen_tcp:controlling_process/2so that the connection is not closed. - a
{suspend, Msg, Opts, AsyncFun}to allow a pipe to pause execution and perform blocking work asynchronously. This is used for operations like external DNS resolution that would otherwise block the worker pool.
-type transport() :: tcp | udp.
The underlying request transport protocol. All requests come either through UDP or TCP.
Callbacks
-callback call(dns:message(), opts()) -> return().
Trigger the pipeline at run-time. See return/0 for details.
-callback deps() -> deps().
Declare dependencies on other pipes.
This pipe will only work correctly if the listed pipes appear earlier in the pipeline configuration. The pipeline will fail to start if dependencies are not satisfied.
Example:
-module(erldns_dnssec).
-behaviour(erldns_pipeline).
-export([deps/0, prepare/1, call/2]).
-spec deps() -> deps().
deps() ->
#{prerequisites => [erldns_questions, erldns_resolver], dependents => []}.
Initialise the pipe handler, triggering side-effects and preparing any necessary metadata.
This will be called during the pipeline initialisation phase, which should happen at application startup provided you added the pipeline to your application's supervision tree. This will be called only once at boot and therefore it is an opportunity to do any necessary preparations that can reduce the amount of work at runtime and therefore improve performance.
This callback can return disabled, and then the call/2 callback won't be added to the
pipeline.
Functions
-spec call(dns:message(), #{atom() => dynamic()}) -> result().
Call the main application packet pipeline with the pipes configured in the system configuration.
-spec call_custom(dns:message(), #{atom() => dynamic()}, dynamic()) -> result().
Call a custom pipeline by name.
The pipeline should have been verified and stored previously with store_pipeline/2.
Remove a custom pipeline from storage.
Should be used to clean up a custom pipeline stored with store_pipeline/2.
Get main pipeline
Get main pipeline
Check if a pipe is configured in the main pipeline.
Check if a pipe is configured in a specific pipeline. Returns false if the pipeline doesn't exist.
Verify and store a custom pipeline.
Can be used to prepare a custom pipeline that can be triggered using call_custom/3.
Validates that pipe dependencies (declared via deps/0) are satisfied by the given order.
Note that custom pipelines are not garbage collected, that is, it is the responsibility of
the registrant to ensure it is cleaned up when is not needed using delete_pipeline/1.
Note
The underlying storage is a persistent_term so all warnings apply.