After: You can test pure logic and integration without flakes.
Testing Jido agents involves two approaches: pure agent testing (no runtime) and integration testing with AgentServer. This guide covers both patterns along with test isolation, async coordination, and mocking strategies.
Isolated Runtime Tests
For tests that start AgentServer processes, start an isolated Jido instance
per test. Each test receives its own Registry, TaskSupervisor, and
AgentSupervisor, which prevents cross-test interference when running async.
Jido's own test suite has repo-local helpers under the JidoTest.* namespace,
but those modules live in test/support and are not part of the published
package API. In your application tests, use plain ExUnit.Case and start a
unique Jido instance in setup.
defmodule MyAgentTest do
use ExUnit.Case, async: true
setup do
jido = :"jido_test_#{System.unique_integer([:positive])}"
{:ok, jido_pid} = start_supervised({Jido, name: jido})
{:ok, jido: jido, jido_pid: jido_pid}
end
test "starts agent under isolated instance", %{jido: jido} do
{:ok, pid} = Jido.start_agent(jido, MyAgent)
assert Process.alive?(pid)
end
endThe test context includes:
| Key | Description |
|---|---|
:jido | Name of the Jido instance (atom) |
:jido_pid | PID of the Jido supervisor |
Use the :jido context value when starting agents directly, and derive
instance-specific infrastructure names from it when needed:
test "uses isolated infrastructure", %{jido: jido} do
{:ok, pid} = Jido.start_agent(jido, MyAgent, id: "test-1")
registry = Jido.registry_name(jido)
task_sup = Jido.task_supervisor_name(jido)
agent_sup = Jido.agent_supervisor_name(jido)
assert Jido.AgentServer.whereis(registry, "test-1") == pid
assert Process.whereis(task_sup)
assert Process.whereis(agent_sup)
endFor async behavior, prefer await functions or polling over fixed sleeps. When you need custom polling, keep it local to the test module:
defmodule MyAsyncAgentTest do
use ExUnit.Case, async: true
setup do
jido = :"jido_test_#{System.unique_integer([:positive])}"
{:ok, jido_pid} = start_supervised({Jido, name: jido})
{:ok, jido: jido, jido_pid: jido_pid}
end
defp do_eventually(fun, deadline, interval) do
case fun.() do
result when result not in [nil, false] ->
result
_ ->
if System.monotonic_time(:millisecond) > deadline do
raise ExUnit.AssertionError, message: "condition not met within timeout"
end
Process.sleep(interval)
do_eventually(fun, deadline, interval)
end
end
defp eventually(fun, opts \\ []) do
timeout = Keyword.get(opts, :timeout, 500)
interval = Keyword.get(opts, :interval, 5)
deadline = System.monotonic_time(:millisecond) + timeout
do_eventually(fun, deadline, interval)
end
defp eventually_state(pid, fun, opts \\ []) do
eventually(fn ->
case Jido.AgentServer.state(pid) do
{:ok, state} -> if fun.(state), do: state, else: false
_ -> false
end
end, opts)
end
endTesting Pure Agents
Agents are immutable structs. Test state transformations without any runtime:
defmodule CounterAgentTest do
use ExUnit.Case, async: true
alias MyApp.CounterAgent
alias MyApp.Actions.{Increment, Decrement}
describe "cmd/2 state transformations" do
test "increment updates counter" do
agent = CounterAgent.new()
assert agent.state.counter == 0
{agent, directives} = CounterAgent.cmd(agent, {Increment, %{by: 5}})
assert agent.state.counter == 5
assert directives == []
end
test "decrement reduces counter" do
agent = CounterAgent.new(state: %{counter: 10})
{agent, _directives} = CounterAgent.cmd(agent, Decrement)
assert agent.state.counter == 9
end
test "multiple actions in sequence" do
agent = CounterAgent.new()
{agent, _} = CounterAgent.cmd(agent, [
{Increment, %{by: 10}},
{Decrement, %{}},
{Increment, %{by: 5}}
])
assert agent.state.counter == 14
end
end
describe "directives" do
test "action can emit signal directive" do
agent = CounterAgent.new()
{agent, directives} = CounterAgent.cmd(agent, NotifyAction)
assert [%Jido.Agent.Directive.Emit{signal: signal}] = directives
assert signal.type == "counter.updated"
end
end
endTesting Validation
test "validate/2 enforces schema" do
agent = MyAgent.new(state: %{status: :running, extra: "data"})
# Non-strict preserves extra fields
{:ok, validated} = MyAgent.validate(agent)
assert validated.state.extra == "data"
# Strict mode removes extra fields
{:ok, strict} = MyAgent.validate(agent, strict: true)
refute Map.has_key?(strict.state, :extra)
endTesting set/2
test "set/2 deep merges state" do
agent = MyAgent.new(state: %{config: %{a: 1, b: 2}})
{:ok, updated} = MyAgent.set(agent, %{config: %{b: 3, c: 4}})
assert updated.state.config == %{a: 1, b: 3, c: 4}
endTesting with AgentServer
For integration tests involving signals, directives, and real process behavior:
defmodule AgentIntegrationTest do
use ExUnit.Case, async: true
alias Jido.{AgentServer, Signal}
setup do
jido = :"jido_test_#{System.unique_integer([:positive])}"
{:ok, jido_pid} = start_supervised({Jido, name: jido})
{:ok, jido: jido, jido_pid: jido_pid}
end
describe "signal processing" do
test "synchronous call returns updated agent", %{jido: jido} do
{:ok, pid} = AgentServer.start_link(agent: CounterAgent, jido: jido)
signal = Signal.new!("increment", %{by: 5}, source: "/test")
{:ok, agent} = AgentServer.call(pid, signal)
assert agent.state.counter == 5
end
test "async cast processes in background", %{jido: jido} do
{:ok, pid} = AgentServer.start_link(agent: CounterAgent, jido: jido)
signal = Signal.new!("increment", %{}, source: "/test")
assert :ok = AgentServer.cast(pid, signal)
# Wait briefly for async processing
Process.sleep(10)
{:ok, state} = AgentServer.state(pid)
assert state.agent.state.counter == 1
end
test "multiple signals in sequence", %{jido: jido} do
{:ok, pid} = AgentServer.start_link(agent: CounterAgent, jido: jido)
for _ <- 1..5 do
signal = Signal.new!("increment", %{}, source: "/test")
{:ok, _} = AgentServer.call(pid, signal)
end
{:ok, state} = AgentServer.state(pid)
assert state.agent.state.counter == 5
end
end
describe "initial state" do
test "starts with custom initial state", %{jido: jido} do
{:ok, pid} = AgentServer.start_link(
agent: CounterAgent,
initial_state: %{counter: 100},
jido: jido
)
{:ok, state} = AgentServer.state(pid)
assert state.agent.state.counter == 100
end
test "starts with pre-built agent", %{jido: jido} do
agent = CounterAgent.new(id: "prebuilt-123")
agent = %{agent | state: Map.put(agent.state, :counter, 50)}
{:ok, pid} = AgentServer.start_link(
agent: agent,
agent_module: CounterAgent,
jido: jido
)
{:ok, state} = AgentServer.state(pid)
assert state.id == "prebuilt-123"
assert state.agent.state.counter == 50
end
end
endTesting Registry Lookup
test "agent registers with ID", %{jido: jido} do
{:ok, pid} = AgentServer.start_link(
agent: MyAgent,
id: "my-agent-1",
jido: jido
)
registry = Jido.registry_name(jido)
assert AgentServer.whereis(registry, "my-agent-1") == pid
endAwait Patterns in Tests
Use Jido.await/2 and related functions for coordination:
Waiting for Completion
test "await waits for agent completion", %{jido: jido} do
{:ok, pid} = Jido.start_agent(jido, WorkerAgent)
# Trigger async work
signal = Signal.new!("start_work", %{}, source: "/test")
AgentServer.cast(pid, signal)
# Wait for completion (agent sets status: :completed)
{:ok, result} = Jido.await(pid, 10_000)
assert result.status == :completed
assert result.result == "done"
endWaiting for Child Agents
test "await_child waits for spawned child", %{jido: jido} do
{:ok, parent} = Jido.start_agent(jido, CoordinatorAgent)
# Parent spawns a child via SpawnAgent directive
signal = Signal.new!("spawn_worker", %{tag: :worker_1}, source: "/test")
{:ok, _} = AgentServer.call(parent, signal)
# Wait for child to complete
{:ok, result} = Jido.await_child(parent, :worker_1, 30_000)
assert result.status == :completed
endWaiting for Multiple Agents
test "await_all waits for all agents", %{jido: jido} do
pids = for i <- 1..3 do
{:ok, pid} = Jido.start_agent(jido, WorkerAgent, id: "worker-#{i}")
AgentServer.cast(pid, Signal.new!("start", %{}, source: "/test"))
pid
end
{:ok, results} = Jido.await_all(pids, 30_000)
assert map_size(results) == 3
Enum.each(results, fn {_pid, result} ->
assert result.status == :completed
end)
end
test "await_any returns first to complete", %{jido: jido} do
pids = for i <- 1..3 do
{:ok, pid} = Jido.start_agent(jido, WorkerAgent, id: "racer-#{i}")
AgentServer.cast(pid, Signal.new!("start", %{delay: i * 100}, source: "/test"))
pid
end
{:ok, {winner, result}} = Jido.await_any(pids, 10_000)
assert winner in pids
assert result.status == :completed
endTimeout Handling
test "await returns timeout error", %{jido: jido} do
{:ok, pid} = Jido.start_agent(jido, SlowAgent)
AgentServer.cast(pid, Signal.new!("slow_work", %{}, source: "/test"))
assert {:error, :timeout} = Jido.await(pid, 100)
endMocking with Mimic
Use Mimic for mocking external dependencies:
Setup
# test/test_helper.exs
Mimic.copy(MyApp.ExternalService)
Mimic.copy(MyApp.HttpClient)
ExUnit.start()Basic Mocking
defmodule ExternalServiceTest do
use ExUnit.Case, async: true
use Mimic
setup do
jido = :"jido_test_#{System.unique_integer([:positive])}"
{:ok, jido_pid} = start_supervised({Jido, name: jido})
{:ok, jido: jido, jido_pid: jido_pid}
end
test "mocks external service call", %{jido: jido} do
expect(MyApp.ExternalService, :call, fn args ->
assert args == %{query: "test"}
{:ok, "mocked response"}
end)
{:ok, pid} = Jido.start_agent(jido, MyAgent)
signal = Signal.new!("fetch_data", %{query: "test"}, source: "/test")
{:ok, agent} = AgentServer.call(pid, signal)
assert agent.state.result == "mocked response"
end
endStubbing
test "stubs return consistent value", %{jido: jido} do
stub(MyApp.HttpClient, :get, fn _url ->
{:ok, %{status: 200, body: "stubbed"}}
end)
{:ok, pid} = Jido.start_agent(jido, FetcherAgent)
# Multiple calls all return stubbed value
for _ <- 1..3 do
signal = Signal.new!("fetch", %{}, source: "/test")
{:ok, agent} = AgentServer.call(pid, signal)
assert agent.state.last_response == "stubbed"
end
endVerifying Call Count
test "verifies service was called", %{jido: jido} do
expect(MyApp.ExternalService, :call, 2, fn _args ->
{:ok, "result"}
end)
{:ok, pid} = Jido.start_agent(jido, MyAgent)
signal = Signal.new!("process", %{}, source: "/test")
{:ok, _} = AgentServer.call(pid, signal)
{:ok, _} = AgentServer.call(pid, signal)
# Mimic automatically verifies expect count at test end
endRejecting Calls
test "service should not be called", %{jido: jido} do
reject(&MyApp.ExternalService.call/1)
{:ok, pid} = Jido.start_agent(jido, CachedAgent)
# Agent uses cache, should not call external service
signal = Signal.new!("get_cached", %{}, source: "/test")
{:ok, _} = AgentServer.call(pid, signal)
endTesting Parent-Child Hierarchies
defmodule HierarchyTest do
use ExUnit.Case, async: true
alias Jido.{AgentServer, Signal}
alias Jido.AgentServer.ParentRef
setup do
jido = :"jido_test_#{System.unique_integer([:positive])}"
{:ok, jido_pid} = start_supervised({Jido, name: jido})
{:ok, jido: jido, jido_pid: jido_pid}
end
test "child receives parent reference", %{jido: jido} do
{:ok, parent_pid} = AgentServer.start_link(
agent: ParentAgent,
id: "parent-1",
jido: jido
)
parent_ref = ParentRef.new!(%{
pid: parent_pid,
id: "parent-1",
tag: :worker
})
{:ok, child_pid} = AgentServer.start_link(
agent: ChildAgent,
id: "child-1",
parent: parent_ref,
jido: jido
)
{:ok, child_state} = AgentServer.state(child_pid)
assert child_state.parent.pid == parent_pid
assert child_state.parent.id == "parent-1"
end
test "parent receives child exit notification", %{jido: jido} do
{:ok, parent_pid} = AgentServer.start(
agent: ParentAgent,
id: "parent-1",
jido: jido
)
# Parent spawns child via directive
signal = Signal.new!(
"spawn_agent",
%{module: ChildAgent, tag: :worker_1},
source: "/test"
)
{:ok, _} = AgentServer.call(parent_pid, signal)
# Wait for child to appear
Process.sleep(50)
{:ok, state} = AgentServer.state(parent_pid)
child_info = state.children[:worker_1]
# Terminate child
child_ref = Process.monitor(child_info.pid)
DynamicSupervisor.terminate_child(
Jido.agent_supervisor_name(jido),
child_info.pid
)
assert_receive {:DOWN, ^child_ref, :process, _, :shutdown}, 500
# Parent should process child exit
Process.sleep(50)
{:ok, final_state} = AgentServer.state(parent_pid)
refute Map.has_key?(final_state.children, :worker_1)
assert length(final_state.agent.state.child_events) == 1
end
endTesting Orphaning and Adoption
When you use on_parent_death: :continue or :emit_orphan, test the orphan
transition explicitly instead of only asserting that the child survived.
test "child becomes orphaned and can be adopted", %{jido: jido} do
{:ok, parent_pid} = Jido.start_agent(jido, ParentAgent, id: "parent-1")
{:ok, _} =
AgentServer.call(
parent_pid,
Signal.new!(
"spawn_agent",
%{
module: ChildAgent,
tag: :worker,
opts: %{id: "worker-1", on_parent_death: :emit_orphan}
},
source: "/test"
)
)
eventually(fn ->
{:ok, children} = Jido.get_children(parent_pid)
Map.has_key?(children, :worker)
end)
{:ok, children} = Jido.get_children(parent_pid)
child_pid = children.worker
DynamicSupervisor.terminate_child(Jido.agent_supervisor_name(jido), parent_pid)
eventually_state(child_pid, fn state ->
state.parent == nil and
Map.get(state.agent.state, :__parent__) == nil and
state.orphaned_from.id == "parent-1"
end)
{:ok, replacement_pid} = Jido.start_agent(jido, ParentAgent, id: "parent-2")
{:ok, _} =
AgentServer.call(
replacement_pid,
Signal.new!(
"adopt_child",
%{child: "worker-1", tag: :worker},
source: "/test"
)
)
eventually(fn ->
{:ok, children} = Jido.get_children(replacement_pid)
Map.get(children, :worker) == child_pid
end)
endGood orphan lifecycle tests should verify:
state.parentandagent.state.__parent__are clearedstate.orphaned_fromandagent.state.__orphaned_from__are populatedDirective.emit_to_parent/3returnsnilwhile orphanedjido.agent.orphanedhandlers see detached state, not stale parent routingDirective.adopt_child/3restoresJido.get_children/1and child-to-parent messaging- an adopted child restart still binds to the adopted parent, not stale startup metadata
For a full acceptance test that reads like user documentation, see
test/examples/runtime/orphan_lifecycle_test.exs.
Testing Directive Execution
test "Schedule directive fires after delay", %{jido: jido} do
{:ok, pid} = AgentServer.start_link(agent: SchedulerAgent, jido: jido)
signal = Signal.new!("schedule_ping", %{}, source: "/test")
{:ok, _} = AgentServer.call(pid, signal)
# Wait for scheduled message
Process.sleep(100)
{:ok, state} = AgentServer.state(pid)
assert state.agent.state.received_ping == true
end
test "Stop directive terminates agent", %{jido: jido} do
{:ok, pid} = AgentServer.start_link(agent: MyAgent, jido: jido)
ref = Process.monitor(pid)
signal = Signal.new!("shutdown", %{}, source: "/test")
{:ok, _} = AgentServer.call(pid, signal)
assert_receive {:DOWN, ^ref, :process, ^pid, :normal}, 1000
endCommon Patterns
Capturing Logs
import ExUnit.CaptureLog
test "logs on termination", %{jido: jido} do
{:ok, pid} = AgentServer.start_link(
agent: MyAgent,
id: "log-test",
jido: jido
)
log = capture_log(fn ->
GenServer.stop(pid, :normal)
Process.sleep(10)
end)
assert log =~ "log-test"
assert log =~ "terminating"
endTesting Error Handling
test "returns error directive for invalid action", %{jido: _jido} do
agent = MyAgent.new()
{_agent, directives} = MyAgent.cmd(agent, {InvalidAction, %{}})
assert [%Jido.Agent.Directive.Error{context: :instruction}] = directives
endTrapping Exits
test "child stops when parent dies", %{jido: jido} do
Process.flag(:trap_exit, true)
{:ok, parent_pid} = AgentServer.start(agent: ParentAgent, jido: jido)
parent_ref = ParentRef.new!(%{pid: parent_pid, id: "parent", tag: :worker})
{:ok, child_pid} = AgentServer.start(
agent: ChildAgent,
parent: parent_ref,
on_parent_death: :stop,
jido: jido
)
child_ref = Process.monitor(child_pid)
DynamicSupervisor.terminate_child(
Jido.agent_supervisor_name(jido),
parent_pid
)
assert_receive {:DOWN, ^child_ref, :process, ^child_pid, _}, 1000
endDebugging Failing Tests
Use instance-level debug to capture events when diagnosing test failures:
test "diagnose agent behavior", %{jido: jido} do
Jido.Debug.enable(jido, :on)
{:ok, pid} = Jido.start_agent(jido, MyAgent)
signal = Signal.new!("process", %{}, source: "/test")
AgentServer.cast(pid, signal)
Process.sleep(50)
{:ok, events} = Jido.AgentServer.recent_events(pid)
IO.inspect(events, label: "debug events")
endDebug overrides use :persistent_term and will leak between tests if not reset. Always reset in setup or on_exit when using Jido.Debug.enable/3:
setup %{jido: jido} = context do
Jido.Debug.reset(jido)
on_exit(fn -> Jido.Debug.reset(jido) end)
context
endSummary
| Scenario | Approach |
|---|---|
| State transformations | Pure cmd/2 testing, no runtime |
| Signal processing | Isolated Jido instance + AgentServer.call/cast |
| Async coordination | Jido.await/2, Jido.await_child/4 |
| External dependencies | Mimic expect/stub/reject |
| Test isolation | Unique Jido instance per test |
Further Reading
Jido.Await— Coordination API detailsJido.AgentServer— Server API reference