Durable.Storage.Schemas.WaitGroup (Durable v0.1.0-rc)

Ecto schema for wait group records.

Wait groups track multiple events for wait_for_any and wait_for_all patterns.

Summary

Functions

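add_event_changeset(wait_group, event_name, payload)

Creates a changeset for adding a received event to the group.

add_event_locked(repo, wait_group_id, event_name, payload)

Locks the wait group row FOR UPDATE, merges the event into received_events, and (when the wait condition is satisfied) flips status to :completed. Must be called inside a transaction.

cancel_changeset(wait_group)

Creates a changeset for cancelling a wait group.

changeset(wait_group, attrs)

Creates a changeset for inserting a new wait group.

timeout_changeset(wait_group)

Creates a changeset for timing out a wait group.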

Types

status()

@type status() :: :pending | :completed | :timeout | :cancelled

t()

@type t() :: %Durable.Storage.Schemas.WaitGroup{
  __meta__: term(),
  completed_at: DateTime.t() | nil,
  event_names: [String.t()],
  foreach_id: integer() | nil,
  foreach_index: integer() | nil,
  id: Ecto.UUID.t(),
  inserted_at: DateTime.t(),
  parallel_id: integer() | nil,
  received_events: map(),
  status: status(),
  step_name: String.t(),
  timeout_at: DateTime.t() | nil,
  timeout_value: term() | nil,
  updated_at: DateTime.t(),
  wait_type: wait_type(),
  workflow: term(),
  workflow_id: Ecto.UUID.t()
}

wait_type()

@type wait_type() :: :any | :all

Functions

add_event_changeset(wait_group, event_name, payload)

Creates a changeset for adding a received event to the group.

add_event_locked(repo, wait_group_id, event_name, payload)

Locks the wait group row FOR UPDATE, merges the event into received_events, and (when the wait condition is satisfied) flips status to :completed. Must be called inside a transaction.

Returns {:ok, %{wait_group: updated, just_completed: boolean}} on success. just_completed is true if and only if this call transitioned the group from :pending to :completed. Groups that are already completed or timed out are treated as a no-op (just_completed: false), so late arrivals don't double-resume the parent.
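The return contract can be modelled with plain maps. The following is only a sketch of the described behaviour, not Durable's implementation: AddEventModel and its add_event/3 are hypothetical names, the map stands in for the Ecto struct, and the reading of :any as "at least one listed event received" and :all as "every listed event received" is an assumption. Cancelled groups are not covered by the description above, so the sketch leaves them out.

```elixir
defmodule AddEventModel do
  # Hypothetical plain-map model of a wait group; no database or locking here.

  # Pending group: merge the event, then check the wait condition.
  def add_event(%{status: :pending} = group, event_name, payload) do
    received = Map.put(group.received_events, event_name, payload)

    # Assumed semantics: :any needs one listed event, :all needs every one.
    done? =
      case group.wait_type do
        :any -> Enum.any?(group.event_names, &Map.has_key?(received, &1))
        :all -> Enum.all?(group.event_names, &Map.has_key?(received, &1))
      end

    status = if done?, do: :completed, else: :pending
    updated = %{group | received_events: received, status: status}

    # just_completed is true only on the :pending -> :completed transition.
    {:ok, %{wait_group: updated, just_completed: done?}}
  end

  # Already completed or timed out: no-op, so a late event cannot
  # report a second completion.
  def add_event(%{status: status} = group, _event_name, _payload)
      when status in [:completed, :timeout] do
    {:ok, %{wait_group: group, just_completed: false}}
  end
end
```

In this model a caller would resume the parent workflow only when just_completed is true, which is what keeps a late arrival from resuming it twice.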

Without the row lock, two concurrent callers can read the same received_events, each merge in only their own event, and have the later UPDATE silently overwrite the earlier one. The group is then permanently short an entry and the parent stays stuck in :waiting.
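The lost update can be reproduced without a database by replaying the two interleavings on plain maps. This is an illustrative sketch only: LostUpdateSketch, the event names "a" and "b", and the payloads are hypothetical, and sequential code stands in for two concurrent transactions.

```elixir
defmodule LostUpdateSketch do
  # Stand-in for merging one event into the received_events map.
  def merge(received_events, event_name, payload) do
    Map.put(received_events, event_name, payload)
  end

  # Unlocked interleaving: both callers read the same stored map before
  # either writes, so the later write replaces the earlier one wholesale.
  def unlocked(stored) do
    read_by_a = stored
    read_by_b = stored
    _written_by_a = merge(read_by_a, "a", %{n: 1})
    # B's write lands last and becomes the stored value.
    merge(read_by_b, "b", %{n: 2})
  end

  # Serialized interleaving: B reads only after A has written, which is
  # the ordering a row lock forces.
  def locked(stored) do
    written_by_a = merge(stored, "a", %{n: 1})
    merge(written_by_a, "b", %{n: 2})
  end
end
```

Starting from an empty map, LostUpdateSketch.unlocked(%{}) ends with only the "b" entry, while LostUpdateSketch.locked(%{}) ends with both. A wait group of type :all that expects both events would never complete in the first case.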

cancel_changeset(wait_group)

Creates a changeset for cancelling a wait group.

changeset(wait_group, attrs)

Creates a changeset for inserting a new wait group.

timeout_changeset(wait_group)

Creates a changeset for timing out a wait group.