Durable.Wait (Durable v0.1.0-rc)

Wait primitives for durable workflows.

Provides functions for:

  • Pausing workflow execution for a duration or until a specific time
  • Waiting for external events
  • Waiting for human input (human-in-the-loop)

Usage

defmodule MyApp.ApprovalWorkflow do
  use Durable
  use Durable.Context
  use Durable.Wait

  workflow "approval" do
    step :request_approval do
      result = wait_for_approval("manager_approval",
        prompt: "Approve expense?",
        timeout: days(3),
        timeout_value: :auto_reject
      )
      put_context(:approval, result)
    end
  end
end

Primitives

Summary

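Functions

__using__(opts)

Injects wait functions into the calling module.

cancel_wait(workflow_id, opts \\ [])

Cancels a waiting workflow.

end_of_day(opts \\ [])

Returns the end of the current day.

list_pending_events(filters \\ [])

Lists pending events for workflows.

list_pending_inputs(filters \\ [])

Lists pending inputs for workflows.

next_business_day(opts \\ [])

Returns the next business day (Mon-Fri) at the specified hour.

next_weekday(weekday, opts \\ [])

Returns the next occurrence of the specified weekday.

provide_input(workflow_id, input_name, data, opts \\ [])

Provides input for a waiting workflow.

schedule_at(datetime)

Pauses the workflow until the specified datetime.

send_event(workflow_id, event_name, payload, opts \\ [])

Sends an event to a waiting workflow.

sleep(duration_ms)

Pauses the workflow for the specified duration.

wait_for_all(event_names, opts \\ [])

Waits for all of the specified events.

wait_for_any(event_names, opts \\ [])

Waits for any of the specified events.

wait_for_approval(input_name, opts \\ [])

Waits for an approval decision.

wait_for_choice(input_name, opts)

Waits for a single choice selection.

wait_for_event(event_name, opts \\ [])

Waits for an external event.

wait_for_form(input_name, opts)

Waits for form submission.

wait_for_input(input_name, opts \\ [])

Waits for human input.

wait_for_text(input_name, opts \\ [])

Waits for text input.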

Functions

__using__(opts)

(macro)

Injects wait functions into the calling module.

cancel_wait(workflow_id, opts \\ [])

@spec cancel_wait(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Cancels a waiting workflow.

The workflow will resume with {:cancelled, reason}.

Options

  • :reason - Cancellation reason (default: "cancelled")
  • :durable - The Durable instance name (default: Durable)

Examples

Durable.cancel_wait(workflow_id, reason: "User cancelled")
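Because a cancelled wait resumes with {:cancelled, reason}, step code that calls a wait function may want to branch on that shape. The following is a minimal sketch using only pattern matching; the module and function names are hypothetical and not part of Durable.

```elixir
defmodule CancelHandlingSketch do
  # `result` stands for whatever a wait call returned inside a step.
  # A cancelled wait is reported as {:cancelled, reason}.
  def interpret({:cancelled, reason}), do: {:stop, reason}

  # Any other value is treated as a normal wait result.
  def interpret(other), do: {:continue, other}
end

IO.inspect(CancelHandlingSketch.interpret({:cancelled, "User cancelled"}))
# => {:stop, "User cancelled"}
```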

end_of_day(opts \\ [])

@spec end_of_day(keyword()) :: DateTime.t()

Returns the end of the current day.

Options

  • :timezone - Timezone string, default: "UTC"

Examples

end_of_day()
# => ~U[2026-01-03 23:59:59Z]
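To make the returned value concrete, here is a rough standard-library equivalent for the UTC case. It is a sketch only: EndOfDaySketch is a hypothetical module, it takes the current time as an argument so the result is reproducible, and it ignores the :timezone option.

```elixir
defmodule EndOfDaySketch do
  # Returns 23:59:59 UTC on the date of the given UTC DateTime.
  def end_of_day(%DateTime{} = now) do
    now
    |> DateTime.to_date()
    |> DateTime.new!(~T[23:59:59], "Etc/UTC")
  end
end

IO.inspect(EndOfDaySketch.end_of_day(~U[2026-01-03 10:15:00Z]))
# => ~U[2026-01-03 23:59:59Z]
```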

list_pending_events(filters \\ [])

@spec list_pending_events(keyword()) :: [map()]

Lists pending events for workflows.

Filters

  • :event_name - Filter by event name
  • :status - Filter by status (default: :pending)
  • :limit - Maximum number of results (default: 50)
  • :durable - The Durable instance name (default: Durable)

list_pending_inputs(filters \\ [])

@spec list_pending_inputs(keyword()) :: [map()]

Lists pending inputs for workflows.

Filters

  • :status - Filter by status (default: :pending)
  • :timeout_before - Filter inputs timing out before this datetime
  • :limit - Maximum number of results (default: 50)
  • :durable - The Durable instance name (default: Durable)
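The :timeout_before filter takes a datetime, so a caller typically computes one relative to now. A small sketch of building such a filter list with DateTime.add/3 follows; PendingFilterSketch and expiring_within/2 are hypothetical names, and only the filter keys come from the list above.

```elixir
defmodule PendingFilterSketch do
  # Builds filters for pending inputs that time out within `minutes` of `now`.
  def expiring_within(%DateTime{} = now, minutes) when is_integer(minutes) do
    cutoff = DateTime.add(now, minutes * 60, :second)
    [status: :pending, timeout_before: cutoff, limit: 50]
  end
end

IO.inspect(PendingFilterSketch.expiring_within(~U[2026-01-03 10:00:00Z], 30))
# => [status: :pending, timeout_before: ~U[2026-01-03 10:30:00Z], limit: 50]
```

The resulting keyword list has the shape that list_pending_inputs/1 accepts as its filters argument.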

next_business_day(opts \\ [])

@spec next_business_day(keyword()) :: DateTime.t()

Returns the next business day (Mon-Fri) at the specified hour.

Options

  • :hour - Hour of day (0-23), default: 9
  • :timezone - Timezone string, default: "UTC"

Examples

next_business_day()
# => ~U[2026-01-05 09:00:00Z] (next Mon-Fri at 9am UTC)

next_business_day(hour: 17)
# => ~U[2026-01-05 17:00:00Z]
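The weekend-skipping logic can be sketched with the standard library. This is an illustration under stated assumptions, not Durable's implementation: BusinessDaySketch is hypothetical, it works in UTC only, and it assumes "next" means strictly after the current date.

```elixir
defmodule BusinessDaySketch do
  # First Mon-Fri date strictly after `from`, at `hour`:00 UTC.
  def next_business_day(%DateTime{} = from, hour \\ 9) when hour in 0..23 do
    from
    |> DateTime.to_date()
    |> Date.add(1)
    |> skip_weekend()
    |> DateTime.new!(Time.new!(hour, 0, 0), "Etc/UTC")
  end

  # Date.day_of_week/1 returns 1 (Monday) through 7 (Sunday).
  defp skip_weekend(date) do
    if Date.day_of_week(date) in [6, 7] do
      skip_weekend(Date.add(date, 1))
    else
      date
    end
  end
end

# 2026-01-03 is a Saturday, so the next business day is Monday the 5th.
IO.inspect(BusinessDaySketch.next_business_day(~U[2026-01-03 12:00:00Z]))
# => ~U[2026-01-05 09:00:00Z]
```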

next_weekday(weekday, opts \\ [])

@spec next_weekday(
  atom(),
  keyword()
) :: DateTime.t()

Returns the next occurrence of the specified weekday.

Options

  • :hour - Hour of day (0-23), default: 9
  • :timezone - Timezone string, default: "UTC"

Examples

next_weekday(:monday)
# => ~U[2026-01-05 09:00:00Z]

next_weekday(:friday, hour: 17)
# => ~U[2026-01-09 17:00:00Z]
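The weekday arithmetic can be sketched with Date.day_of_week/1. WeekdaySketch is a hypothetical module, works in UTC only, and assumes that asking for today's weekday yields the same weekday one week later; Durable may behave differently at that boundary.

```elixir
defmodule WeekdaySketch do
  @days %{monday: 1, tuesday: 2, wednesday: 3, thursday: 4,
          friday: 5, saturday: 6, sunday: 7}

  # Next occurrence of `weekday` strictly after the date of `from`, at `hour`:00 UTC.
  def next_weekday(%DateTime{} = from, weekday, hour \\ 9) do
    today = DateTime.to_date(from)
    target = Map.fetch!(@days, weekday)
    # Days to add, always in 1..7 so the result is never today.
    delta = Integer.mod(target - Date.day_of_week(today) - 1, 7) + 1

    today
    |> Date.add(delta)
    |> DateTime.new!(Time.new!(hour, 0, 0), "Etc/UTC")
  end
end

# Starting from Saturday 2026-01-03, the next Monday is 2026-01-05.
IO.inspect(WeekdaySketch.next_weekday(~U[2026-01-03 12:00:00Z], :monday))
# => ~U[2026-01-05 09:00:00Z]
```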

provide_input(workflow_id, input_name, data, opts \\ [])

@spec provide_input(String.t(), String.t(), map(), keyword()) ::
  :ok | {:error, term()}

Provides input for a waiting workflow.

Called from external code (API, UI, etc.) to continue a workflow.

Options

  • :durable - The Durable instance name (default: Durable)

Examples

Durable.provide_input(workflow_id, "manager_approval", %{approved: true})
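Since provide_input/4 returns :ok or {:error, term()}, an API endpoint that forwards user input needs to translate both outcomes. The sketch below shows one way to do that; InputResponseSketch and the status codes are illustrative assumptions, and the error reasons Durable actually returns are not specified here.

```elixir
defmodule InputResponseSketch do
  # Maps a provide_input/4 return value to a {status, body} pair.
  def to_response(:ok), do: {202, %{status: "accepted"}}

  def to_response({:error, reason}) do
    {422, %{status: "error", reason: inspect(reason)}}
  end
end

IO.inspect(InputResponseSketch.to_response(:ok))
# => {202, %{status: "accepted"}}
```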

schedule_at(datetime)

@spec schedule_at(DateTime.t()) :: nil

Pauses the workflow until the specified datetime.

Examples

schedule_at(~U[2026-01-10 09:00:00Z])
schedule_at(next_business_day(hour: 9))
schedule_at(next_weekday(:monday, hour: 9))

Resumption semantics are identical to those of sleep/1.

send_event(workflow_id, event_name, payload, opts \\ [])

@spec send_event(String.t(), String.t(), map(), keyword()) :: :ok | {:error, term()}

Sends an event to a waiting workflow.

Options

  • :durable - The Durable instance name (default: Durable)

Examples

Durable.send_event(workflow_id, "payment_confirmed", %{amount: 99.99})

sleep(duration_ms)

@spec sleep(integer()) :: nil

Pauses the workflow for the specified duration.

The workflow will be suspended and resumed after the duration elapses.

Examples

sleep(seconds(30))
sleep(minutes(5))
sleep(hours(2))
sleep(days(1))
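The argument of sleep/1 is named duration_ms, which suggests that seconds/1, minutes/1, hours/1 and days/1 convert to milliseconds. The sketch below shows what such conversions would look like; DurationSketch is a hypothetical stand-in, not Durable's code.

```elixir
defmodule DurationSketch do
  # Each helper converts a whole number of units into milliseconds.
  def seconds(n) when is_integer(n), do: n * 1_000
  def minutes(n) when is_integer(n), do: seconds(n * 60)
  def hours(n) when is_integer(n), do: minutes(n * 60)
  def days(n) when is_integer(n), do: hours(n * 24)
end

IO.inspect(DurationSketch.minutes(5))
# => 300000
IO.inspect(DurationSketch.days(1))
# => 86400000
```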

Resumption semantics

Like wait_for_event/2, sleep/1 is a resumption barrier. When the wake fires, the step body re-runs from the top. By then the SleepWaker has merged a :__sleep_satisfied__ marker into the context; sleep/1 recognises the marker and returns nil instead of throwing to suspend the step again. Any side effects before sleep/1 therefore run twice (once on suspend, once on resume), so keep them idempotent or move them into a prior step.

Multiple sleep/1 calls inside one step body are not supported: the marker is per-step, not per-call, so the second call on resume would return immediately rather than wait. Use one wait per step.
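The per-step marker behaviour can be illustrated with a toy model. Everything in the sketch is a simplification for illustration: SleepMarkerSketch is hypothetical, the context is a plain map, and the "waker" sets the marker immediately instead of after a real delay.

```elixir
defmodule SleepMarkerSketch do
  # Toy sleep: one per-step marker decides whether to suspend.
  def sleep(context, ms) do
    if context[:__sleep_satisfied__], do: nil, else: throw({:suspend, ms})
  end

  # Runs the step and records each suspension. On suspension the toy waker
  # sets the marker and the step body runs again from the top.
  def run(step, context \\ %{}, suspensions \\ []) do
    step.(context)
    Enum.reverse(suspensions)
  catch
    {:suspend, ms} ->
      run(step, Map.put(context, :__sleep_satisfied__, true), [{:sleep, ms} | suspensions])
  end
end

two_sleeps = fn context ->
  SleepMarkerSketch.sleep(context, 1_000)
  SleepMarkerSketch.sleep(context, 5_000)   # never suspends in this model
end

IO.inspect(SleepMarkerSketch.run(two_sleeps))
# => [sleep: 1000]
```

Only the first call suspends; on the re-run the marker satisfies both calls, so the 5-second sleep is skipped.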

wait_for_all(event_names, opts \\ [])

@spec wait_for_all(
  [String.t()],
  keyword()
) :: map()

Waits for all of the specified events.

Returns a map of %{event_name => payload} when all events are received, or the :timeout_value if the timeout elapses first.

Options

  • :timeout - Timeout in milliseconds (optional)
  • :timeout_value - Value to return on timeout (optional)

Examples

results = wait_for_all(["manager_approval", "legal_approval"])
# => %{"manager_approval" => %{...}, "legal_approval" => %{...}}

results = wait_for_all(["step1_complete", "step2_complete"],
  timeout: days(7),
  timeout_value: {:timeout, :partial}
)
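When a :timeout_value is supplied, the caller has to distinguish it from the normal map result. A minimal sketch of that branching follows, reusing the {:timeout, :partial} value from the example above; AllResultSketch is a hypothetical name.

```elixir
defmodule AllResultSketch do
  # The timeout clause comes first so the tuple is never mistaken for results.
  def summarize({:timeout, :partial}), do: :timed_out

  # On success every requested event name is a key in the map.
  def summarize(%{} = results) do
    {:all_received, results |> Map.keys() |> Enum.sort()}
  end
end

IO.inspect(AllResultSketch.summarize(%{"step2_complete" => %{}, "step1_complete" => %{}}))
# => {:all_received, ["step1_complete", "step2_complete"]}
```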

wait_for_any(event_names, opts \\ [])

@spec wait_for_any(
  [String.t()],
  keyword()
) :: {String.t(), term()}

Waits for any of the specified events.

Returns {event_name, payload} for the first event received, or the :timeout_value if the timeout elapses first.

Options

  • :timeout - Timeout in milliseconds (optional)
  • :timeout_value - Value to return on timeout (optional)

Examples

{event, payload} = wait_for_any(["success", "failure", "cancelled"])

{event, payload} = wait_for_any(["approved", "rejected"],
  timeout: hours(24),
  timeout_value: {:timeout, nil}
)
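Choosing a :timeout_value with the same tuple shape as a normal result, as in the example above, lets one function head per outcome handle everything. A sketch of that dispatch, with a hypothetical module and hypothetical follow-up actions:

```elixir
defmodule AnyResultSketch do
  # {:timeout, nil} is the :timeout_value chosen in the example above.
  def next_step({:timeout, nil}), do: :escalate
  def next_step({"approved", payload}), do: {:fulfil, payload}
  def next_step({"rejected", payload}), do: {:notify_rejection, payload}
end

IO.inspect(AnyResultSketch.next_step({"approved", %{by: "manager"}}))
# => {:fulfil, %{by: "manager"}}
```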

wait_for_approval(input_name, opts \\ [])

@spec wait_for_approval(
  String.t(),
  keyword()
) :: :approved | :rejected | term()

Waits for an approval decision.

Returns :approved, :rejected, or timeout_value if timeout occurs.

Options

  • :prompt - Prompt to show to the user
  • :metadata - Additional data for the UI to display
  • :timeout - Timeout in milliseconds
  • :timeout_value - Value to return on timeout

Examples

result = wait_for_approval("manager_approval")
# => :approved | :rejected

result = wait_for_approval("expense_approval",
  prompt: "Approve expense for $500?",
  metadata: %{employee: "John", amount: 500},
  timeout: days(3),
  timeout_value: :auto_approved
)
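A distinct :timeout_value such as :auto_approved makes it possible to tell a human decision from an automatic one afterwards. The sketch below shows one way to record that distinction; ApprovalOutcomeSketch and its return shape are hypothetical.

```elixir
defmodule ApprovalOutcomeSketch do
  # Maps a wait_for_approval/2 return value to {decision, source}.
  def classify(:approved), do: {:approved, :human}
  def classify(:rejected), do: {:rejected, :human}

  # :auto_approved is the :timeout_value used in the example above.
  def classify(:auto_approved), do: {:approved, :timeout}
end

IO.inspect(ApprovalOutcomeSketch.classify(:auto_approved))
# => {:approved, :timeout}
```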

wait_for_choice(input_name, opts)

@spec wait_for_choice(
  String.t(),
  keyword()
) :: term()

Waits for a single choice selection.

Returns the selected choice value.

Options

  • :choices - List of choice options (required)
  • :prompt - Prompt to show to the user
  • :metadata - Additional data for the UI to display
  • :timeout - Timeout in milliseconds
  • :timeout_value - Value to return on timeout (often a default choice)

Examples

result = wait_for_choice("shipping_method",
  prompt: "Select shipping method:",
  choices: [
    %{value: :express, label: "Express ($15)"},
    %{value: :standard, label: "Standard (Free)"}
  ],
  timeout: hours(24),
  timeout_value: :standard
)
# => :express | :standard

wait_for_event(event_name, opts \\ [])

@spec wait_for_event(
  String.t(),
  keyword()
) :: term()

Waits for an external event.

The workflow will be suspended until the event is received or timeout occurs.

Options

  • :timeout - Timeout in milliseconds (optional)
  • :timeout_value - Value to return on timeout (optional)

Resumption semantics

wait_for_event/2 is a resumption barrier, not a pause inside the step body. When the event arrives, the step body re-executes from the top; it does not continue from the line where the wait was called. On re-execution, the wait finds the event data in the restored context and returns immediately without re-suspending.

This means any side effects before wait_for_event/2 run each time the step is invoked (once on suspend, again on resume). Make them idempotent or move them into a separate prior step:

# 🚫 not idempotent — sends two emails
step :wait_for_approval, fn data ->
  Mailer.send_approval_request(data)      # runs on both passes
  result = wait_for_event("approved")
  {:ok, assign(data, :approved, result)}
end

# ✅ side effect lives in its own step
step :request_approval, fn data ->
  Mailer.send_approval_request(data)
  {:ok, data}
end

step :await_approval, fn data ->
  result = wait_for_event("approved")
  {:ok, assign(data, :approved, result)}
end
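The re-execution behaviour can be reproduced in a toy model that runs without Durable. The sketch assumes a plain map as context and uses throw/catch to abandon the step body; BarrierSketch and its functions are hypothetical and only mimic the semantics described above.

```elixir
defmodule BarrierSketch do
  # Toy wait: return the payload if the event is already in context,
  # otherwise abandon the step body by throwing.
  def wait_for_event(context, name) do
    case Map.fetch(context, name) do
      {:ok, payload} -> payload
      :error -> throw({:suspend, name})
    end
  end

  # Toy runner: on suspension, put the delivered event into context and
  # run the whole step body again from the top.
  def run(step, context, deliveries) do
    {:done, step.(context)}
  catch
    {:suspend, name} ->
      payload = Map.fetch!(deliveries, name)
      run(step, Map.put(context, name, payload), deliveries)
  end

  # Counts the :email_sent messages sitting in the caller's mailbox.
  def count_emails(count \\ 0) do
    receive do
      :email_sent -> count_emails(count + 1)
    after
      0 -> count
    end
  end
end

step = fn context ->
  send(self(), :email_sent)   # side effect placed before the wait
  BarrierSketch.wait_for_event(context, "approved")
end

{:done, payload} = BarrierSketch.run(step, %{}, %{"approved" => %{by: "manager"}})
IO.inspect({payload, BarrierSketch.count_emails()})
# => {%{by: "manager"}, 2}
```

The side effect is counted twice: once on the pass that suspends and once on the pass that finds the event in context.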

Examples

# Wait indefinitely
result = wait_for_event("payment_confirmed")

# With timeout
result = wait_for_event("payment_confirmed",
  timeout: hours(2),
  timeout_value: {:error, :payment_timeout}
)

wait_for_form(input_name, opts)

@spec wait_for_form(
  String.t(),
  keyword()
) :: map() | term()

Waits for form submission.

Returns a map of field values.

Options

  • :fields - List of field definitions (required)
  • :prompt - Prompt to show to the user
  • :metadata - Additional data for the UI to display
  • :timeout - Timeout in milliseconds
  • :timeout_value - Value to return on timeout

Examples

result = wait_for_form("equipment_request",
  prompt: "Select your equipment preferences:",
  fields: [
    %{name: :laptop, type: :select, options: ["MacBook Pro", "ThinkPad X1"], required: true},
    %{name: :monitor, type: :select, options: ["27-inch", "32-inch"], required: false},
    %{name: :notes, type: :text, required: false}
  ],
  timeout: days(7)
)
# => %{laptop: "MacBook Pro", monitor: "27-inch", notes: "..."}
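Code that consumes the returned map may still want to check the required fields itself. The sketch below does this against field definitions shaped like the ones above; FormCheckSketch is hypothetical, and whether Durable validates submissions on its own is not stated here.

```elixir
defmodule FormCheckSketch do
  # Returns the names of required fields that are missing or blank.
  def missing_required(fields, submission) do
    for %{name: name, required: true} <- fields,
        Map.get(submission, name) in [nil, ""],
        do: name
  end
end

fields = [
  %{name: :laptop, type: :select, required: true},
  %{name: :notes, type: :text, required: false}
]

IO.inspect(FormCheckSketch.missing_required(fields, %{notes: "none"}))
# => [:laptop]
```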

wait_for_input(input_name, opts \\ [])

@spec wait_for_input(
  String.t(),
  keyword()
) :: term()

Waits for human input.

The workflow will be suspended until input is provided or timeout occurs.

Options

  • :type - Input type: :approval, :choice, :text, :form (default: :free_text)
  • :prompt - Prompt to show to the user
  • :metadata - Additional data for the UI to display
  • :fields - Field definitions for form input
  • :choices - Choice options for choice input
  • :timeout - Timeout in milliseconds
  • :timeout_value - Value to return on timeout
  • :on_timeout - :resume (default) or :fail

Examples

# Simple input
result = wait_for_input("feedback")

# With timeout
result = wait_for_input("manager_approval",
  type: :approval,
  prompt: "Approve expense?",
  metadata: %{amount: 150.00},
  timeout: days(3),
  timeout_value: :auto_approved
)

wait_for_text(input_name, opts \\ [])

@spec wait_for_text(
  String.t(),
  keyword()
) :: String.t() | term()

Waits for text input.

Returns the entered text string.

Options

  • :prompt - Prompt to show to the user
  • :metadata - Additional data for the UI to display
  • :timeout - Timeout in milliseconds
  • :timeout_value - Value to return on timeout

Examples

result = wait_for_text("rejection_reason",
  prompt: "Please provide a reason for rejection:",
  timeout: hours(4),
  timeout_value: "No reason provided"
)
# => "Budget exceeded for this quarter"