ETS-based queue buffer for high-throughput data processing.
Tidefall.Queue buffers arbitrary data in insertion order and
periodically processes it using a configurable processor function.
The queue is drain-only: there is no pop/dequeue. Producers
push/3 items, and the engine drains the buffer to the processor on
each processing tick.
It partitions the buffer to reduce lock contention during high-throughput
writes, and uses double-buffering so that writes can continue while buffered data is being processed.
Data Flow
push(buffer, items)
|
v
+-------------------+
| Partition Routing |
| phash2(item, N) |
+-------------------+
| | |
v v v
+-------+ +-------+ +-------+ ETS :ordered_set
| P 0 | | P 1 | | P N-1 | Key: {monotonic_time, ref}
+-------+ +-------+ +-------+ Val: item
| | |
v v v
+--------------------------------------+
| processor(batch) |
| batch = [val1, val2, ...] |
+--------------------------------------+
Items are routed to partitions via phash2, stored in
:ordered_set ETS tables keyed by {monotonic_time, ref}
(ensuring insertion-time ordering with uniqueness), and
periodically flushed to the processor in batches.
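The routing and keying scheme described above can be sketched with plain ETS calls. This is a minimal illustration, not Tidefall's implementation: the module name RoutingSketch and its functions are hypothetical, and the tables are created in the calling process for simplicity.

```elixir
defmodule RoutingSketch do
  # Hypothetical helper, not part of Tidefall: creates one :ordered_set
  # table per partition and returns them as a tuple for indexed lookup.
  def new_partitions(n) when is_integer(n) and n > 0 do
    1..n
    |> Enum.map(fn _ -> :ets.new(:partition, [:ordered_set, :public]) end)
    |> List.to_tuple()
  end

  # Routes the item with phash2 and stores it under a
  # {monotonic_time, ref} key; the ref keeps keys unique when two
  # inserts observe the same timestamp.
  def push(partitions, item) do
    index = :erlang.phash2(item, tuple_size(partitions))
    key = {System.monotonic_time(), make_ref()}
    true = :ets.insert(elem(partitions, index), {key, item})
    index
  end

  # Returns the values of one partition, visited in key order.
  def values(partitions, index) do
    :ets.select(elem(partitions, index), [{{:_, :"$1"}, [], [:"$1"]}])
  end
end
```

Because the routing key here is the item itself, pushing the same term twice always lands in the same partition, while each push still gets its own key.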
Start options
:name (atom/0) - Required. The buffer name (used to identify the buffer).
:processor - Required. A callback that processes batches of messages. Called with a list of accumulated messages and should handle the processing logic (e.g., send to external service, persist to database, etc.).
Can be either:
- A function of arity 1: fn batch -> ... end or &MyModule.process/1.
- An MFA tuple {Module, Function, Args}: The batch is prepended to the arguments, e.g., {MyModule, :process, [extra_arg]} will call MyModule.process(batch, extra_arg).
:partitions (non_neg_integer/0) - Number of partitions to create. Each partition has its own buffer and processing cycle. Defaults to System.schedulers_online() to match the number of available schedulers. More partitions reduce lock contention but increase per-partition overhead.
:processing_interval (pos_integer/0) - How often (in milliseconds) each partition checks its buffer and initiates processing. Messages are processed at this interval if any are buffered. Lower values mean faster processing but more frequent task spawning. The default value is 5000.
:processing_timeout (timeout/0) - Maximum time (in milliseconds) for a processing task to complete before being forcefully terminated. Used during graceful shutdown to wait for in-flight processing to complete. See Task.Supervisor.async_nolink/3 for more information. The default value is 60000.
:processing_batch_size - Controls how buffered data is passed to the processor callback.
Can be either:
- A positive integer (default 10): Messages are read from the ETS table in batches of up to this size using ets:select with continuations. The processor is called once per batch. This optimizes memory usage for large tables.
- :table: The ETS table identifier (a tid) is passed directly to the processor instead of reading and batching the data. The processor has full control over how it reads and processes the table.
  In this mode the processor takes ownership of the table. The buffer creates a fresh table for incoming writes on the next swap and does NOT delete the table you received. When the processing task exits (after the processor returns), ETS will auto-delete the table unless you have already transferred it elsewhere via :ets.give_away/3. If you want to keep the table beyond the task's lifetime, call :ets.give_away/3 to hand it off to another process before returning.
The default value is 10.
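To make the integer form of :processing_batch_size concrete, here is a minimal sketch of reading a table in bounded batches with :ets.select/3 and its continuation. The module BatchReadSketch is hypothetical and only illustrates the technique; how Tidefall itself walks the table is not shown in this document.

```elixir
defmodule BatchReadSketch do
  # Match spec that returns only the value of each {key, value} row.
  @values_only [{{:_, :"$1"}, [], [:"$1"]}]

  # Calls `fun` once per batch of at most `batch_size` values.
  # :ets.select/3 returns the first batch plus a continuation;
  # :ets.select/1 resumes from that continuation.
  def each_batch(table, batch_size, fun)
      when is_integer(batch_size) and batch_size > 0 and is_function(fun, 1) do
    table
    |> :ets.select(@values_only, batch_size)
    |> walk(fun)
  end

  # The table is exhausted (or was empty to begin with).
  defp walk(:"$end_of_table", _fun), do: :ok

  defp walk({batch, continuation}, fun) do
    fun.(batch)
    continuation |> :ets.select() |> walk(fun)
  end
end
```

Only one batch is held in memory at a time, which is the memory benefit the option description refers to.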
Runtime options
:partition_key (term/0) - Determines what value is used as the routing key for partitioning messages.
Can be one of four values:
- nil (default): The message itself is used as the routing key. Messages with the same content are routed to the same partition.
- A function of arity 1: Applied to each message to return the routing key. Allows grouping related messages together (e.g., by user ID or account ID) to keep them in the same partition.
- An MFA tuple {Module, Function, Args}: The function is applied with the message prepended to the arguments. Useful for delegating routing logic to a module function while keeping configuration declarative.
- Any static term: Used as the routing key for all messages, giving explicit control over which partition receives them (e.g., :logs, :events, or an identifier).
Fundamentally, this option determines how messages are distributed across partitions. Use it to keep related messages together (for ordering or state locality) or spread unrelated messages across partitions (for parallelism).
The default value is nil.
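The four forms of :partition_key can be read as a small dispatch followed by a hash. The sketch below is an assumption about how such resolution could look, not Tidefall's code; PartitionKeySketch, routing_key/2, and partition/3 are hypothetical names, and hashing the resolved key with phash2 is inferred from the data-flow diagram.

```elixir
defmodule PartitionKeySketch do
  # nil: the message itself is the routing key.
  def routing_key(item, nil), do: item

  # Arity-1 function: applied to the message.
  def routing_key(item, fun) when is_function(fun, 1), do: fun.(item)

  # MFA tuple: the message is prepended to the arguments.
  # Assumption: any {atom, atom, list} tuple is treated as an MFA here,
  # so a static term of that shape cannot be used as a plain key.
  def routing_key(item, {mod, fun, args})
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [item | args])
  end

  # Anything else: a static key shared by all messages.
  def routing_key(_item, static), do: static

  # Maps the resolved key to a partition index in 0..partitions-1.
  def partition(item, partition_key, partitions) do
    item |> routing_key(partition_key) |> :erlang.phash2(partitions)
  end
end
```

With a key function such as &(&1.user_id), two different events for the same user resolve to the same index, which is what keeps related messages in one partition.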
Examples
Standalone Usage
# Start a queue buffer with a custom processor
iex> {:ok, _sup_pid} =
...> Tidefall.Queue.start_link(
...> name: :my_buffer,
...> processor: fn batch -> IO.inspect(batch) end
...> )
# Push a single item into the buffer
iex> Tidefall.Queue.push(:my_buffer, "item1")
:ok
# Push a batch of items
iex> Tidefall.Queue.push(:my_buffer, ["item2", "item3"])
:ok
# Check buffer size
iex> Tidefall.Queue.size(:my_buffer)
3
# Stop the buffer gracefully (processes remaining items)
iex> Tidefall.Queue.stop(:my_buffer)
:ok
Adding to a Supervision Tree
children = [
{Tidefall.Queue,
name: :my_buffer,
processor: &MyApp.EventProcessor.process_batch/1}
]
Supervisor.start_link(children, strategy: :one_for_one)
Defining a buffer module
For the recommended module-based pattern (use Tidefall.Queue,
where the module name becomes the default instance and start options
layer across compile-time use opts, the application environment, and
explicit opts), see the Module-based buffers section of Tidefall.
Processor
The processor function receives a list of values (the items pushed to the buffer):
fn batch ->
# batch is [value1, value2, ...]
Enum.each(batch, fn value -> process(value) end)
end
See The processor for when it runs, batching, failure isolation, and shutdown-drain behavior.
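Since the :processor start option accepts either an arity-1 function or an MFA tuple, invoking it comes down to two cases. The following is a minimal sketch of that dispatch under the rule stated in the start options (the batch is prepended to the MFA arguments); ProcessorDispatchSketch and run/2 are hypothetical names, not Tidefall functions.

```elixir
defmodule ProcessorDispatchSketch do
  # Arity-1 function: called with the batch directly.
  def run(fun, batch) when is_function(fun, 1) and is_list(batch) do
    fun.(batch)
  end

  # MFA tuple: the batch becomes the first argument, followed by Args.
  def run({mod, fun, args}, batch)
      when is_atom(mod) and is_atom(fun) and is_list(args) and is_list(batch) do
    apply(mod, fun, [batch | args])
  end
end
```

For example, run({Enum, :map, [&String.upcase/1]}, ["a", "b"]) calls Enum.map(["a", "b"], &String.upcase/1), mirroring how {MyModule, :process, [extra_arg]} results in MyModule.process(batch, extra_arg).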
Summary
Functions
Returns the queue buffer child spec.
Pushes an item or a batch of items into the buffer.
Returns the queue size (total number of items across all partitions).
Starts a new queue buffer.
Stops a queue buffer gracefully.
Updates the options for the queue buffer.
Types
@type buffer() :: Tidefall.Buffer.buffer()
Proxy type for a buffer
@type item() :: any()
Any term that will be buffered and processed
Functions
@spec child_spec(keyword()) :: Supervisor.child_spec()
Returns the queue buffer child spec.
Pushes an item or a batch of items into the buffer.
Parameters
- buffer - The buffer name (atom).
- item_or_batch - A single item or a list of items to push.
- opts - Optional runtime options.
Options
See runtime options.
Examples
# Simple push with default routing
push(:my_buffer, "item1")
push(:my_buffer, ["item2", "item3"])
# Custom partition routing using function
push(:my_buffer, user_event, partition_key: &(&1.user_id))
# Custom partition routing using MFA tuple (item prepended to args)
push(:my_buffer, event, partition_key: {MyApp.Router, :get_partition, []})
# Custom partition routing with fixed key (all items to same partition)
push(:my_buffer, log_entry, partition_key: :logs)
@spec size(buffer()) :: non_neg_integer()
Returns the queue size (total number of items across all partitions).
Examples
size(:my_buffer)
@spec start_link(keyword()) :: Supervisor.on_start()
Starts a new queue buffer.
Options
See start options.
Examples
Tidefall.Queue.start_link(
name: :my_queue_buffer,
processor: &MyApp.Sink.process/1
)
Stops a queue buffer gracefully.
Examples
Tidefall.Queue.stop(:my_queue_buffer)
Updates the options for the queue buffer.
Options
Updatable options: :processing_interval, :processing_timeout,
:processing_batch_size. See start options
for each option's semantics.
Examples
# Update the processing interval to 100ms
update_options(:my_buffer, processing_interval: 100)