PropertyDamage.ResourcePoller (PropertyDamage v0.2.0)
View SourceGenServer-based poller for external resources during command execution.
ResourcePoller enables adapters to start background polling of external resources
during execute/2, injecting events as the resource status changes. This allows
commands to return immediately with an initial event while a poller monitors the
resource for subsequent state changes.
Usage Pattern
In an adapter's execute/2, use ctx.start_poller.(opts) to spawn a poller:
def execute(%CreateAuthorization{} = cmd, ctx) do
{:ok, %{body: %{"id" => id}}} = Req.post(ctx.client, ...)
_poller = ctx.start_poller.(
poll_fn: fn -> Req.get(ctx.client, url: "/authorizations/#{id}") end,
interval_ms: 500,
timeout_ms: 30_000,
handler: fn response ->
case response.body["status"] do
"processing" -> :continue
"pending_review" -> {:inject, %AuthorizationPendingReview{id: id}}
"approved" -> {:done, %AuthorizationApproved{id: id}}
"declined" -> {:done, %AuthorizationDeclined{id: id}}
end
end
)
{:ok, [%AuthorizationCreated{id: id, status: :processing}]}
endHandler Return Values
| Return | Behavior |
|---|---|
:continue | Keep polling, no event |
{:inject, event} | Push event to EventQueue, keep polling |
{:inject, [events]} | Push multiple events, keep polling |
{:done, event} | Push event, stop polling |
{:done, [events]} | Push multiple events, stop (use {:done, []} for no event) |
{:error, reason} | Stop polling with error |
The reason in {:error, reason} can be any term, including an exception struct
for structured error reporting:
# Simple string error
{:error, "Payment gateway timeout"}
# Exception for structured errors
{:error, %PaymentError{code: :gateway_timeout, details: response}}When an exception is used, the framework will use Exception.message/1 for
cleaner log output in :log assertion mode.
Timeout Handling
The on_timeout option controls behavior when polling exceeds timeout_ms:
| Value | Behavior |
|---|---|
:ignore | Silent timeout, poller stops, no failure |
:fail (default) | Generic timeout error reported |
{:error, reason} | Custom error reason reported |
fn info -> result | Function called with timeout info |
The timeout info passed to the function:
%{
elapsed_ms: non_neg_integer(), # Total time elapsed
poll_count: non_neg_integer(), # Number of poll attempts
last_poll_result: term() | nil # Result of last poll_fn call
}Like handler errors, on_timeout can return {:error, exception} for structured
error reporting:
on_timeout: fn info ->
{:error, %ResourceTimeout{
resource: "authorization",
elapsed_ms: info.elapsed_ms,
poll_count: info.poll_count
}}
endLifecycle
- Adapter calls
ctx.start_poller.(opts)duringexecute/2 - Poller spawns, begins polling immediately
- Handler processes each poll result, may inject events
- At sequence end, executor awaits all active pollers
- Errors handled based on
assertion_mode
Error Handling
Errors are captured with stacktraces where applicable:
poll_fnraises →{:error, {:poll_fn_error, exception, stacktrace}}handlerraises →{:error, {:handler_error, exception, stacktrace}}on_timeoutfn raises →{:error, {:on_timeout_error, exception, stacktrace}}handlerreturns{:error, reason}→{:error, reason}(no stacktrace)on_timeoutreturns{:error, reason}→{:error, reason}(no stacktrace)- Timeout with
:fail→{:error, {:timeout, timeout_info}}
In :log assertion mode, exceptions are formatted using Exception.message/1
for cleaner output. In :halt and :record modes, full error details including
stacktraces (when available) are preserved in the failure report.
Summary
Types
Handler return values.
Timeout handling options.
Result from a completed poller.
Options for starting a resource poller.
Poller handle returned by start/1.
Information passed to on_timeout functions when polling exceeds timeout_ms.
Functions
Wait for a poller to complete.
Wait for multiple pollers to complete.
Check if a poller has completed without blocking.
Returns a specification to start this module under a supervisor.
Start polling a resource.
Stop a poller early.
Types
@type handler_result() :: :continue | {:inject, struct() | [struct()]} | {:done, struct() | [struct()]} | {:error, term() | Exception.t()}
Handler return values.
The {:error, reason} variant accepts any term, including exception structs
for structured error reporting. Exceptions will be formatted using
Exception.message/1 in :log mode.
@type on_timeout_option() :: :ignore | :fail | {:error, term() | Exception.t()} | (timeout_info() -> :ignore | {:error, term() | Exception.t()})
Timeout handling options.
The {:error, reason} variant accepts any term, including exception structs.
The function variant receives timeout_info() and should return :ignore
or {:error, reason}. If the function raises, the exception is captured
with its stacktrace.
@type result() :: {:success, reference()} | {:timeout_ignored, reference()} | {:error, reference(), term()}
Result from a completed poller.
Error reasons can be:
{:poll_fn_error, exception, stacktrace}- poll_fn raised{:handler_error, exception, stacktrace}- handler raised{:on_timeout_error, exception, stacktrace}- on_timeout function raised{:timeout, timeout_info}- timeout withon_timeout: :fail- Any term from
{:error, reason}returned by handler or on_timeout
@type start_opts() :: [ poll_fn: (-> term()), handler: (term() -> handler_result()), interval_ms: pos_integer(), timeout_ms: pos_integer(), event_queue: pid(), command_index: non_neg_integer(), on_timeout: on_timeout_option(), branch_id: non_neg_integer() | nil ]
Options for starting a resource poller.
Required:
:poll_fn- Function(() -> response)to poll the resource:handler- Function(response -> result)to process poll results:interval_ms- Milliseconds between polls:timeout_ms- Maximum polling duration:event_queue- EventQueue pid for injecting events:command_index- Index of the command that started this poller
Optional:
:on_timeout- Timeout handling::ignore,:fail,{:error, reason}, or function:branch_id- Branch identifier for parallel execution
@type t() :: %PropertyDamage.ResourcePoller{ caller: pid(), command_index: non_neg_integer(), id: reference(), pid: pid(), started_at: integer(), timeout_ms: pos_integer() }
Poller handle returned by start/1.
@type timeout_info() :: %{ elapsed_ms: non_neg_integer(), poll_count: non_neg_integer(), last_poll_result: term() | nil }
Information passed to on_timeout functions when polling exceeds timeout_ms.
This map provides context about what happened during polling, allowing the timeout handler to make informed decisions about how to respond.
Fields
:elapsed_ms- Total wall-clock time in milliseconds since the poller started. This will be >=timeout_msfrom the start options.:poll_count- Number of timespoll_fnwas called. A count of 0 means the poller timed out before the first poll could complete (unlikely but possible iftimeout_ms<interval_ms).:last_poll_result- The return value from the most recentpoll_fncall, ornilif no polls completed. Useful for logging or including in error messages to show what state the resource was in when it timed out.
Example Usage
on_timeout: fn info ->
if info.poll_count > 10 do
# We tried hard enough, treat as success
:ignore
else
{:error, %ResourceTimeout{
elapsed_ms: info.elapsed_ms,
poll_count: info.poll_count,
last_status: info.last_poll_result[:status]
}}
end
end
Functions
Wait for a poller to complete.
Blocks until the poller finishes via {:done, _}, times out, or errors.
Options
:timeout- Maximum time to wait in milliseconds (default: poller's timeout_ms + 1000)
Returns
{:success, id}- Poller completed via{:done, _}{:timeout_ignored, id}- Timeout occurred, on_timeout returned:ignore{:error, id, reason}- Error from handler, poll_fn, or timeout{:error, :await_timeout}- Await itself timed out
Wait for multiple pollers to complete.
Returns when all pollers have completed.
Options
:timeout- Maximum time to wait for all pollers (default: max of all poller timeouts + 1000)
Returns
List of {poller_id, result} tuples.
Check if a poller has completed without blocking.
Returns
{:ok, result}- Poller has completed:pending- Poller is still running
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec start(start_opts()) :: t()
Start polling a resource.
Spawns a polling process that periodically calls poll_fn, passes the result
to handler, and injects events to the EventQueue based on handler return.
Returns a handle that can be used with await/2 to wait for completion.
Options
See start_opts/0 for the full list of options.
Example
poller = ResourcePoller.start(
poll_fn: fn -> Req.get(client, url: "/resource/#{id}") end,
handler: fn resp -> ... end,
interval_ms: 500,
timeout_ms: 30_000,
event_queue: queue,
command_index: 5
)
@spec stop(t()) :: :ok
Stop a poller early.
The poller process will be terminated and no result message will be sent. Safe to call multiple times or on already-stopped pollers.