PropertyDamage.ResourcePoller (PropertyDamage v0.2.0)

View Source

GenServer-based poller for external resources during command execution.

ResourcePoller enables adapters to start background polling of external resources during execute/2, injecting events as the resource status changes. This allows commands to return immediately with an initial event while a poller monitors the resource for subsequent state changes.

Usage Pattern

In an adapter's execute/2, use ctx.start_poller.(opts) to spawn a poller:

def execute(%CreateAuthorization{} = cmd, ctx) do
  {:ok, %{body: %{"id" => id}}} = Req.post(ctx.client, ...)

  _poller = ctx.start_poller.(
    poll_fn: fn -> Req.get(ctx.client, url: "/authorizations/#{id}") end,
    interval_ms: 500,
    timeout_ms: 30_000,
    handler: fn response ->
      case response.body["status"] do
        "processing" -> :continue
        "pending_review" -> {:inject, %AuthorizationPendingReview{id: id}}
        "approved" -> {:done, %AuthorizationApproved{id: id}}
        "declined" -> {:done, %AuthorizationDeclined{id: id}}
      end
    end
  )

  {:ok, [%AuthorizationCreated{id: id, status: :processing}]}
end

Handler Return Values

ReturnBehavior
:continueKeep polling, no event
{:inject, event}Push event to EventQueue, keep polling
{:inject, [events]}Push multiple events, keep polling
{:done, event}Push event, stop polling
{:done, [events]}Push multiple events, stop (use {:done, []} for no event)
{:error, reason}Stop polling with error

The reason in {:error, reason} can be any term, including an exception struct for structured error reporting:

# Simple string error
{:error, "Payment gateway timeout"}

# Exception for structured errors
{:error, %PaymentError{code: :gateway_timeout, details: response}}

When an exception is used, the framework will use Exception.message/1 for cleaner log output in :log assertion mode.

Timeout Handling

The on_timeout option controls behavior when polling exceeds timeout_ms:

ValueBehavior
:ignoreSilent timeout, poller stops, no failure
:fail (default)Generic timeout error reported
{:error, reason}Custom error reason reported
fn info -> resultFunction called with timeout info

The timeout info passed to the function:

%{
  elapsed_ms: non_neg_integer(),     # Total time elapsed
  poll_count: non_neg_integer(),     # Number of poll attempts
  last_poll_result: term() | nil     # Result of last poll_fn call
}

Like handler errors, on_timeout can return {:error, exception} for structured error reporting:

on_timeout: fn info ->
  {:error, %ResourceTimeout{
    resource: "authorization",
    elapsed_ms: info.elapsed_ms,
    poll_count: info.poll_count
  }}
end

Lifecycle

  1. Adapter calls ctx.start_poller.(opts) during execute/2
  2. Poller spawns, begins polling immediately
  3. Handler processes each poll result, may inject events
  4. At sequence end, executor awaits all active pollers
  5. Errors handled based on assertion_mode

Error Handling

Errors are captured with stacktraces where applicable:

  • poll_fn raises → {:error, {:poll_fn_error, exception, stacktrace}}
  • handler raises → {:error, {:handler_error, exception, stacktrace}}
  • on_timeout fn raises → {:error, {:on_timeout_error, exception, stacktrace}}
  • handler returns {:error, reason}{:error, reason} (no stacktrace)
  • on_timeout returns {:error, reason}{:error, reason} (no stacktrace)
  • Timeout with :fail{:error, {:timeout, timeout_info}}

In :log assertion mode, exceptions are formatted using Exception.message/1 for cleaner output. In :halt and :record modes, full error details including stacktraces (when available) are preserved in the failure report.

Summary

Types

Handler return values.

Timeout handling options.

Result from a completed poller.

Options for starting a resource poller.

t()

Poller handle returned by start/1.

Information passed to on_timeout functions when polling exceeds timeout_ms.

Functions

Wait for a poller to complete.

Wait for multiple pollers to complete.

Check if a poller has completed without blocking.

Returns a specification to start this module under a supervisor.

Start polling a resource.

Stop a poller early.

Types

handler_result()

@type handler_result() ::
  :continue
  | {:inject, struct() | [struct()]}
  | {:done, struct() | [struct()]}
  | {:error, term() | Exception.t()}

Handler return values.

The {:error, reason} variant accepts any term, including exception structs for structured error reporting. Exceptions will be formatted using Exception.message/1 in :log mode.

on_timeout_option()

@type on_timeout_option() ::
  :ignore
  | :fail
  | {:error, term() | Exception.t()}
  | (timeout_info() -> :ignore | {:error, term() | Exception.t()})

Timeout handling options.

The {:error, reason} variant accepts any term, including exception structs. The function variant receives timeout_info() and should return :ignore or {:error, reason}. If the function raises, the exception is captured with its stacktrace.

result()

@type result() ::
  {:success, reference()}
  | {:timeout_ignored, reference()}
  | {:error, reference(), term()}

Result from a completed poller.

Error reasons can be:

  • {:poll_fn_error, exception, stacktrace} - poll_fn raised
  • {:handler_error, exception, stacktrace} - handler raised
  • {:on_timeout_error, exception, stacktrace} - on_timeout function raised
  • {:timeout, timeout_info} - timeout with on_timeout: :fail
  • Any term from {:error, reason} returned by handler or on_timeout

start_opts()

@type start_opts() :: [
  poll_fn: (-> term()),
  handler: (term() -> handler_result()),
  interval_ms: pos_integer(),
  timeout_ms: pos_integer(),
  event_queue: pid(),
  command_index: non_neg_integer(),
  on_timeout: on_timeout_option(),
  branch_id: non_neg_integer() | nil
]

Options for starting a resource poller.

Required:

  • :poll_fn - Function (() -> response) to poll the resource
  • :handler - Function (response -> result) to process poll results
  • :interval_ms - Milliseconds between polls
  • :timeout_ms - Maximum polling duration
  • :event_queue - EventQueue pid for injecting events
  • :command_index - Index of the command that started this poller

Optional:

  • :on_timeout - Timeout handling: :ignore, :fail, {:error, reason}, or function
  • :branch_id - Branch identifier for parallel execution

t()

@type t() :: %PropertyDamage.ResourcePoller{
  caller: pid(),
  command_index: non_neg_integer(),
  id: reference(),
  pid: pid(),
  started_at: integer(),
  timeout_ms: pos_integer()
}

Poller handle returned by start/1.

timeout_info()

@type timeout_info() :: %{
  elapsed_ms: non_neg_integer(),
  poll_count: non_neg_integer(),
  last_poll_result: term() | nil
}

Information passed to on_timeout functions when polling exceeds timeout_ms.

This map provides context about what happened during polling, allowing the timeout handler to make informed decisions about how to respond.

Fields

  • :elapsed_ms - Total wall-clock time in milliseconds since the poller started. This will be >= timeout_ms from the start options.

  • :poll_count - Number of times poll_fn was called. A count of 0 means the poller timed out before the first poll could complete (unlikely but possible if timeout_ms < interval_ms).

  • :last_poll_result - The return value from the most recent poll_fn call, or nil if no polls completed. Useful for logging or including in error messages to show what state the resource was in when it timed out.

Example Usage

on_timeout: fn info ->
  if info.poll_count > 10 do
    # We tried hard enough, treat as success
    :ignore
  else
    {:error, %ResourceTimeout{
      elapsed_ms: info.elapsed_ms,
      poll_count: info.poll_count,
      last_status: info.last_poll_result[:status]
    }}
  end
end

Functions

await(poller, opts \\ [])

@spec await(
  t(),
  keyword()
) :: result() | {:error, :await_timeout}

Wait for a poller to complete.

Blocks until the poller finishes via {:done, _}, times out, or errors.

Options

  • :timeout - Maximum time to wait in milliseconds (default: poller's timeout_ms + 1000)

Returns

  • {:success, id} - Poller completed via {:done, _}
  • {:timeout_ignored, id} - Timeout occurred, on_timeout returned :ignore
  • {:error, id, reason} - Error from handler, poll_fn, or timeout
  • {:error, :await_timeout} - Await itself timed out

await_all(pollers, opts \\ [])

@spec await_all(
  [t()],
  keyword()
) :: [{reference(), result()}]

Wait for multiple pollers to complete.

Returns when all pollers have completed.

Options

  • :timeout - Maximum time to wait for all pollers (default: max of all poller timeouts + 1000)

Returns

List of {poller_id, result} tuples.

check(resource_poller)

@spec check(t()) :: {:ok, result()} | :pending

Check if a poller has completed without blocking.

Returns

  • {:ok, result} - Poller has completed
  • :pending - Poller is still running

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

start(opts)

@spec start(start_opts()) :: t()

Start polling a resource.

Spawns a polling process that periodically calls poll_fn, passes the result to handler, and injects events to the EventQueue based on handler return.

Returns a handle that can be used with await/2 to wait for completion.

Options

See start_opts/0 for the full list of options.

Example

poller = ResourcePoller.start(
  poll_fn: fn -> Req.get(client, url: "/resource/#{id}") end,
  handler: fn resp -> ... end,
  interval_ms: 500,
  timeout_ms: 30_000,
  event_queue: queue,
  command_index: 5
)

stop(resource_poller)

@spec stop(t()) :: :ok

Stop a poller early.

The poller process will be terminated and no result message will be sent. Safe to call multiple times or on already-stopped pollers.