Ferricstore.Flow.ClaimWaiters (ferricstore v0.4.3)

Copy Markdown View Source

Waiter registry for FLOW.CLAIM_DUE ... BLOCK.

This table is intentionally separate from list and stream waiters. Flow waiter notifications are only scheduling hints: a woken process must rerun the real claim_due command, which remains atomic through the replicated write path.

Summary

Types

waiter_key()

@type waiter_key() ::
  {binary(), binary() | atom(), non_neg_integer() | atom(), binary() | atom()}

Functions

any_waiters?()

@spec any_waiters?() :: boolean()

cleanup(pid)

@spec cleanup(pid()) :: :ok

count(key)

@spec count(waiter_key()) :: non_neg_integer()

has_live_waiter?(key)

@spec has_live_waiter?(waiter_key()) :: boolean()

init()

@spec init() :: :ok

message()

@spec message() :: atom()

notify(keys, count)

@spec notify([waiter_key()], pos_integer()) :: non_neg_integer()

notify_ready(type, state, priority, partition_key, count \\ 1)

@spec notify_ready(binary(), term(), term(), term(), pos_integer()) ::
  non_neg_integer()

notify_ready_many(hints, min_wake_budget_per_bucket \\ 8)

@spec notify_ready_many(
  [{binary(), term(), term(), term(), pos_integer()}],
  pos_integer()
) :: non_neg_integer()

notify_scheduled_ready(timer_key)

@spec notify_scheduled_ready(tuple()) :: non_neg_integer()

prune_stale_entries()

@spec prune_stale_entries() :: :ok

ready_keys(type, state, priority, partition_key)

@spec ready_keys(binary(), term(), term(), term()) :: [waiter_key()]

register(keys, pid, deadline_ms, opts \\ [])

@spec register([waiter_key()], pid(), integer(), keyword()) ::
  :ok | {:error, binary()}

schedule_ready(type, state, priority, partition_key, due_at_ms, count \\ 1)

@spec schedule_ready(binary(), term(), term(), term(), integer(), pos_integer()) ::
  :ok

scheduled_count()

@spec scheduled_count() :: non_neg_integer()

total_count()

@spec total_count() :: non_neg_integer()

unregister(keys, pid)

@spec unregister([waiter_key()], pid()) :: :ok

wait_keys(type, state, priority, partition_key)

@spec wait_keys(binary(), term(), term(), term()) :: [waiter_key()]