PropertyDamage.EventQueue (PropertyDamage v0.2.0)

Shared event queue for injector adapters.

The EventQueue is an Agent-based queue that collects events pushed by injector adapters (webhooks, callbacks, etc.) and makes them available to the executor for processing.

Usage Flow

  1. Framework starts the queue via start_link/0
  2. Injector adapters receive the queue reference in their config
  3. When events arrive (webhooks, callbacks), adapters push them via push/3
  4. After each command execution, the executor drains pending events via drain/1
  5. Framework stops the queue via stop/1 after the run completes

Event Entries

Each entry in the queue contains:

  • event - The event struct
  • adapter_module - The injector adapter that received it
  • timestamp - Monotonic time when the event was pushed
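
The flow and entry shape described above can be sketched with a plain Agent. This is a minimal illustration under stated assumptions, not the library's implementation: SketchQueue, FakeAdapter, and the PaymentConfirmed definition are hypothetical stand-ins, and the internal list layout is a guess.

```elixir
defmodule SketchQueue do
  # Hypothetical stand-in, NOT PropertyDamage.EventQueue itself.
  # State is a list of entries, newest first; drain/1 reverses it.

  def start_link, do: Agent.start_link(fn -> [] end)

  def push(queue, adapter_module, event) do
    entry = %{
      event: event,
      adapter_module: adapter_module,
      timestamp: System.monotonic_time()
    }

    Agent.update(queue, fn entries -> [entry | entries] end)
  end

  # Return entries in push order and clear the state in a single call.
  def drain(queue) do
    Agent.get_and_update(queue, fn entries -> {Enum.reverse(entries), []} end)
  end

  def stop(queue), do: Agent.stop(queue)
end

defmodule PaymentConfirmed do
  # Hypothetical event struct for illustration.
  defstruct [:order_id]
end

defmodule FakeAdapter do
  # Hypothetical adapter module, used only as a tag on entries.
end

# 1. Start the queue.
{:ok, queue} = SketchQueue.start_link()

# 2-3. An adapter pushes events as they arrive.
:ok = SketchQueue.push(queue, FakeAdapter, %PaymentConfirmed{order_id: "123"})
:ok = SketchQueue.push(queue, FakeAdapter, %PaymentConfirmed{order_id: "456"})

# 4. The executor drains after a command; a second drain finds nothing left.
first_drain = SketchQueue.drain(queue)
second_drain = SketchQueue.drain(queue)

# 5. Stop the queue when the run completes.
:ok = SketchQueue.stop(queue)
```

Doing the read and the reset inside one Agent.get_and_update/2 call means an event pushed between the two steps cannot be lost, which is one reason an Agent suits this kind of queue.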

Example

# In test setup
{:ok, queue} = EventQueue.start_link()

# In injector adapter callback
EventQueue.push(queue, __MODULE__, %PaymentConfirmed{...})

# In executor loop
pending_events = EventQueue.drain(queue)

# In test teardown
EventQueue.stop(queue)

Summary

Types

entry() - Event entry with metadata.

Functions

drain(queue) - Drain all pending events.

empty?(queue) - Check if the queue is empty.

peek(queue) - Peek at pending events without removing them.

push(queue, adapter_module, event) - Push an event from an injector adapter.

size(queue) - Get the number of pending events.

start_link() - Start a new event queue.

start_link(opts) - Start a new event queue with options.

stop(queue) - Stop the event queue.

Types

entry()

@type entry() :: %{event: struct(), adapter_module: module(), timestamp: integer()}

Event entry with metadata.

Functions

drain(queue)

@spec drain(pid()) :: [entry()]

Drain all pending events.

Returns all pending entries and clears the queue. Entries are returned in the order they were pushed.

Returns

List of event entries, each containing :event, :adapter_module, and :timestamp.

Example

entries = EventQueue.drain(queue)
Enum.each(entries, fn %{event: event, adapter_module: adapter} ->
  IO.puts("Event from #{adapter}: #{inspect(event)}")
end)
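
Because each entry carries its adapter_module, drained entries can be regrouped by source before processing. The sketch below works on hand-built entries in the documented shape rather than on a live queue; StatusChanged is defined here only for illustration, and WebhookAdapter and CallbackAdapter are made-up module names.

```elixir
defmodule StatusChanged do
  # Hypothetical event struct for illustration.
  defstruct [:id]
end

# Hand-built entries standing in for the result of EventQueue.drain/1.
# The timestamps are arbitrary integers, not real monotonic readings.
entries = [
  %{event: %StatusChanged{id: "a"}, adapter_module: WebhookAdapter, timestamp: 10},
  %{event: %StatusChanged{id: "b"}, adapter_module: CallbackAdapter, timestamp: 20},
  %{event: %StatusChanged{id: "c"}, adapter_module: WebhookAdapter, timestamp: 30}
]

# Group events by the adapter that received them.
# Enum.group_by/3 keeps the original order within each group.
events_by_adapter = Enum.group_by(entries, & &1.adapter_module, & &1.event)

ids_from_webhook = Enum.map(events_by_adapter[WebhookAdapter], & &1.id)
```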

empty?(queue)

@spec empty?(pid()) :: boolean()

Check if the queue is empty.

Example

if EventQueue.empty?(queue) do
  IO.puts("No pending events")
end

peek(queue)

@spec peek(pid()) :: [entry()]

Peek at pending events without removing them.

Useful for debugging, or when you need to inspect pending events without consuming them.

Example

count = queue |> EventQueue.peek() |> length()

push(queue, adapter_module, event)

@spec push(pid(), module(), struct()) :: :ok

Push an event from an injector adapter.

Events are timestamped automatically with monotonic time.

Parameters

  • queue - The event queue pid
  • adapter_module - The injector adapter module pushing the event
  • event - The event struct

Example

EventQueue.push(queue, MyInjectorAdapter, %PaymentConfirmed{order_id: "123"})
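
A note on reading the timestamps: monotonic time is only meaningful as a difference between two readings, and a single value may be negative. The sketch below assumes the entries are stamped with System.monotonic_time/0 in native units, which the documentation above does not state explicitly.

```elixir
# Two readings taken a few milliseconds apart, standing in for the
# :timestamp fields of two queue entries.
t1 = System.monotonic_time()
Process.sleep(5)
t2 = System.monotonic_time()

# Native units are VM-specific, so convert the difference rather than
# interpreting either raw value on its own.
elapsed_ms = System.convert_time_unit(t2 - t1, :native, :millisecond)

IO.puts("Second event arrived #{elapsed_ms} ms after the first")
```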

push_from_poller(queue, poller_id, command_index, event, branch_id \\ nil)

@spec push_from_poller(
  pid(),
  reference(),
  non_neg_integer(),
  struct(),
  non_neg_integer() | nil
) :: :ok

Push an event from a resource poller.

Events are timestamped automatically with monotonic time and tagged with the source :resource_poller.

Parameters

  • queue - The event queue pid
  • poller_id - Reference identifying the poller instance
  • command_index - Index of the command that started the poller
  • event - The event struct
  • branch_id - Optional branch identifier for parallel execution

Example

EventQueue.push_from_poller(queue, poller_ref, 5, %StatusChanged{id: "123"}, nil)

size(queue)

@spec size(pid()) :: non_neg_integer()

Get the number of pending events.

Example

count = EventQueue.size(queue)

start_link()

@spec start_link() :: {:ok, pid()} | {:error, term()}

Start a new event queue.

Returns

  • {:ok, queue} - Queue started successfully
  • {:error, reason} - Failed to start

Example

{:ok, queue} = EventQueue.start_link()

start_link(opts)

@spec start_link(keyword()) :: {:ok, pid()} | {:error, term()}

Start a new event queue with options.

Options

Accepts standard Agent options, such as :name.

Example

{:ok, queue} = EventQueue.start_link(name: :my_event_queue)
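
What the :name option does can be shown with a plain Agent. This sketch uses Agent directly as a stand-in, and :sketch_event_queue is a made-up name. Whether the EventQueue functions accept a registered name in place of the pid is not stated here (the specs say pid()), so treat that as an assumption to verify.

```elixir
# Plain Agent standing in for the queue.
{:ok, pid} = Agent.start_link(fn -> [] end, name: :sketch_event_queue)

# The registered name resolves to the same process.
same_pid = Process.whereis(:sketch_event_queue)

# Agent functions accept the name in place of the pid.
:ok = Agent.update(:sketch_event_queue, fn entries -> [:example | entries] end)
pending = Agent.get(pid, & &1)

# A second start under the same name returns an error tuple
# instead of replacing the first process.
duplicate = Agent.start_link(fn -> [] end, name: :sketch_event_queue)
```

The last call is one way the {:error, reason} branch of the return type can arise: with a plain Agent, the reason is {:already_started, pid}.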

stop(queue)

@spec stop(pid()) :: :ok

Stop the event queue.

Example

:ok = EventQueue.stop(queue)