PropertyDamage.EventQueue (PropertyDamage v0.2.0)
View SourceShared event queue for injector adapters.
The EventQueue is an Agent-based queue that collects events pushed by injector adapters (webhooks, callbacks, etc.) and makes them available to the executor for processing.
Usage Flow
- Framework starts the queue via
start_link/0 - Injector adapters receive the queue reference in their config
- When events arrive (webhooks, callbacks), adapters push them via
push/3 - After each command execution, the executor drains pending events via
drain/1 - Framework stops the queue via
stop/1after the run completes
Event Entries
Each entry in the queue contains:
event- The event structadapter_module- The injector adapter that received ittimestamp- Monotonic time when the event was pushed
Example
# In test setup
{:ok, queue} = EventQueue.start_link()
# In injector adapter callback
EventQueue.push(queue, __MODULE__, %PaymentConfirmed{...})
# In executor loop
pending_events = EventQueue.drain(queue)
# In test teardown
EventQueue.stop(queue)
Summary
Functions
Drain all pending events.
Check if the queue is empty.
Peek at pending events without removing them.
Push an event from an injector adapter.
Push an event from a resource poller.
Get the number of pending events.
Start a new event queue.
Start a new event queue with options.
Stop the event queue.
Types
Functions
Drain all pending events.
Returns the list of events and clears the queue. Events are returned in the order they were pushed.
Returns
List of event entries, each containing :event, :adapter_module,
and :timestamp.
Example
entries = EventQueue.drain(queue)
Enum.each(entries, fn %{event: event, adapter_module: adapter} ->
IO.puts("Event from #{adapter}: #{inspect(event)}")
end)
Check if the queue is empty.
Example
if EventQueue.empty?(queue) do
IO.puts("No pending events")
end
Peek at pending events without removing them.
Useful for debugging or when you need to check without consuming.
Example
count = queue |> EventQueue.peek() |> length()
Push an event from an injector adapter.
Events are timestamped automatically with monotonic time.
Parameters
queue- The event queue pidadapter_module- The injector adapter module pushing the eventevent- The event struct
Example
EventQueue.push(queue, MyInjectorAdapter, %PaymentConfirmed{order_id: "123"})
@spec push_from_poller( pid(), reference(), non_neg_integer(), struct(), non_neg_integer() | nil ) :: :ok
Push an event from a resource poller.
Events are timestamped automatically with monotonic time and tagged
with their source as :resource_poller.
Parameters
queue- The event queue pidpoller_id- Reference identifying the poller instancecommand_index- Index of the command that started the pollerevent- The event structbranch_id- Optional branch identifier for parallel execution
Example
EventQueue.push_from_poller(queue, poller_ref, 5, %StatusChanged{id: "123"}, nil)
@spec size(pid()) :: non_neg_integer()
Get the number of pending events.
Example
count = EventQueue.size(queue)
Start a new event queue.
Returns
{:ok, queue}- Queue started successfully{:error, reason}- Failed to start
Example
{:ok, queue} = EventQueue.start_link()
Start a new event queue with options.
Options
Accepts standard Agent options like :name.
Example
{:ok, queue} = EventQueue.start_link(name: :my_event_queue)
@spec stop(pid()) :: :ok
Stop the event queue.
Example
:ok = EventQueue.stop(queue)