Mojentic.AsyncDispatcher (Mojentic v1.5.0)


GenServer-based async event dispatcher for the agent system.

The AsyncDispatcher manages event routing between agents in an asynchronous, non-blocking manner. It maintains an event queue (FIFO) and processes events by routing them through a Mojentic.Router to the appropriate agents.

Features

  • Event Queue - FIFO queue using :queue module
  • Async Processing - Non-blocking event handling via Task and GenServer
  • Mixed Agent Support - Handles both sync and async agents
  • Graceful Shutdown - Stop via TerminateEvent or explicit stop/1
  • Queue Monitoring - Wait for empty queue with wait_for_empty_queue/2
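The FIFO behaviour listed above comes from Erlang's standard :queue module. The snippet below is a minimal sketch of that module on its own, not the dispatcher's actual code; the atoms :first and :second are placeholder events.

```elixir
# Build a FIFO queue with Erlang's :queue module.
q = :queue.new()
q = :queue.in(:first, q)
q = :queue.in(:second, q)

# :queue.out/1 returns the oldest item together with the remaining queue.
{{:value, item}, rest} = :queue.out(q)

IO.inspect(item)              # :first
IO.inspect(:queue.len(rest))  # 1
```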

Architecture


  Dispatcher
  (GenServer)
       |
       v
  Event Queue
   [:queue]
       |
       +--> Router --> Agent1 --> [New Events]
       |
       +--> Router --> Agent2 --> [New Events]

State Structure

%{
  router: %Router{},
  event_queue: :queue.queue(),
  processing: boolean(),
  batch_size: integer(),
  pending_tasks: non_neg_integer()
}
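To make the state map more concrete, here is a minimal sketch of a GenServer that keeps a :queue in its state and exposes enqueue and size operations. QueueServerSketch is a hypothetical stand-in written for illustration, not the library's implementation: it holds only two of the state fields and never drains the queue.

```elixir
defmodule QueueServerSketch do
  use GenServer

  # Client API (hypothetical names mirroring the documented functions).
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def dispatch(pid, event), do: GenServer.cast(pid, {:dispatch, event})
  def get_queue_size(pid), do: GenServer.call(pid, :queue_size)

  @impl true
  def init(opts) do
    state = %{
      event_queue: :queue.new(),
      batch_size: Keyword.get(opts, :batch_size, 5)
    }

    {:ok, state}
  end

  # Enqueue at the back of the queue; the caller is not blocked.
  @impl true
  def handle_cast({:dispatch, event}, state) do
    {:noreply, %{state | event_queue: :queue.in(event, state.event_queue)}}
  end

  @impl true
  def handle_call(:queue_size, _from, state) do
    {:reply, :queue.len(state.event_queue), state}
  end
end

{:ok, pid} = QueueServerSketch.start_link([])
QueueServerSketch.dispatch(pid, :event_a)
QueueServerSketch.dispatch(pid, :event_b)
size = QueueServerSketch.get_queue_size(pid)
IO.inspect(size)  # 2
```

Messages sent from one process to another arrive in order, so both casts are handled before the size call returns.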

Usage

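# Aliases assumed by the snippets on this page
alias Mojentic.{AsyncDispatcher, Router}

# Create router (fact_checker and answer_generator are agents defined elsewhere)
router =
  Router.new()
  |> Router.add_route(QuestionEvent, fact_checker)
  |> Router.add_route(QuestionEvent, answer_generator)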

# Start dispatcher
{:ok, pid} = AsyncDispatcher.start_link(router: router)

# Dispatch events
event = %QuestionEvent{source: MyApp, question: "What is Elixir?"}
AsyncDispatcher.dispatch(pid, event)

# Wait for queue to empty
:ok = AsyncDispatcher.wait_for_empty_queue(pid, timeout: 10_000)

# Stop dispatcher
AsyncDispatcher.stop(pid)

Examples

# Full workflow
router =
  Router.new()
  |> Router.add_route(QuestionEvent, fact_checker_pid)
  |> Router.add_route(FactCheckEvent, aggregator_pid)

{:ok, dispatcher} = AsyncDispatcher.start_link(
  router: router,
  batch_size: 10
)

question = %QuestionEvent{
  source: MyApp,
  question: "What is the capital of France?"
}

AsyncDispatcher.dispatch(dispatcher, question)
AsyncDispatcher.wait_for_empty_queue(dispatcher)
AsyncDispatcher.stop(dispatcher)

Concurrency Model

All agents that match an event are dispatched concurrently via Tasks. This is intentional and safe in Elixir because processes do not share mutable memory. The other Mojentic ports (ts/py/ru) serialize agent dispatch precisely because they do share mutable state (such as SharedWorkingMemory), where concurrent agents would race.
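The fan-out described here can be sketched with the standard Task module. The two anonymous functions stand in for agents and the map stands in for an event; both are hypothetical, and the dispatcher's real task handling may differ.

```elixir
# Hypothetical agents: plain functions that receive an event.
agents = [
  fn event -> {:fact_checker, event.question} end,
  fn event -> {:answer_generator, event.question} end
]

event = %{question: "What is Elixir?"}

# Start one Task per matching agent, then collect all results.
# Each task runs in its own process, so the agents cannot race on shared memory.
results =
  agents
  |> Enum.map(fn agent -> Task.async(fn -> agent.(event) end) end)
  |> Task.await_many(5_000)

IO.inspect(results)
```

Task.await_many/2 returns results in the same order as the tasks it was given, regardless of which task finishes first.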

Summary

Functions

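child_spec(init_arg)

Returns a specification to start this module under a supervisor.

dispatch(pid, event)

Dispatches an event to the event queue.

get_queue_size(pid)

Gets the current size of the event queue.

start_link(opts)

Starts the async dispatcher as a linked process.

stop(pid, timeout \\ 5000)

Stops the dispatcher gracefully.

wait_for_empty_queue(pid, opts \\ [])

Waits for the event queue to be empty.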

Types

state()

@type state() :: %{
  router: Mojentic.Router.t(),
  event_queue: :queue.queue(),
  processing: boolean(),
  batch_size: non_neg_integer(),
  pending_tasks: non_neg_integer()
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
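As an illustration of what a generated child_spec/1 looks like and how a supervisor consumes it, the sketch below uses a hypothetical SupervisedSketch module built with use GenServer. It assumes the dispatcher follows the same default convention, which this page does not state explicitly.

```elixir
defmodule SupervisedSketch do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts), do: {:ok, Map.new(opts)}
end

# `use GenServer` generates child_spec/1, which wraps the init argument
# into a start tuple that a supervisor can invoke.
spec = SupervisedSketch.child_spec(batch_size: 10)
IO.inspect(spec.start)

# The {module, arg} tuple form makes the supervisor call child_spec/1 itself.
{:ok, sup} =
  Supervisor.start_link([{SupervisedSketch, batch_size: 10}], strategy: :one_for_one)

active = Supervisor.count_children(sup).active
IO.inspect(active)  # 1
```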

dispatch(pid, event)

Dispatches an event to the event queue.

The event will be assigned a correlation_id if it doesn't have one. Events are processed in FIFO order by the dispatcher loop.
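The "assign only if missing" rule can be sketched as two function clauses. DemoEvent, CorrelationSketch, and the "corr-" id format are hypothetical; the library's own id generation is not documented here and may well use a different scheme.

```elixir
defmodule DemoEvent do
  # Hypothetical event struct; real events may carry other fields.
  defstruct [:source, :correlation_id]
end

defmodule CorrelationSketch do
  # Fills in correlation_id only when it is missing (nil).
  def ensure_correlation_id(%{correlation_id: nil} = event) do
    %{event | correlation_id: "corr-#{System.unique_integer([:positive])}"}
  end

  # Events that already carry an id pass through unchanged.
  def ensure_correlation_id(event), do: event
end

fresh = CorrelationSketch.ensure_correlation_id(%DemoEvent{source: :demo})
kept = CorrelationSketch.ensure_correlation_id(%DemoEvent{source: :demo, correlation_id: "abc"})

IO.inspect(is_binary(fresh.correlation_id))  # true
IO.inspect(kept.correlation_id)              # "abc"
```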

Parameters

  • pid - The dispatcher process
  • event - The event to dispatch

Examples

event = %QuestionEvent{source: MyApp, question: "Hello?"}
AsyncDispatcher.dispatch(dispatcher, event)

get_queue_size(pid)

Gets the current size of the event queue.

Examples

size = AsyncDispatcher.get_queue_size(dispatcher)

start_link(opts)

Starts the async dispatcher as a linked process.

Options

  • :router - Router instance for event routing (required)
  • :batch_size - Number of events to process per batch (default: 5)
  • :name - Process registration name (optional)
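One way to picture the :batch_size option is a helper that removes at most that many events from the front of the queue per pass. BatchSketch.take_batch/2 below is a hypothetical helper for illustration; how the dispatcher actually forms its batches is an assumption.

```elixir
defmodule BatchSketch do
  # Removes up to `n` items from the front of `queue`.
  # Returns {items_in_fifo_order, remaining_queue}.
  def take_batch(queue, n), do: take(queue, n, [])

  defp take(queue, 0, acc), do: {Enum.reverse(acc), queue}

  defp take(queue, n, acc) do
    case :queue.out(queue) do
      {{:value, item}, rest} -> take(rest, n - 1, [item | acc])
      {:empty, rest} -> {Enum.reverse(acc), rest}
    end
  end
end

queue = :queue.from_list([:a, :b, :c])
{batch, rest} = BatchSketch.take_batch(queue, 2)

IO.inspect(batch)                  # [:a, :b]
IO.inspect(:queue.to_list(rest))   # [:c]
```

Asking for more items than the queue holds simply returns everything that is left.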

Examples

{:ok, pid} = AsyncDispatcher.start_link(router: router)

{:ok, pid} = AsyncDispatcher.start_link(
  router: router,
  batch_size: 10,
  name: MyDispatcher
)

stop(pid, timeout \\ 5000)

Stops the dispatcher gracefully.

Waits for the current batch to complete before shutting down.

Parameters

  • pid - The dispatcher process
  • timeout - Maximum time to wait for shutdown (default: 5000ms)

Examples

AsyncDispatcher.stop(dispatcher)
AsyncDispatcher.stop(dispatcher, 10_000)

wait_for_empty_queue(pid, opts \\ [])

Waits for the event queue to be empty.

This is useful for testing or ensuring all events have been processed before continuing.

Parameters

  • pid - The dispatcher process
  • opts - Keyword list with:
    • :timeout - Maximum wait time in milliseconds (default: 5000)

Returns

  • :ok - Queue is empty
  • {:error, :timeout} - Timeout reached with events still in queue
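The two return values can be reproduced with a simple poll-until-deadline loop. This is only a sketch of one possible approach: WaitSketch, its size_fun argument, and the 10 ms polling interval are hypothetical, and the dispatcher may instead notify waiters directly rather than poll.

```elixir
defmodule WaitSketch do
  # Polls `size_fun` until it reports 0 or `timeout` milliseconds have elapsed.
  def wait_until_empty(size_fun, timeout \\ 5_000, interval \\ 10) do
    deadline = System.monotonic_time(:millisecond) + timeout
    poll(size_fun, deadline, interval)
  end

  defp poll(size_fun, deadline, interval) do
    cond do
      size_fun.() == 0 ->
        :ok

      System.monotonic_time(:millisecond) >= deadline ->
        {:error, :timeout}

      true ->
        Process.sleep(interval)
        poll(size_fun, deadline, interval)
    end
  end
end

# An already-empty queue returns immediately.
empty_result = WaitSketch.wait_until_empty(fn -> 0 end)

# A queue that never drains hits the 30 ms timeout.
stuck_result = WaitSketch.wait_until_empty(fn -> 3 end, 30)

IO.inspect(empty_result)  # :ok
IO.inspect(stuck_result)  # {:error, :timeout}
```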

Examples

:ok = AsyncDispatcher.wait_for_empty_queue(dispatcher)

case AsyncDispatcher.wait_for_empty_queue(dispatcher, timeout: 10_000) do
  :ok -> IO.puts("All events processed")
  {:error, :timeout} -> IO.puts("Timed out waiting")
end