ETS-based queue buffer for high-throughput data processing.
Tidefall.Queue buffers arbitrary data in insertion order and
periodically processes it using a configurable processor function.
The queue is drain-only: there is no pop or dequeue operation.
Producers add items with push/3, and the buffer is drained to the
processor on each processing tick.
It uses partitioning to reduce lock contention during high-throughput
writes, and double-buffering so that incoming writes go to a fresh
table while a previous batch is being processed.
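A minimal sketch of the double-buffering idea, assuming a single process and plain ETS tables. The module `DoubleBufferSketch` and its functions are hypothetical illustrations, not Tidefall's internals. On each tick a fresh table is installed for writers, and the detached table is drained on its own.

```elixir
# Hypothetical sketch of double-buffering with two ETS tables.
# Names are illustrative only; this is not Tidefall's API.
defmodule DoubleBufferSketch do
  # Create a table that receives writes.
  def new_table, do: :ets.new(:buffer, [:ordered_set, :public])

  # Writers insert into whichever table is currently active.
  def write(table, item) do
    :ets.insert(table, {{System.monotonic_time(), make_ref()}, item})
  end

  # Install a fresh table for new writes and hand back the old one.
  # The caller stops writing to the old table from this point on.
  def swap(current), do: {current, new_table()}

  # Read the detached table in key order, then delete it.
  def drain(table) do
    items = for {_key, item} <- :ets.tab2list(table), do: item
    :ets.delete(table)
    items
  end
end

t0 = DoubleBufferSketch.new_table()
DoubleBufferSketch.write(t0, :a)
DoubleBufferSketch.write(t0, :b)
{old, t1} = DoubleBufferSketch.swap(t0)
# This write lands in the new table, not in the one being drained.
DoubleBufferSketch.write(t1, :c)
drained = DoubleBufferSketch.drain(old)
```

Because the drain works on a table that writers no longer touch, producers never wait on the processor.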
Data Flow
push(buffer, items)
|
v
+-------------------+
| Partition Routing |
| phash2(item, N) |
+-------------------+
| | |
v v v
+-------+ +-------+ +-------+ ETS :ordered_set
| P 0 | | P 1 | | P N-1 | Key: {sort_key, ref}
+-------+ +-------+ +-------+ Val: item
| | |
v v v
+--------------------------------------+
| processor(batch) |
| batch = [val1, val2, ...] |
+--------------------------------------+
Items are routed to partitions via phash2, stored in
:ordered_set ETS tables keyed by {sort_key, ref}, and
periodically flushed to the processor in batches. sort_key
is System.monotonic_time() by default, giving insertion-time
order, or the term produced by the :sort_key runtime option;
ref keeps every key unique so no item is ever overwritten.
Ordering is per partition (see :sort_key under runtime options).
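The routing and keying described above can be sketched with the standard library alone. This assumes default routing (the item itself is hashed) and uses a hypothetical `PartitionSketch` module with one ordered_set table per partition. It shows that two identical items land in the same partition and are both kept, because each key carries its own ref.

```elixir
# Hypothetical sketch: route items to N :ordered_set partitions with
# :erlang.phash2/2 and key each entry as {sort_key, ref}.
defmodule PartitionSketch do
  def new(n) when n >= 1 do
    for _ <- 1..n, do: :ets.new(:partition, [:ordered_set, :public])
  end

  # Returns the partition index the item was routed to.
  def push(partitions, item) do
    index = :erlang.phash2(item, length(partitions))
    table = Enum.at(partitions, index)
    # The ref makes every key unique, so equal items never overwrite.
    :ets.insert(table, {{System.monotonic_time(), make_ref()}, item})
    index
  end

  # ordered_set traversal follows key order, i.e. sort_key first.
  def batch(table), do: for({_key, item} <- :ets.tab2list(table), do: item)
end

partitions = PartitionSketch.new(4)
i1 = PartitionSketch.push(partitions, "dup")
i2 = PartitionSketch.push(partitions, "dup")
```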
Start options
:name (atom/0) - Required. The buffer name (used to identify the buffer).
:processor - Required. A callback that processes batches of messages. It is called with a list of accumulated messages and should handle the processing logic (e.g., sending to an external service or persisting to a database). Can be either:
- A function of arity 1: fn batch -> ... end or &MyModule.process/1.
- An MFA tuple {Module, Function, Args}: the batch is prepended to the arguments, e.g., {MyModule, :process, [extra_arg]} will call MyModule.process(batch, extra_arg).
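To make the two accepted :processor forms concrete, here is a sketch of how a batch could be dispatched to either one. `ProcessorSketch.invoke/2` and the `MyModule.process/2` body are hypothetical; only the "batch is prepended to Args" rule comes from the option description.

```elixir
# Hypothetical dispatcher for the two documented :processor forms.
defmodule ProcessorSketch do
  # Arity-1 function: called with the batch.
  def invoke(fun, batch) when is_function(fun, 1), do: fun.(batch)

  # MFA tuple: the batch is prepended to the extra arguments.
  def invoke({mod, fun, args}, batch)
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [batch | args])
  end
end

defmodule MyModule do
  # Hypothetical processor that takes the batch plus one extra argument.
  def process(batch, extra_arg), do: {extra_arg, length(batch)}
end

# returns 3
ProcessorSketch.invoke(fn batch -> Enum.count(batch) end, [1, 2, 3])
# calls MyModule.process([1, 2, 3], :tag) and returns {:tag, 3}
ProcessorSketch.invoke({MyModule, :process, [:tag]}, [1, 2, 3])
```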
:partitions (non_neg_integer/0) - Number of partitions to create. Each partition has its own buffer and processing cycle. Defaults to System.schedulers_online() to match the number of available schedulers. More partitions reduce lock contention but increase per-partition overhead.
:processing_interval (pos_integer/0) - How often (in milliseconds) each partition checks its buffer and initiates processing. Messages are processed at this interval if any are buffered. Lower values mean faster processing but more frequent task spawning. The default value is 5000.
:processing_timeout (timeout/0) - Maximum time (in milliseconds) a processing task may run before it is forcefully terminated. It is also the time graceful shutdown waits for in-flight processing to complete. See Task.Supervisor.async_nolink/3 for more information. The default value is 60000.
:processing_batch_size - Controls how buffered data is passed to the processor callback. Can be either:
- A positive integer (default 10): messages are read from the ETS table in batches of up to this size using :ets.select/3 with continuations, and the processor is called once per batch. This keeps memory usage bounded for large tables.
- :table: the ETS table identifier (a tid) is passed directly to the processor instead of reading and batching the data, so the processor has full control over how it reads and processes the table. In this mode the processor takes ownership of the table: the buffer creates a fresh table for incoming writes on the next swap and does NOT delete the table you received. When the processing task exits (after the processor returns), ETS auto-deletes the table unless you have already transferred it elsewhere. To keep the table beyond the task's lifetime, call :ets.give_away/3 to hand it off to another process before returning.
The default value is 10.
:drain_threshold (pos_integer/0) - Optional per-partition item count that triggers an early drain. When a partition's current table reaches this many items, it drains immediately instead of waiting for the next :processing_interval tick; a partition drains on whichever fires first, size or timer. Unset (the default) means disabled: partitions drain on the interval only. This is a lossless early-flush trigger, not a cap: nothing is dropped or evicted, and a partition's table may briefly exceed the threshold between size checks or under concurrent writes. Size is checked on the :drain_check_interval poll inside the partition, never on the write path.
:drain_check_interval (pos_integer/0) - How often (in milliseconds) each partition polls its current table size to decide whether to drain early. Only has an effect when :drain_threshold is set. Keep this below :processing_interval; otherwise the interval timer usually drains first and the size trigger rarely fires. The check is one :ets.info(table, :size) call per partition per interval, run inside the partition process (off the write path), so its cost is negligible at this frequency. The default value is 1000.
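A sketch of a single partition's drain step may help tie :drain_threshold and the integer form of :processing_batch_size together. It assumes one ordered_set table whose entries are {key, value}. `DrainSketch` is hypothetical: `drain_now?/2` models the size poll, and `drain/3` reads values in batches of at most batch_size via :ets.select/3 continuations, calling the processor once per batch.

```elixir
# Hypothetical sketch of a partition's drain step. Not Tidefall's code.
defmodule DrainSketch do
  # Match {key, value} entries and return only the value.
  @match_spec [{{:_, :"$1"}, [], [:"$1"]}]

  # nil threshold means the early-drain trigger is disabled.
  def drain_now?(_table, nil), do: false
  def drain_now?(table, threshold), do: :ets.info(table, :size) >= threshold

  # Calls processor once per batch of at most batch_size values, in key
  # order, and returns the processor results in call order.
  def drain(table, batch_size, processor) do
    table
    |> :ets.select(@match_spec, batch_size)
    |> loop(processor, [])
  end

  defp loop(:"$end_of_table", _processor, acc), do: Enum.reverse(acc)
  # Skip empty chunks rather than calling the processor with [].
  defp loop({[], cont}, processor, acc), do: loop(:ets.select(cont), processor, acc)

  defp loop({values, cont}, processor, acc) do
    loop(:ets.select(cont), processor, [processor.(values) | acc])
  end
end

table = :ets.new(:partition, [:ordered_set, :public])
for n <- 1..25, do: :ets.insert(table, {{n, make_ref()}, n})
batches = DrainSketch.drain(table, 10, fn batch -> batch end)
```

Reading through continuations means only one batch of values is materialized at a time, which is the memory argument behind the integer form of the option.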
Runtime options
:partition_key (term/0) - Determines what value is used as the routing key for partitioning messages. Can be one of:
- nil (default): the message itself is used as the routing key, so messages with the same content are routed to the same partition.
- A function of arity 1: applied to each message to return the routing key. Allows grouping related messages together (e.g., by user ID or account ID) to keep them in the same partition.
- An MFA tuple {Module, Function, Args}: the function is applied with the message prepended to the arguments. Useful for delegating routing logic to a module function while keeping configuration declarative.
- Any static term: used as the routing key for all messages, giving explicit control over which partition receives them (e.g., :logs, :events, or an identifier).
Fundamentally, this option determines how messages are distributed across partitions. Use it to keep related messages together (for ordering or state locality) or to spread unrelated messages across partitions (for parallelism).
The default value is nil.
:sort_key - Controls the term used to order buffered items within a partition. The ETS key is {sort_key_term, ref}; ref is always retained for uniqueness, so distinct items are never overwritten regardless of the :sort_key value. Can be one of:
- Omitted (default): items are ordered by insertion time (System.monotonic_time/0), i.e. the order in which they were pushed.
- A function of arity 1: applied to each item to derive its sort term (e.g. & &1.priority, or an event timestamp carried in the payload).
- A function of arity 0: evaluated per item to generate the sort term at push time (e.g. a custom clock or sequence).
Ordering scope and ties
Ordering is per partition: items are routed across partitions first, and each partition's batch is ordered independently. For a single global order, use partitions: 1 or a :partition_key that co-locates the items you need ordered together. Order among items with the same sort term is unspecified (ties are broken by ref), consistent with how items with identical timestamps drain under the default. Sort terms are compared with Erlang's total term order, so prefer integers, atoms, or binaries; complex terms (maps, tuples) sort in non-obvious ways.
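The forms accepted by :partition_key and :sort_key can be summarized as a small resolution step per item. This is a sketch under the assumption that resolution happens at push time as the option descriptions suggest. `OptionSketch` is hypothetical, and its MFA check means a static 3-tuple of the shape {atom, atom, list} would be treated as MFA here.

```elixir
# Hypothetical resolution of the documented :partition_key and
# :sort_key forms for one item. Not Tidefall's implementation.
defmodule OptionSketch do
  # :partition_key -> routing term, then hashed to a partition index.
  def partition_index(item, partition_key, partitions) do
    routing_key =
      case partition_key do
        nil ->
          item

        fun when is_function(fun, 1) ->
          fun.(item)

        {mod, fun, args} when is_atom(mod) and is_atom(fun) and is_list(args) ->
          apply(mod, fun, [item | args])

        static ->
          static
      end

    :erlang.phash2(routing_key, partitions)
  end

  # :sort_key -> the first element of the {sort_key, ref} ETS key.
  def sort_term(_item, nil), do: System.monotonic_time()
  def sort_term(item, fun) when is_function(fun, 1), do: fun.(item)
  def sort_term(_item, fun) when is_function(fun, 0), do: fun.()
end

event = %{user_id: 42, priority: 1}
same_user = %{user_id: 42, priority: 9}
# Same user_id, so both events route to the same partition.
a = OptionSketch.partition_index(event, & &1.user_id, 8)
b = OptionSketch.partition_index(same_user, & &1.user_id, 8)
# returns 1
OptionSketch.sort_term(event, & &1.priority)
# Erlang term order: numbers < atoms < binaries, returns [1, :a, "b"]
Enum.sort(["b", :a, 1])
```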
Examples
Standalone Usage
# Start a queue buffer with a custom processor
iex> {:ok, _sup_pid} =
...> Tidefall.Queue.start_link(
...> name: :my_buffer,
...> processor: fn batch -> IO.inspect(batch) end
...> )
# Push a single item into the buffer
iex> Tidefall.Queue.push(:my_buffer, "item1")
:ok
# Push a batch of items
iex> Tidefall.Queue.push(:my_buffer, ["item2", "item3"])
:ok
# Check buffer size
iex> Tidefall.Queue.size(:my_buffer)
3
# Stop the buffer gracefully (processes remaining items)
iex> Tidefall.Queue.stop(:my_buffer)
:ok
Adding to a Supervision Tree
children = [
{Tidefall.Queue,
name: :my_buffer,
processor: &MyApp.EventProcessor.process_batch/1}
]
Supervisor.start_link(children, strategy: :one_for_one)
Defining a buffer module
For the recommended module-based pattern — use Tidefall.Queue,
where the module name becomes the default instance and start options
layer across compile-time use opts, the application environment, and
explicit opts — see the
Module-based buffers
section of Tidefall.
Processor
The processor function receives a list of values (the items pushed to the buffer):
fn batch ->
  # batch is [value1, value2, ...]
  # Replace IO.inspect/1 with your per-item handling.
  Enum.each(batch, fn value -> IO.inspect(value) end)
end
See The processor for when it runs, batching, failure isolation, and shutdown-drain behavior.
Summary
Functions
Returns the queue buffer child spec.
Pushes an item or a batch of items into the buffer.
Returns the queue size (total number of items across all partitions).
Starts a new queue buffer.
Stops a queue buffer gracefully.
Updates the options for the queue buffer.
Types
@type buffer() :: Tidefall.Buffer.buffer()
Proxy type for a buffer
@type item() :: any()
Any term that will be buffered and processed
Functions
@spec child_spec(keyword()) :: Supervisor.child_spec()
Returns the queue buffer child spec.
Pushes an item or a batch of items into the buffer.
Parameters
buffer - The buffer name (atom).
item_or_batch - A single item or a list of items to push.
opts - Optional runtime options.
Options
See runtime options.
Examples
# Simple push with default routing
push(:my_buffer, "item1")
push(:my_buffer, ["item2", "item3"])
# Custom partition routing using function
push(:my_buffer, user_event, partition_key: &(&1.user_id))
# Custom partition routing using MFA tuple (item prepended to args)
push(:my_buffer, event, partition_key: {MyApp.Router, :get_partition, []})
# Custom partition routing with fixed key (all items to same partition)
push(:my_buffer, log_entry, partition_key: :logs)
# Custom ordering: drain by a value-derived sort key (per partition)
push(:my_buffer, event, sort_key: & &1.priority)
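The MFA routing example above references MyApp.Router.get_partition, whose body is not shown. A hypothetical version might look like the following. Per the :partition_key description, it receives the item (prepended to the empty argument list) and returns a routing key, which is then hashed to choose the partition; it does not return a partition index itself. The `account_id` field is an assumed payload shape.

```elixir
# Hypothetical router for partition_key: {MyApp.Router, :get_partition, []}.
defmodule MyApp.Router do
  # Events for the same account share a routing key, and so a partition.
  def get_partition(%{account_id: account_id}), do: {:account, account_id}
  # Everything else shares one routing key.
  def get_partition(_other), do: :default
end

# Equivalent to how an MFA with [] extra args is applied to an item.
key = apply(MyApp.Router, :get_partition, [%{account_id: 7} | []])
```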
@spec size(buffer()) :: non_neg_integer()
Returns the queue size (total number of items across all partitions).
Examples
size(:my_buffer)
@spec start_link(keyword()) :: Supervisor.on_start()
Starts a new queue buffer.
Options
See start options.
Examples
Tidefall.Queue.start_link(
name: :my_queue_buffer,
processor: &MyApp.Sink.process/1
)
Stops a queue buffer gracefully.
Examples
Tidefall.Queue.stop(:my_queue_buffer)
Updates the options for the queue buffer.
Options
Updatable options: :processing_interval, :processing_timeout,
:processing_batch_size, :drain_threshold, :drain_check_interval. See
start options for each option's
semantics.
Examples
# Update the processing interval to 100ms
update_options(:my_buffer, processing_interval: 100)