Ferricstore.Merge.Scheduler (ferricstore v0.3.2)

Per-shard merge scheduler that triggers compaction when file rotation occurs.

Each shard has its own Scheduler GenServer. Rather than polling every 30s with expensive File.ls calls that block the Shard GenServer, the scheduler is event-driven: the Shard notifies it on file rotation via notify_rotation/2 and, when per-file fragmentation exceeds its thresholds, via notify_fragmentation/3.

Merge modes

The scheduler supports three merge modes per the spec (section 2E):

  • Hot mode -- Event-driven; a merge can trigger at any time. When the file count reaches min_files_for_merge after a rotation, the scheduler attempts a merge (subject to the node-level semaphore). This is the default mode.

  • Bulk mode -- Only merges during a configurable time window (e.g. 02:00-04:00). Outside the window, rotations are noted but no merge is triggered. Inside the window, the file count is checked as in hot mode.

  • Age mode -- Like bulk mode but only merges files older than a configurable age threshold, within a time window.
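The mode rules above can be sketched as a small decision function. This is an illustration only, not the scheduler's actual code: the module MergeModeSketch, the function names, and the window semantics (hours, start inclusive, end exclusive, wrapping past midnight) are all assumptions. The per-file age filter of age mode is not modelled.

```elixir
defmodule MergeModeSketch do
  # Hypothetical helper, not part of Ferricstore: decides whether a rotation
  # should lead to a merge attempt under each mode.

  # Hot mode: only the file count matters.
  def merge_due?(%{mode: :hot} = config, file_count, _hour) do
    file_count >= config.min_files_for_merge
  end

  # Bulk and age modes: the current hour must also fall inside the window.
  # Assumption: merge_window is {start_hour, end_hour}, end exclusive.
  def merge_due?(%{mode: mode} = config, file_count, hour) when mode in [:bulk, :age] do
    in_window?(hour, config.merge_window) and file_count >= config.min_files_for_merge
  end

  # A plain window such as {2, 4}.
  def in_window?(hour, {from, to}) when from <= to, do: hour >= from and hour < to
  # A window such as {22, 2} that wraps past midnight.
  def in_window?(hour, {from, to}), do: hour >= from or hour < to
end
```

With merge_window: {2, 4} and min_files_for_merge: 2, a rotation at 13:00 that leaves five files is due for a merge in hot mode but not in bulk mode.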

Merge lifecycle

  1. Shard rotates its active file and casts {:file_rotated, file_count}.
  2. Scheduler checks file count against min_files_for_merge and mode.
  3. If merge is needed, scheduler requests the node-level semaphore.
  4. If semaphore is acquired, scheduler writes a merge manifest.
  5. Scheduler selects non-active files for incremental merge.
  6. Scheduler calls the shard's run_compaction via GenServer.call.
  7. On completion, scheduler deletes the manifest and releases the semaphore.
  8. On failure, scheduler logs the error, deletes the manifest, and releases the semaphore.
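Steps 3 to 8 follow an acquire, work, always-clean-up pattern. A minimal sketch of that pattern, assuming hypothetical callbacks in place of the semaphore and the shard's compaction call, and a placeholder manifest body (the real manifest format is not described here):

```elixir
defmodule MergeLifecycleSketch do
  # Hypothetical sketch, not Ferricstore's implementation. `acquire`, `compact`
  # and `release` stand in for the semaphore and the shard's compaction call.
  def run(acquire, compact, release, manifest_path) do
    case acquire.() do
      :ok ->
        # Step 4: record that a merge is in progress (placeholder content).
        File.write!(manifest_path, "merge in progress")

        try do
          # Steps 5-6: the caller's compaction function does the work.
          compact.()
        rescue
          error -> {:error, error}
        catch
          # A failed GenServer.call surfaces as an exit rather than an exception.
          :exit, reason -> {:error, reason}
        after
          # Steps 7-8: clean up on success and on failure alike.
          File.rm(manifest_path)
          release.()
        end

      {:error, :busy} ->
        # Semaphore not acquired: no manifest is written, nothing to release.
        :skipped
    end
  end
end
```

Putting the cleanup in `after` means the manifest is removed and the semaphore released on both the success and failure paths, so a crashed compaction cannot leave the node-level slot held.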

Configuration

Configuration is passed via the :merge key in the application environment. The example sets only some of the keys; the config() type lists the full set:

config :ferricstore, :merge,
  mode: :hot,
  min_files_for_merge: 2,
  max_files_per_merge: 10,
  merge_window: {2, 4},
  min_free_space_ratio: 0.1
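One way such a keyword list could be turned into a config map is to lay it over a map of defaults. The sketch below assumes this approach; MergeConfigSketch is a hypothetical module, and its default values are simply copied from the example above, not taken from the library.

```elixir
defmodule MergeConfigSketch do
  # Illustrative defaults copied from the example config; the library's real
  # defaults are not documented in this section.
  @defaults %{
    mode: :hot,
    min_files_for_merge: 2,
    max_files_per_merge: 10,
    merge_window: {2, 4},
    min_free_space_ratio: 0.1
  }

  # Reads the :merge key from the application env (an empty list if unset)
  # and overlays it on the defaults. Accepts a keyword list or a map.
  def load(overrides \\ Application.get_env(:ferricstore, :merge, [])) do
    Map.merge(@defaults, Map.new(overrides))
  end
end
```

For instance, MergeConfigSketch.load(mode: :bulk, merge_window: {1, 5}) keeps max_files_per_merge at 10 while switching the mode and the window.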

Types

config()

@type config() :: %{
  mode: merge_mode(),
  min_files_for_merge: pos_integer(),
  max_files_per_merge: pos_integer(),
  merge_window: {non_neg_integer(), non_neg_integer()},
  min_free_space_ratio: float(),
  fragmentation_threshold: float(),
  dead_bytes_threshold: non_neg_integer(),
  merge_cooldown_ms: non_neg_integer(),
  small_file_threshold: non_neg_integer(),
  merge_retry_interval_ms: non_neg_integer()
}

merge_mode()

@type merge_mode() :: :hot | :bulk | :age

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

notify_fragmentation(shard_index, candidate_file_ids, file_count)

@spec notify_fragmentation(non_neg_integer(), [non_neg_integer()], non_neg_integer()) ::
  :ok

Called by the Shard when per-file fragmentation exceeds thresholds.

The candidate_file_ids are the IDs of files whose dead/total byte ratio is above fragmentation_threshold and whose dead bytes are above dead_bytes_threshold. Both conditions must hold.
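A sketch of how a caller might compute such a candidate list. The per-file stats shape (%{file_id, dead_bytes, total_bytes}) and the module FragmentationSketch are assumptions made for illustration; the Shard's real bookkeeping is not described here.

```elixir
defmodule FragmentationSketch do
  # Hypothetical per-file stats: %{file_id: id, dead_bytes: n, total_bytes: n}.
  # Returns the IDs of files that pass BOTH thresholds.
  def candidates(file_stats, fragmentation_threshold, dead_bytes_threshold) do
    for %{file_id: id, dead_bytes: dead, total_bytes: total} <- file_stats,
        # Skip empty files to avoid dividing by zero.
        total > 0,
        dead / total > fragmentation_threshold,
        dead > dead_bytes_threshold,
        do: id
  end
end
```

Requiring both conditions keeps tiny files with a high dead ratio, and large files with only a small dead fraction, out of the candidate list. The resulting IDs would then be passed as the second argument to notify_fragmentation/3.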

notify_rotation(shard_index, file_count)

@spec notify_rotation(non_neg_integer(), non_neg_integer()) :: :ok

Called by the Shard when it rotates to a new active file.

The file_count is the total number of log files, including the new active file. This is the primary merge trigger, so no polling is needed.

scheduler_name(index)

@spec scheduler_name(non_neg_integer()) :: atom()

Returns the registered process name for the scheduler at index.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a merge scheduler for the given shard.

Options

  • :shard_index (required) -- zero-based shard index
  • :data_dir (required) -- base directory for Bitcask data files
  • :merge_config -- override merge configuration (map)
  • :semaphore -- name or pid of the semaphore process (default: Semaphore)
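A sketch of starting one scheduler under a supervisor with the required options. It assumes that child_spec/1 forwards its argument to start_link/1 (the usual behaviour of a GenServer-generated child spec) and that the default semaphore process is already running; the data directory path is a made-up example.

```elixir
children = [
  # One scheduler for shard 0; the path is a hypothetical example.
  {Ferricstore.Merge.Scheduler,
   shard_index: 0,
   data_dir: "/var/lib/ferricstore"}
]

Supervisor.start_link(children, strategy: :one_for_one)
```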

status(index_or_server)

@spec status(non_neg_integer() | GenServer.server()) :: map()

Returns the current status of the merge scheduler for observability.

trigger_check(index_or_server)

@spec trigger_check(non_neg_integer() | GenServer.server()) :: :ok

Forces an immediate merge check, bypassing the event-driven trigger. Used in tests and for manual compaction via INFO/DEBUG commands.
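A short usage sketch for manual compaction, for example from an IEx session. It assumes a scheduler for shard 0 is already running; the keys of the returned status map are not documented here, so the example only inspects it.

```elixir
alias Ferricstore.Merge.Scheduler

# Ask the scheduler for shard 0 to run a merge check now.
:ok = Scheduler.trigger_check(0)

# Inspect the scheduler's state afterwards.
0 |> Scheduler.status() |> IO.inspect(label: "merge scheduler 0")
```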