Tidefall.Queue (Tidefall v1.0.0-rc.0)


ETS-based queue buffer for high-throughput data processing.

Tidefall.Queue buffers arbitrary data in insertion order and periodically hands it to a configurable processor function. The queue is drain-only: there is no pop or dequeue operation. Producers add items with push/3, and the engine drains the buffer to the processor on each processing tick. Writes are partitioned to reduce lock contention under high throughput, and double-buffering keeps the buffer available for writes while a drained buffer is being processed.

Data Flow

push(buffer, items)
       |
       v
+-------------------+
| Partition Routing |
| phash2(item, N)   |
+-------------------+
   |         |         |
   v         v         v
+-------+ +-------+ +-------+     ETS :ordered_set
| P 0   | | P 1   | | P N-1 |     Key: {monotonic_time, ref}
+-------+ +-------+ +-------+     Val: item
   |         |         |
   v         v         v
+--------------------------------------+
| processor(batch)                     |
| batch = [val1, val2, ...]            |
+--------------------------------------+

Items are routed to partitions via phash2, stored in :ordered_set ETS tables keyed by {monotonic_time, ref} (ensuring insertion-time ordering with uniqueness), and periodically flushed to the processor in batches.
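
The routing and key scheme described above can be sketched with plain ETS calls. This is a minimal illustration, not Tidefall's internal code: the RoutingSketch module, its function names, and the table name are hypothetical, and the sketch assumes the routing hash is :erlang.phash2/2.

```elixir
defmodule RoutingSketch do
  # Hypothetical helper (not part of Tidefall): maps an item to a
  # partition index in the range 0..n-1.
  def partition_for(item, n) when is_integer(n) and n > 0 do
    :erlang.phash2(item, n)
  end

  # Builds a key that sorts by insertion time; the reference keeps keys
  # unique when two inserts observe the same timestamp.
  def new_key do
    {System.monotonic_time(), make_ref()}
  end
end

table = :ets.new(:partition_sketch, [:ordered_set, :public])

for item <- ["a", "b", "c"] do
  :ets.insert(table, {RoutingSketch.new_key(), item})
end

# An :ordered_set is traversed in key order, so values come back sorted
# by the timestamp component of their keys.
values = for {_key, value} <- :ets.tab2list(table), do: value
IO.inspect(values)

# The partition index always falls within 0..n-1.
IO.inspect(RoutingSketch.partition_for("a", 4))
```

Because the reference only guarantees uniqueness, two inserts that share a timestamp are ordered by reference rather than strictly by arrival.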

Start options

  • :name (atom/0) - Required. The buffer name (used to identify the buffer).

  • :processor - Required. A callback that processes batches of messages. It is called with a list of accumulated messages and is responsible for the processing logic (e.g., sending to an external service or persisting to a database).

    Can be either:

    • A function of arity 1: fn batch -> ... end or &MyModule.process/1.
    • An MFA tuple {Module, Function, Args}: The batch is prepended to the arguments, e.g., {MyModule, :process, [extra_arg]} will call MyModule.process(batch, extra_arg).
  • :partitions (non_neg_integer/0) - Number of partitions to create. Each partition has its own buffer and processing cycle. Defaults to System.schedulers_online() to match the number of available schedulers. More partitions reduce lock contention but increase per-partition overhead.

  • :processing_interval (pos_integer/0) - How often (in milliseconds) each partition checks its buffer and initiates processing. Messages are processed at this interval if any are buffered. Lower values mean faster processing but more frequent task spawning. The default value is 5000.

  • :processing_timeout (timeout/0) - Maximum time (in milliseconds) for a processing task to complete before being forcefully terminated. Used during graceful shutdown to wait for in-flight processing to complete. See Task.Supervisor.async_nolink/3 for more information. The default value is 60000.

  • :processing_batch_size - Controls how buffered data is passed to the processor callback.

    Can be either:

    • A positive integer (default 10): Messages are read from the ETS table in batches of up to this size using :ets.select/3 with continuations. The processor is called once per batch. This keeps memory usage bounded for large tables.

    • :table: The ETS table identifier (a tid) is passed directly to the processor instead of reading and batching the data. The processor has full control over how it reads and processes the table.

      In this mode the processor takes ownership of the table. The buffer creates a fresh table for incoming writes on the next swap and does NOT delete the table you received. When the processing task exits (after the processor returns), ETS will auto-delete the table unless you have already transferred it elsewhere via :ets.give_away/3. If you want to keep the table beyond the task's lifetime, call :ets.give_away/3 to hand it off to another process before returning.

    The default value is 10.
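
As an illustration of the :table mode, here is a rough sketch of a processor that walks the table it receives in small chunks using :ets.select/3 and :ets.select/1. The TableProcessorSketch module, the chunk size, and the sample rows are hypothetical. The sketch assumes rows have the {key, value} shape shown in the data flow diagram.

```elixir
defmodule TableProcessorSketch do
  # Hypothetical processor for `processing_batch_size: :table`.
  # Assumes each row is a {key, value} tuple.
  @chunk 2

  def process(table) do
    # Match any key and return only the value.
    match_spec = [{{:_, :"$1"}, [], [:"$1"]}]
    table |> :ets.select(match_spec, @chunk) |> drain([])
  end

  # No more rows: return the chunks in the order they were read.
  defp drain(:"$end_of_table", acc), do: Enum.reverse(acc)

  defp drain({values, continuation}, acc) do
    # `values` holds at most @chunk items; real work would happen here.
    drain(:ets.select(continuation), [values | acc])
  end
end

# Stand-in for the table the buffer would hand to the processor.
table = :ets.new(:table_mode_sketch, [:ordered_set, :public])
for n <- 1..5, do: :ets.insert(table, {{n, make_ref()}, "item#{n}"})

chunks = TableProcessorSketch.process(table)
IO.inspect(chunks)
```

A processor that needs the table to outlive the processing task would call :ets.give_away/3 before returning, as described above.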

Runtime options

  • :partition_key (term/0) - Determines what value is used as the routing key for partitioning messages.

    Can be one of four values:

    • nil (default): The message itself is used as the routing key. Messages with the same content are routed to the same partition.

    • A function of arity 1: Applied to each message to return the routing key. Allows grouping related messages together (e.g., by user ID or account ID) to keep them in the same partition.

    • An MFA tuple {Module, Function, Args}: The function is applied with the message prepended to the arguments. Useful for delegating routing logic to a module function while keeping configuration declarative.

    • Any static term: Used as the routing key for all messages, giving explicit control over which partition receives them (e.g., :logs, :events, or an identifier).

    Fundamentally, this option determines how messages are distributed across partitions. Use it to keep related messages together (for ordering or state locality) or spread unrelated messages across partitions (for parallelism).

    The default value is nil.
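
    The four forms can be pictured as a small resolver that turns an item into a routing key before hashing. This is a sketch under assumptions, not the library's implementation: PartitionKeySketch and its functions are hypothetical, and the hash is assumed to be :erlang.phash2/2. How Tidefall treats a static term that happens to look like an MFA tuple is not covered here.

```elixir
defmodule PartitionKeySketch do
  # Hypothetical resolver mirroring the four documented :partition_key forms.
  def routing_key(item, nil), do: item
  def routing_key(item, fun) when is_function(fun, 1), do: fun.(item)

  def routing_key(item, {mod, fun, args})
      when is_atom(mod) and is_atom(fun) and is_list(args),
      do: apply(mod, fun, [item | args])

  # Anything else is treated as a static routing key.
  def routing_key(_item, term), do: term

  def partition(item, partition_key, partitions) do
    item |> routing_key(partition_key) |> :erlang.phash2(partitions)
  end
end

login = %{user_id: 42, type: :login}
logout = %{user_id: 42, type: :logout}

# Function form: both events share a user ID, so they share a partition.
by_fun_login = PartitionKeySketch.partition(login, &(&1.user_id), 8)
by_fun_logout = PartitionKeySketch.partition(logout, &(&1.user_id), 8)

# MFA form: calls Map.fetch!(item, :user_id), yielding the same key.
by_mfa = PartitionKeySketch.partition(login, {Map, :fetch!, [:user_id]}, 8)

# Static term: every item routed with :logs goes to one partition.
by_term = PartitionKeySketch.partition(login, :logs, 8)

IO.inspect({by_fun_login == by_fun_logout, by_fun_login == by_mfa, by_term})
```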

Examples

Standalone Usage

# Start a queue buffer with a custom processor
iex> {:ok, _sup_pid} =
...>   Tidefall.Queue.start_link(
...>     name: :my_buffer,
...>     processor: fn batch -> IO.inspect(batch) end
...>   )

# Push a single item into the buffer
iex> Tidefall.Queue.push(:my_buffer, "item1")
:ok

# Push a batch of items
iex> Tidefall.Queue.push(:my_buffer, ["item2", "item3"])
:ok

# Check buffer size
iex> Tidefall.Queue.size(:my_buffer)
3

# Stop the buffer gracefully (processes remaining items)
iex> Tidefall.Queue.stop(:my_buffer)
:ok

Adding to a Supervision Tree

children = [
  {Tidefall.Queue,
   name: :my_buffer,
   processor: &MyApp.EventProcessor.process_batch/1}
]

Supervisor.start_link(children, strategy: :one_for_one)

Defining a buffer module

The recommended module-based pattern is use Tidefall.Queue, where the module name becomes the default instance and start options are layered across compile-time use options, the application environment, and explicit options. See the Module-based buffers section of Tidefall for details.

Processor

The processor function receives a list of values (the items pushed to the buffer):

fn batch ->
  # batch is [value1, value2, ...]
  Enum.each(batch, fn value -> process(value) end)
end

See The processor for when it runs, batching, failure isolation, and shutdown-drain behavior.
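
For the MFA form of :processor, the batch is prepended to the configured arguments. The following sketch shows a processor function written for that form and a small dispatcher that illustrates how either form could be called. ProcessorSketch, invoke/2, and the "audit" argument are hypothetical names used only for illustration.

```elixir
defmodule ProcessorSketch do
  # Hypothetical processor for the MFA form: the batch arrives first,
  # followed by the extra arguments from the tuple.
  def process(batch, prefix) do
    Enum.map(batch, fn value -> "#{prefix}:#{value}" end)
  end

  # Illustrates how either documented processor form could be invoked.
  def invoke(fun, batch) when is_function(fun, 1), do: fun.(batch)
  def invoke({mod, fun, args}, batch), do: apply(mod, fun, [batch | args])
end

batch = ["item1", "item2"]

# MFA form: equivalent to ProcessorSketch.process(batch, "audit").
from_mfa = ProcessorSketch.invoke({ProcessorSketch, :process, ["audit"]}, batch)

# Function form wrapping the same call.
from_fun = ProcessorSketch.invoke(&ProcessorSketch.process(&1, "audit"), batch)

IO.inspect(from_mfa)
# prints ["audit:item1", "audit:item2"]
```

The MFA form keeps the configuration free of anonymous functions, which can be convenient when options come from the application environment.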

Summary

Types

buffer()

Proxy type for a buffer

item()

Any term that will be buffered and processed

Functions

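child_spec(opts)

Returns the queue buffer child spec.

push(buffer, item_or_batch, opts \\ [])

Pushes an item or a batch of items into the buffer.

size(buffer)

Returns the queue size (total number of items across all partitions).

start_link(opts \\ [])

Starts a new queue buffer.

stop(buffer, reason \\ :normal, timeout \\ :infinity)

Stops a queue buffer gracefully.

update_options(buffer, opts)

Updates the options for the queue buffer.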

Types

buffer()

@type buffer() :: Tidefall.Buffer.buffer()

Proxy type for a buffer

item()

@type item() :: any()

Any term that will be buffered and processed

Functions

child_spec(opts)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Returns the queue buffer child spec.

push(buffer, item_or_batch, opts \\ [])

@spec push(buffer(), item() | [item()], keyword()) :: :ok

Pushes an item or a batch of items into the buffer.

Parameters

  • buffer - The buffer name (atom).
  • item_or_batch - A single item or a list of items to push.
  • opts - Optional runtime options.

Options

See runtime options.

Examples

# Simple push with default routing
push(:my_buffer, "item1")
push(:my_buffer, ["item2", "item3"])

# Custom partition routing using function
push(:my_buffer, user_event, partition_key: &(&1.user_id))

# Custom partition routing using MFA tuple (item prepended to args)
push(:my_buffer, event, partition_key: {MyApp.Router, :get_partition, []})

# Custom partition routing with fixed key (all items to same partition)
push(:my_buffer, log_entry, partition_key: :logs)

size(buffer)

@spec size(buffer()) :: non_neg_integer()

Returns the queue size (total number of items across all partitions).

Examples

size(:my_buffer)

start_link(opts \\ [])

@spec start_link(keyword()) :: Supervisor.on_start()

Starts a new queue buffer.

Options

See start options.

Examples

Tidefall.Queue.start_link(
  name: :my_queue_buffer,
  processor: &MyApp.Sink.process/1
)

stop(buffer, reason \\ :normal, timeout \\ :infinity)

@spec stop(buffer() | pid(), reason :: any(), timeout()) :: :ok

Stops a queue buffer gracefully, processing any remaining items before shutting down.

Examples

Tidefall.Queue.stop(:my_queue_buffer)

update_options(buffer, opts)

@spec update_options(
  buffer(),
  keyword()
) :: :ok

Updates the options for the queue buffer.

Options

Updatable options: :processing_interval, :processing_timeout, :processing_batch_size. See start options for each option's semantics.

Examples

# Update the processing interval to 100ms
update_options(:my_buffer, processing_interval: 100)