GenServer-based async event dispatcher for the agent system.
The AsyncDispatcher manages event routing between agents in an asynchronous,
non-blocking manner. It maintains an event queue (FIFO) and processes events
by routing them through a Mojentic.Router to the appropriate agents.
Features
- Event Queue - FIFO queue using the :queue module
- Async Processing - Non-blocking event handling via Task and GenServer
- Mixed Agent Support - Handles both sync and async agents
- Graceful Shutdown - Stop via TerminateEvent or explicit stop/1
- Queue Monitoring - Wait for empty queue with wait_for_empty_queue/2
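To make the FIFO queue and batching ideas concrete, here is a minimal sketch of pulling a batch of events off an Erlang :queue. The module name QueueSketch and the helper take_batch/2 are hypothetical and are not part of Mojentic; the dispatcher's actual batching code may differ.

```elixir
defmodule QueueSketch do
  # Hypothetical helper: pops up to `n` events from the front of the queue.
  # Returns {events_in_fifo_order, remaining_queue}.
  def take_batch(queue, n) when n >= 0, do: take_batch(queue, n, [])

  # Batch is full: return what was collected, oldest first.
  defp take_batch(queue, 0, acc), do: {Enum.reverse(acc), queue}

  defp take_batch(queue, n, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> take_batch(rest, n - 1, [event | acc])
      # Queue ran dry before the batch filled up.
      {:empty, rest} -> {Enum.reverse(acc), rest}
    end
  end
end

# Enqueue three placeholder events, then take a batch of two.
queue = Enum.reduce([:e1, :e2, :e3], :queue.new(), fn e, q -> :queue.in(e, q) end)
{batch, rest} = QueueSketch.take_batch(queue, 2)
IO.inspect(batch, label: "batch")
IO.inspect(:queue.len(rest), label: "remaining")
```

Events come out in the order they went in, which is the property the dispatcher relies on for FIFO processing.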
Architecture
┌─────────────┐
│ Dispatcher  │
│ (GenServer) │
└──────┬──────┘
       │ Event Queue
       │ [:queue]
       │
       ├─→ Router ─→ Agent1 ──→ [New Events]
       │                             ↓
       └─→ Router ─→ Agent2 ──→ [New Events]
State Structure
%{
  router: %Router{},
  event_queue: :queue.queue(),
  processing: boolean(),
  batch_size: integer(),
  pending_tasks: non_neg_integer()
}
Usage
# Create router
router = Router.new()
  |> Router.add_route(QuestionEvent, fact_checker)
  |> Router.add_route(QuestionEvent, answer_generator)
# Start dispatcher
{:ok, pid} = AsyncDispatcher.start_link(router: router)
# Dispatch events
event = %QuestionEvent{source: MyApp, question: "What is Elixir?"}
AsyncDispatcher.dispatch(pid, event)
# Wait for queue to empty
:ok = AsyncDispatcher.wait_for_empty_queue(pid, timeout: 10_000)
# Stop dispatcher
AsyncDispatcher.stop(pid)
Examples
# Full workflow
router = Router.new()
  |> Router.add_route(QuestionEvent, fact_checker_pid)
  |> Router.add_route(FactCheckEvent, aggregator_pid)
{:ok, dispatcher} = AsyncDispatcher.start_link(
  router: router,
  batch_size: 10
)
question = %QuestionEvent{
  source: MyApp,
  question: "What is the capital of France?"
}
AsyncDispatcher.dispatch(dispatcher, question)
AsyncDispatcher.wait_for_empty_queue(dispatcher)
AsyncDispatcher.stop(dispatcher)
Concurrency Model
Agents matching an event are dispatched concurrently via Tasks. This is intentional, and it is safe in Elixir because processes do not share mutable memory. The other mojentic ports (ts/py/ru) serialize agent dispatch precisely because they do share mutable state (SharedWorkingMemory etc.), where concurrent agents would race.
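The concurrency model described above can be illustrated with a small sketch that fans one event out to several agents, each in its own Task. Everything here is an assumption made for illustration: agents are modeled as plain one-argument functions that return a list of new events, and ConcurrentDispatchSketch.dispatch_to_all/3 is a hypothetical name. The real dispatcher and agent interface may look different.

```elixir
defmodule ConcurrentDispatchSketch do
  # Runs every agent against the same event, each in its own Task, then
  # collects the new events they return. `agents` is a list of 1-arity
  # functions (an assumption; not the Mojentic agent interface).
  def dispatch_to_all(agents, event, timeout \\ 5_000) do
    agents
    |> Enum.map(fn agent -> Task.async(fn -> agent.(event) end) end)
    # await_many returns results in the same order as the tasks were given.
    |> Task.await_many(timeout)
    |> List.flatten()
  end
end

# Two placeholder agents; each receives its own copy of the event.
fact_checker = fn event -> [{:fact_checked, event}] end
answer_generator = fn event -> [{:answered, event}] end

[fact_checker, answer_generator]
|> ConcurrentDispatchSketch.dispatch_to_all(:question)
|> IO.inspect(label: "new events")
```

Because each Task is a separate process with its own heap, the agents cannot observe each other's intermediate state, which is why no locking is needed in this sketch.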
Summary
Functions
Returns a specification to start this module under a supervisor.
Dispatches an event to the event queue.
Gets the current size of the event queue.
Starts the async dispatcher as a linked process.
Stops the dispatcher gracefully.
Waits for the event queue to be empty.
Types
@type state() :: %{
  router: Mojentic.Router.t(),
  event_queue: :queue.queue(),
  processing: boolean(),
  batch_size: non_neg_integer(),
  pending_tasks: non_neg_integer()
}
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
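As a rough illustration of what a child specification enables, the sketch below starts a process under a Supervisor using the {module, opts} child tuple. StubDispatcher is a hypothetical stand-in defined only for this example so that it runs without Mojentic; with the real library one would presumably list the dispatcher module and its start_link options in the same position.

```elixir
defmodule StubDispatcher do
  # Hypothetical stand-in for the dispatcher. `use GenServer` generates a
  # default child_spec/1 that calls start_link/1 with the given options.
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
  end

  @impl true
  def init(opts), do: {:ok, Map.new(opts)}
end

# The {module, opts} tuple is expanded via StubDispatcher.child_spec(opts).
children = [
  {StubDispatcher, router: :router_placeholder, name: :sketch_dispatcher}
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
IO.inspect(Supervisor.count_children(sup), label: "children")
```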
Dispatches an event to the event queue.
The event will be assigned a correlation_id if it doesn't have one. Events are processed in FIFO order by the dispatcher loop.
Parameters
- pid - The dispatcher process
- event - The event to dispatch
Examples
event = %QuestionEvent{source: MyApp, question: "Hello?"}
AsyncDispatcher.dispatch(dispatcher, event)
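The correlation_id behavior described for dispatch can be sketched as follows. This is only an assumption about the general technique (fill in an id when it is missing, keep an existing one); the id format and the name CorrelationSketch.ensure_correlation_id/1 are hypothetical and are not taken from the library.

```elixir
defmodule CorrelationSketch do
  # Assigns a random id only when the event has none; an existing id is kept.
  # Works on plain maps and on structs that define a :correlation_id field.
  def ensure_correlation_id(event) do
    case Map.get(event, :correlation_id) do
      nil -> Map.put(event, :correlation_id, new_id())
      _existing -> event
    end
  end

  # 16 random bytes rendered as 32 lowercase hex characters (assumed format).
  defp new_id do
    Base.encode16(:crypto.strong_rand_bytes(16), case: :lower)
  end
end

fresh = CorrelationSketch.ensure_correlation_id(%{question: "Hello?", correlation_id: nil})
kept = CorrelationSketch.ensure_correlation_id(%{question: "Hello?", correlation_id: "abc-123"})

IO.inspect(fresh.correlation_id, label: "generated")
IO.inspect(kept.correlation_id, label: "preserved")
```

Keeping an existing id matters when agents emit follow-up events: reusing the id lets all events in one workflow be traced together.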
Gets the current size of the event queue.
Examples
size = AsyncDispatcher.get_queue_size(dispatcher)
Starts the async dispatcher as a linked process.
Options
- :router - Router instance for event routing (required)
- :batch_size - Number of events to process per batch (default: 5)
- :name - Process registration name (optional)
Examples
{:ok, pid} = AsyncDispatcher.start_link(router: router)
{:ok, pid} = AsyncDispatcher.start_link(
  router: router,
  batch_size: 10,
  name: MyDispatcher
)
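One plausible way to handle options like the ones listed above is shown below: the required key is fetched strictly and the optional keys fall back to defaults. OptionsSketch.parse/1 is a hypothetical helper written for this illustration; only the option names and the default batch size of 5 come from the documentation above.

```elixir
defmodule OptionsSketch do
  # Default taken from the documented :batch_size default.
  @default_batch_size 5

  # Turns a keyword list of start options into a map.
  # Raises KeyError when the required :router option is missing.
  def parse(opts) do
    %{
      router: Keyword.fetch!(opts, :router),
      batch_size: Keyword.get(opts, :batch_size, @default_batch_size),
      name: Keyword.get(opts, :name)
    }
  end
end

IO.inspect(OptionsSketch.parse(router: :router_placeholder))
IO.inspect(OptionsSketch.parse(router: :router_placeholder, batch_size: 10, name: MyDispatcher))
```

Failing fast on a missing :router surfaces configuration mistakes at startup instead of on the first dispatched event.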
Stops the dispatcher gracefully.
Waits for the current batch to complete before shutting down.
Parameters
- pid - The dispatcher process
- timeout - Maximum time to wait for shutdown (default: 5000ms)
Examples
AsyncDispatcher.stop(dispatcher)
AsyncDispatcher.stop(dispatcher, 10_000)
Waits for the event queue to be empty.
This is useful for testing or ensuring all events have been processed before continuing.
Parameters
- pid - The dispatcher process
- opts - Keyword list with:
  - :timeout - Maximum wait time in milliseconds (default: 5000)
Returns
- :ok - Queue is empty
- {:error, :timeout} - Timeout reached with events still in queue
Examples
:ok = AsyncDispatcher.wait_for_empty_queue(dispatcher)
case AsyncDispatcher.wait_for_empty_queue(dispatcher, timeout: 10_000) do
  :ok -> IO.puts("All events processed")
  {:error, :timeout} -> IO.puts("Timed out waiting")
end
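A wait of this kind is often built as a polling loop with a deadline. The sketch below shows that general technique under stated assumptions: WaitSketch, the 10 ms poll interval, and the size_fun callback are all hypothetical, and the library may implement the wait differently (for example, by also tracking in-flight tasks). Only the return values :ok and {:error, :timeout} mirror the documentation above.

```elixir
defmodule WaitSketch do
  # Assumed polling interval in milliseconds.
  @poll_interval 10

  # `size_fun` is a zero-arity function returning the current queue size.
  # Returns :ok once it reports 0, or {:error, :timeout} after `timeout` ms.
  def wait_until_empty(size_fun, timeout \\ 5_000) do
    deadline = System.monotonic_time(:millisecond) + timeout
    poll(size_fun, deadline)
  end

  defp poll(size_fun, deadline) do
    cond do
      size_fun.() == 0 ->
        :ok

      System.monotonic_time(:millisecond) >= deadline ->
        {:error, :timeout}

      true ->
        Process.sleep(@poll_interval)
        poll(size_fun, deadline)
    end
  end
end

# An Agent stands in for the queue size; it drains to 0 after about 50 ms.
{:ok, counter} = Agent.start_link(fn -> 3 end)

spawn(fn ->
  Process.sleep(50)
  Agent.update(counter, fn _ -> 0 end)
end)

result = WaitSketch.wait_until_empty(fn -> Agent.get(counter, & &1) end, 1_000)
IO.inspect(result, label: "wait result")
```

Using a monotonic clock for the deadline keeps the timeout correct even if the system wall clock is adjusted while waiting.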