Tidefall.Buffer behaviour (Tidefall v1.0.0-rc.0)


Buffer operations and behaviour.

This module is the public interface for buffer-level concerns:

  • Starting and stopping buffers
  • Total size across partitions
  • Mutating runtime options
  • Locating the partition for a routing key
  • The behaviour contract every buffer implementation must satisfy

End-user code typically goes through the buffer-type module (Tidefall.Queue, Tidefall.HashMap), which delegates the shared operations here.

Start options

  • :name (atom/0) - Required. The buffer name (used to identify the buffer).

  • :processor - Required. A callback that processes batches of messages. Called with a list of accumulated messages and should handle the processing logic (e.g., send to external service, persist to database, etc.).

    Can be either:

    • A function of arity 1: fn batch -> ... end or &MyModule.process/1.
    • An MFA tuple {Module, Function, Args}: The batch is prepended to the arguments, e.g., {MyModule, :process, [extra_arg]} will call MyModule.process(batch, extra_arg).
  • :partitions (non_neg_integer/0) - Number of partitions to create. Each partition has its own buffer and processing cycle. Defaults to System.schedulers_online() to match the number of available schedulers. More partitions reduce lock contention but increase per-partition overhead.

  • :processing_interval (pos_integer/0) - How often (in milliseconds) each partition checks its buffer and initiates processing. Messages are processed at this interval if any are buffered. Lower values mean faster processing but more frequent task spawning. The default value is 5000.

  • :processing_timeout (timeout/0) - Maximum time (in milliseconds) for a processing task to complete before being forcefully terminated. Used during graceful shutdown to wait for in-flight processing to complete. See Task.Supervisor.async_nolink/3 for more information. The default value is 60000.

  • :processing_batch_size - Controls how buffered data is passed to the processor callback.

    Can be either:

    • A positive integer (default 10): Messages are read from the ETS table in batches of up to this size using :ets.select/3 with continuations, and the processor is called once per batch. This keeps memory usage bounded for large tables.

    • :table: The ETS table identifier (a tid) is passed directly to the processor instead of reading and batching the data. The processor has full control over how it reads and processes the table.

      In this mode the processor takes ownership of the table. The buffer creates a fresh table for incoming writes on the next swap and does NOT delete the table you received. When the processing task exits (after the processor returns), ETS will auto-delete the table unless you have already transferred it elsewhere via :ets.give_away/3. If you want to keep the table beyond the task's lifetime, call :ets.give_away/3 to hand it off to another process before returning.

    The default value is 10.
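
The processor forms described above can be pictured with a small dispatcher. This is a minimal sketch, not Tidefall's implementation: ProcessorSketch, MyProcessor, keep_table/2, and the keeper argument are hypothetical names introduced for illustration. It shows the input being prepended to the MFA arguments and, for the :table mode, a processor handing the table to another process with :ets.give_away/3 before it returns.

```elixir
defmodule ProcessorSketch do
  # Hypothetical dispatcher, for illustration only (not Tidefall code).
  # `input` is a list of messages, or a table identifier in :table mode.
  def invoke(fun, input) when is_function(fun, 1), do: fun.(input)

  def invoke({mod, fun, args}, input)
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    # The input is prepended to the configured arguments.
    apply(mod, fun, [input | args])
  end
end

defmodule MyProcessor do
  # Arity-1 form: receives the batch only.
  def process(batch) when is_list(batch), do: {:processed, length(batch)}

  # MFA form {MyProcessor, :process, [tag]}: called as process(batch, tag).
  def process(batch, tag) when is_list(batch), do: {tag, length(batch)}

  # :table mode: receives a table identifier instead of a list.
  # The calling process must own the table for give_away to succeed.
  # Handing it to `keeper` lets the table outlive the calling process.
  def keep_table(tid, keeper) when is_pid(keeper) do
    size = :ets.info(tid, :size)
    true = :ets.give_away(tid, keeper, :handoff)
    {:kept, size}
  end
end
```

In this sketch the keeper process receives an {:"ETS-TRANSFER", tid, from_pid, :handoff} message and becomes the table's owner, so the table is not deleted when the process that called keep_table/2 exits.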

Runtime options

The following runtime options are shared by Tidefall.Queue and Tidefall.HashMap:

  • :partition_key (term/0) - Determines what value is used as the routing key for partitioning messages.

    Can be one of four values:

    • nil (default): The message itself is used as the routing key. Messages with the same content are routed to the same partition.

    • A function of arity 1: Applied to each message to return the routing key. Allows grouping related messages together (e.g., by user ID or account ID) to keep them in the same partition.

    • An MFA tuple {Module, Function, Args}: The function is applied with the message prepended to the arguments. Useful for delegating routing logic to a module function while keeping configuration declarative.

    • Any static term: Used as the routing key for all messages, giving explicit control over which partition receives them (e.g., :logs, :events, or an identifier).

    Fundamentally, this option determines how messages are distributed across partitions. Use it to keep related messages together (for ordering or state locality) or spread unrelated messages across partitions (for parallelism).

    The default value is nil.
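
The four forms of :partition_key can be read as a small resolution function from a message to its routing key. The sketch below is an assumption about how such a resolution could look, not Tidefall's actual code; RoutingKeySketch and resolve/2 are hypothetical names, and the clause order (MFA tuples checked before static terms) is also assumed.

```elixir
defmodule RoutingKeySketch do
  # Hypothetical helper mirroring the four documented forms of :partition_key.

  # nil: the message itself is the routing key.
  def resolve(nil, message), do: message

  # Function of arity 1: applied to the message.
  def resolve(fun, message) when is_function(fun, 1), do: fun.(message)

  # MFA tuple: the message is prepended to the arguments.
  def resolve({mod, fun, args}, message)
      when is_atom(mod) and is_atom(fun) and is_list(args),
      do: apply(mod, fun, [message | args])

  # Any other term: used as-is for every message.
  def resolve(static, _message), do: static
end
```

For example, RoutingKeySketch.resolve({Map, :get, [:account_id]}, message) calls Map.get(message, :account_id), so messages sharing an account ID share a routing key.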

Summary

Types

buffer()

Buffer name

Callbacks

ets_match_spec()

Returns the match spec used by the processing task when it drains the swapped table via :ets.select/3. The spec determines the shape of each element handed to the processor.

ets_table_opts()

Returns the list of options passed verbatim to :ets.new/2 when the partition creates one of its two backing tables.

Functions

buffer_size(buffer)

Returns the buffer size (total number of messages across all partitions).

get_partition(buffer, partition_key, object)

Returns the partition based on the given arguments.

start_link(opts)

Starts a new buffer.

stop(buffer, reason, timeout)

Stops a buffer gracefully.

update_options(buffer, opts)

Updates the options for the buffer.

Types

buffer()

@type buffer() :: atom()

Buffer name

Callbacks

ets_match_spec()

@callback ets_match_spec() :: :ets.match_spec()

Returns the match spec used by the processing task when it drains the swapped table via :ets.select/3. The spec determines the shape of each element handed to the processor.
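
To illustrate how a match spec shapes what the processor sees, here is a minimal sketch of draining a table in batches with :ets.select/3 and continuations. It is not Tidefall's implementation: DrainSketch and drain/4 are hypothetical names, and the example match spec assumes entries are stored as {key, value} tuples and returns only the value.

```elixir
defmodule DrainSketch do
  # Hypothetical helper: reads `tid` in batches of up to `batch_size`
  # elements shaped by `match_spec`, calling `processor` once per batch.
  # Returns the processor results in call order.
  def drain(tid, match_spec, batch_size, processor)
      when is_integer(batch_size) and batch_size > 0 and is_function(processor, 1) do
    tid
    |> :ets.select(match_spec, batch_size)
    |> loop(processor, [])
  end

  # No more data: return the accumulated results.
  defp loop(:"$end_of_table", _processor, acc), do: Enum.reverse(acc)

  # Skip empty chunks and keep following the continuation.
  defp loop({[], continuation}, processor, acc),
    do: loop(:ets.select(continuation), processor, acc)

  # Hand the batch to the processor, then fetch the next chunk.
  defp loop({batch, continuation}, processor, acc),
    do: loop(:ets.select(continuation), processor, [processor.(batch) | acc])
end

# Assumed entry shape {key, value}; the spec returns only the value.
value_only_spec = [{{:"$1", :"$2"}, [], [:"$2"]}]
```

With this spec, a processor receives lists of bare values; a spec whose body is [:"$_"] would instead hand it the full stored tuples.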

ets_table_opts()

@callback ets_table_opts() :: [atom() | {atom(), any()}]

Returns the list of options passed verbatim to :ets.new/2 when the partition creates one of its two backing tables.

The list must include the ETS table type (:set / :ordered_set / :bag / :duplicate_bag), the :keypos, and any concurrency or access options the implementation needs.

The partition does not augment or rewrite this list; :ets.new/2 receives exactly what the implementation returns.
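
As a rough illustration, an implementation's ets_table_opts/0 might look like the sketch below. MyBufferImpl is a hypothetical module and the specific options chosen are assumptions, not what Tidefall.Queue or Tidefall.HashMap actually return. A real implementation would also declare @behaviour Tidefall.Buffer; that line is left out so the sketch runs standalone.

```elixir
defmodule MyBufferImpl do
  # Hypothetical implementation module, for illustration only.
  # The list names the table type, an access level, the key position,
  # and a concurrency option, since nothing is added by the caller.
  def ets_table_opts do
    [:set, :public, {:keypos, 1}, {:write_concurrency, true}]
  end
end

# The options go to :ets.new/2 unchanged.
table = :ets.new(:backing_table, MyBufferImpl.ets_table_opts())
:ets.info(table, :type)
```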

Functions

buffer_size(buffer)

@spec buffer_size(buffer()) :: non_neg_integer()

Returns the buffer size (total number of messages across all partitions).

Exposed as size/1 on Tidefall.Queue and Tidefall.HashMap; most code calls those rather than buffer_size/1 directly.

Examples

iex> Tidefall.Buffer.buffer_size(:my_buffer)
10

get_partition(buffer, partition_key, object)

@spec get_partition(buffer(), any(), any()) :: atom()

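Returns the partition (an atom) that object is routed to within buffer, using partition_key to derive the routing key as described under Runtime options.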
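
One common way to map a routing key to a partition is to hash it into a fixed range. The sketch below shows that technique only; whether Tidefall hashes with :erlang.phash2/2, and how it names partitions, is not stated in this documentation, so both are assumptions, and PartitionSketch and partition_for/3 are hypothetical names.

```elixir
defmodule PartitionSketch do
  # Hypothetical: maps a routing key to one of `partitions` partition names.
  # Hashing with :erlang.phash2/2 and the "<buffer>.partition<N>" naming
  # scheme are assumptions made for this illustration.
  def partition_for(buffer, routing_key, partitions)
      when is_atom(buffer) and is_integer(partitions) and partitions > 0 do
    # phash2/2 returns an integer in 0..partitions-1 for any term.
    index = :erlang.phash2(routing_key, partitions)
    :"#{buffer}.partition#{index}"
  end
end
```

Because the hash is deterministic, equal routing keys always resolve to the same partition, which is what keeps related messages together.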

start_link(opts)

@spec start_link(keyword()) :: Supervisor.on_start()

Starts a new buffer.

Prefer implementation-specific functions

Prefer Tidefall.Queue.start_link/1 or Tidefall.HashMap.start_link/1, which set the :module option for you.

Examples

iex> Tidefall.Buffer.start_link(
...>   module: Tidefall.Queue,
...>   name: :my_buffer,
...>   processor: &MyModule.process/1
...> )
{:ok, #PID<0.123.0>}

Note that the :module option is required here and must be set to Tidefall.Queue or Tidefall.HashMap.

stop(buffer, reason, timeout)

@spec stop(buffer() | pid(), reason :: any(), timeout()) :: :ok

Stops a buffer gracefully.

Examples

iex> Tidefall.Buffer.stop(:my_buffer)
:ok

update_options(buffer, opts)

@spec update_options(
  buffer(),
  keyword()
) :: :ok

Updates the options for the buffer.

Examples

iex> Tidefall.Buffer.update_options(:my_buffer, processing_interval: 1000)
:ok

Note that the update applies to every partition of the buffer.