Squidie.Runtime.DispatchAgent (squidie v0.1.2)


Jido-native dispatch coordination state for one durable dispatch queue.

The agent rebuilds its state from dispatch-thread journal entries and appends claims durably, so the runtime can coordinate leases, retries, and workflow wakeups from durable facts rather than in-memory state.

Summary

Functions

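  • actions() - Returns the list of actions from all attached plugins.
  • agent_id(queue) - Returns the stable Jido agent id for a dispatch queue.
  • capabilities() - Returns the union of all capabilities from all mounted plugin instances.
  • category() - Returns the agent's category.
  • claim_next(storage, agent, owner_id, opts \\ []) - Claims the next visible or expired attempt for a dispatch queue agent.
  • cmd(agent, action) - Execute actions against the agent: (agent, action) -> {agent, directives}
  • complete(storage, agent, runnable_key, claim_id, claim_token, result, opts \\ []) - Records a durable successful result for a currently claimed attempt.
  • completed_results(agent) - Lists completed dispatch attempts waiting for workflow application.
  • description() - Returns the agent's description.
  • ensure_run_queued(storage, agent, run_id, opts \\ []) - Records that a run belongs to this dispatch queue before runnable attempts are scheduled.
  • expired_claims(agent, at) - Lists claimed attempts whose leases have expired by the given time.
  • fail(storage, agent, runnable_key, claim_id, claim_token, error, opts \\ []) - Records a durable failure for a currently claimed attempt.
  • heartbeat(storage, agent, runnable_key, claim_id, claim_token, opts \\ []) - Extends the lease for a currently claimed attempt.
  • name() - Returns the agent's name.
  • new(opts \\ []) - Creates a new agent with optional initial state.
  • plugin_config(plugin_mod) - Returns the configuration for a specific plugin.
  • plugin_instances() - Returns the list of plugin instances attached to this agent.
  • plugin_routes() - Returns the expanded and validated plugin routes.
  • plugin_schedules() - Returns the expanded plugin and agent schedules.
  • plugin_specs() - Returns the list of plugin specs attached to this agent.
  • plugin_state(agent, plugin_mod) - Returns the state slice for a specific plugin.
  • plugins() - Returns the list of plugin modules attached to this agent (deduplicated).
  • put_checkpoint(storage, agent, opts \\ []) - Stores the current dispatch projection as a checkpoint for faster rebuilds.
  • rebuild(storage, queue) - Rebuilds a dispatch agent for one queue from the durable dispatch thread.
  • run_ids(agent) - Returns every run id known by the dispatch projection.
  • runnable_keys(agent) - Returns every runnable key already known by the dispatch projection.
  • schedule_attempts(storage, agent, run_id, runnables, opts \\ []) - Appends durable scheduled attempts for planned runnables that are not already present in the dispatch-agent projection.
  • schema() - Returns the merged schema (base + plugin schemas).
  • set(agent, attrs) - Updates the agent's state by merging new attributes.
  • signal_types() - Returns all expanded route signal types from plugin routes.
  • strategy() - Returns the execution strategy module for this agent.
  • strategy_opts() - Returns the strategy options for this agent.
  • strategy_snapshot(agent) - Returns a stable, public view of the strategy's execution state.
  • tags() - Returns the agent's tags.
  • validate(agent, opts \\ []) - Validates the agent's state against its schema.
  • visible_attempts(agent, at) - Lists attempts whose visibility window has opened and can be claimed.
  • vsn() - Returns the agent's version.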

Types

claim()

@type claim() :: %{
  agent: Jido.Agent.t(),
  attempt: Squidie.Runtime.DispatchProtocol.ActionAttempt.t(),
  claim_id: String.t(),
  claim_token: String.t(),
  lease_until: DateTime.t()
}

lifecycle_update()

@type lifecycle_update() :: %{
  :agent => Jido.Agent.t(),
  :attempt => Squidie.Runtime.DispatchProtocol.ActionAttempt.t(),
  optional(:lease_until) => DateTime.t()
}

queue()

@type queue() :: String.t()

queue_update()

@type queue_update() :: %{agent: Jido.Agent.t(), queued?: boolean()}

schedule_update()

@type schedule_update() :: %{agent: Jido.Agent.t(), runnables: [map()]}

storage_config()

@type storage_config() :: Squidie.Runtime.Journal.storage_config()

Functions

actions()

@spec actions() :: [module()]

Returns the list of actions from all attached plugins.

agent_id(queue)

@spec agent_id(queue() | atom()) :: String.t()

Returns the stable Jido agent id for a dispatch queue.

capabilities()

@spec capabilities() :: [atom()]

Returns the union of all capabilities from all mounted plugin instances.

Capabilities are atoms describing what the agent can do based on its mounted plugins.

Example

MyAgent.capabilities()
# => [:messaging, :channel_management, :chat, :embeddings]

category()

@spec category() :: String.t() | nil

Returns the agent's category.

claim_next(storage, agent, owner_id, opts \\ [])

@spec claim_next(storage_config(), Jido.Agent.t(), String.t(), keyword()) ::
  {:ok, claim()} | {:ok, :none} | {:error, term()}

Claims the next visible or expired attempt for a dispatch queue agent.

The claim is persisted as an :attempt_claimed journal entry with the agent's current dispatch-thread revision as :expected_rev. Concurrent claimers therefore race at the journal boundary and receive {:error, :conflict} when their projection is stale.

The returned claim contains the raw claim_token for the worker process, but the durable journal stores only its hash. If the append succeeds, the returned :attempt reflects the post-claim projection state.
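The revision fence and the hashed token can be illustrated with a toy model. This is a minimal sketch, not Squidie's implementation: `ToyJournal`, the entry shape, and the use of SHA-256 are all assumptions made for illustration, since the documentation does not specify the hash algorithm or the storage layout.

```elixir
defmodule ToyJournal do
  # Hypothetical in-memory stand-in for a durable dispatch thread.
  def new, do: %{rev: 0, entries: []}

  # Appends only when the caller's expected revision matches the current
  # revision; a mismatch means the caller's projection is stale.
  def append(%{rev: rev} = journal, entry, expected_rev) when expected_rev == rev do
    {:ok, %{journal | rev: rev + 1, entries: journal.entries ++ [entry]}}
  end

  def append(_journal, _entry, _expected_rev), do: {:error, :conflict}
end

# Assumption: SHA-256 is used here purely for illustration.
hash_token = fn token -> :crypto.hash(:sha256, token) end

journal = ToyJournal.new()
raw_token = Base.url_encode64(:crypto.strong_rand_bytes(16), padding: false)

# Both workers read revision 0 before either one writes.
entry_a = %{type: :attempt_claimed, owner_id: "worker-a", claim_token_hash: hash_token.(raw_token)}
entry_b = %{type: :attempt_claimed, owner_id: "worker-b", claim_token_hash: hash_token.("other-token")}

{:ok, journal_after_a} = ToyJournal.append(journal, entry_a, 0)

# The second claimer still expects revision 0, so it loses the race.
result_b = ToyJournal.append(journal_after_a, entry_b, 0)
```

In this model the losing worker would rebuild its projection and try again, and only the holder of the raw token can later prove ownership of the claim.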

cmd(agent, action)

Execute actions against the agent: (agent, action) -> {agent, directives}

This is the core operation. Actions modify state and may perform required work; directives are runtime-owned external effects. Execution is delegated to the configured strategy (default: Direct).

Action Formats

  • MyAction - Action module with no params
  • {MyAction, %{param: 1}} - Action with params
  • {MyAction, %{param: 1}, %{context: data}} - Action with params and context
  • {MyAction, %{param: 1}, %{}, [timeout: 1000]} - Action with opts
  • %Instruction{} - Full instruction struct
  • [...] - List of any of the above (processed in sequence)

Options

The optional third argument opts is a keyword list merged into all instructions:

  • :timeout - Maximum time (in ms) for each action to complete
  • :max_retries - Maximum retry attempts on failure
  • :backoff - Initial backoff time in ms (doubles with each retry)

Examples

{agent, directives} = Squidie.Runtime.DispatchAgent.cmd(agent, MyAction)
{agent, directives} = Squidie.Runtime.DispatchAgent.cmd(agent, {MyAction, %{value: 42}})
{agent, directives} = Squidie.Runtime.DispatchAgent.cmd(agent, [Action1, Action2])

# With per-call options (merged into all instructions)
{agent, directives} = Squidie.Runtime.DispatchAgent.cmd(agent, MyAction, timeout: 5000)

cmd(agent, action, opts)

complete(storage, agent, runnable_key, claim_id, claim_token, result, opts \\ [])

@spec complete(
  storage_config(),
  Jido.Agent.t(),
  String.t(),
  String.t(),
  String.t(),
  map(),
  keyword()
) :: {:ok, lifecycle_update()} | {:error, term()}

Records a durable successful result for a currently claimed attempt.
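A worker loop might combine rebuild/2, claim_next/4, complete/7, and fail/7 roughly as follows. This is a sketch under stated assumptions: `WorkerSketch`, `run_once/5`, and `work_fun` are hypothetical names, the `:runnable_key` field on the attempt is assumed rather than documented here, and the `dispatch` parameter exists only so the sketch can be exercised against a stand-in module.

```elixir
defmodule WorkerSketch do
  # `dispatch` defaults to the documented module. Every other name in this
  # module is hypothetical.
  def run_once(storage, queue, owner_id, work_fun, dispatch \\ Squidie.Runtime.DispatchAgent) do
    with {:ok, agent} <- dispatch.rebuild(storage, queue),
         # {:ok, :none} and {:error, reason} do not match and are returned as-is.
         {:ok, %{} = claim} <- dispatch.claim_next(storage, agent, owner_id) do
      # Assumption: the attempt exposes its runnable key as :runnable_key.
      key = claim.attempt.runnable_key

      case work_fun.(claim.attempt) do
        {:ok, result} ->
          dispatch.complete(storage, claim.agent, key, claim.claim_id, claim.claim_token, result)

        {:error, error} ->
          dispatch.fail(storage, claim.agent, key, claim.claim_id, claim.claim_token, error)
      end
    end
  end
end
```

Note that the calls after the claim use `claim.agent`, the post-claim agent, rather than the agent that was rebuilt before claiming.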

completed_results(agent)

Lists completed dispatch attempts waiting for workflow application.

description()

@spec description() :: String.t() | nil

Returns the agent's description.

ensure_run_queued(storage, agent, run_id, opts \\ [])

@spec ensure_run_queued(storage_config(), Jido.Agent.t(), String.t(), keyword()) ::
  {:ok, queue_update()} | {:error, term()}

Records that a run belongs to this dispatch queue before runnable attempts are scheduled.

This queue marker lets recovery discover a started run even if the process crashes after the run thread is committed and before the first :attempt_scheduled entry is written.
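As a rough illustration of why the marker helps, recovery can compare the runs marked as queued with the runs that already have scheduled attempts. The data shapes below are assumptions for the sketch; only the idea of a queued-run set comes from the text above.

```elixir
# Hypothetical projection contents after a crash: "run-2" was marked as queued,
# but the process died before its first attempt was scheduled.
queued_run_ids = MapSet.new(["run-1", "run-2"])

scheduled_attempts = [
  %{run_id: "run-1", runnable_key: "run-1:step-a"}
]

scheduled_run_ids = MapSet.new(scheduled_attempts, & &1.run_id)

# Runs that recovery still has to plan and schedule.
runs_needing_scheduling = MapSet.difference(queued_run_ids, scheduled_run_ids)
```

Without the queue marker, "run-2" would leave no trace in the dispatch projection and recovery would have nothing to find.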

expired_claims(agent, at)

Lists claimed attempts whose leases have expired by the given time.
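The lease check can be pictured as a filter over the projection. This sketch assumes a hypothetical attempt shape with `:status` and `:lease_until` fields, and it treats a lease that ends exactly at the given time as expired, which the documentation does not confirm.

```elixir
# Hypothetical attempt maps; the real projection shape may differ.
attempts = [
  %{runnable_key: "a", status: :claimed, lease_until: ~U[2025-01-01 00:00:30Z]},
  %{runnable_key: "b", status: :claimed, lease_until: ~U[2025-01-01 00:02:00Z]},
  %{runnable_key: "c", status: :scheduled, lease_until: nil}
]

at = ~U[2025-01-01 00:01:00Z]

# Keep claimed attempts whose lease is not strictly after `at`.
expired =
  Enum.filter(attempts, fn attempt ->
    attempt.status == :claimed and DateTime.compare(attempt.lease_until, at) != :gt
  end)

expired_keys = Enum.map(expired, & &1.runnable_key)
```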

fail(storage, agent, runnable_key, claim_id, claim_token, error, opts \\ [])

@spec fail(
  storage_config(),
  Jido.Agent.t(),
  String.t(),
  String.t(),
  String.t(),
  map(),
  keyword()
) :: {:ok, lifecycle_update()} | {:error, term()}

Records a durable failure for a currently claimed attempt.

:retry_runnable_key and :retry_visible_at may be provided together to make a retry attempt visible through the dispatch projection after the given time.
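A caller might derive the two retry options from an attempt counter, for example with exponential backoff. This is a sketch only: the backoff policy, the retry key format, and passing both options through `opts` are assumptions, not documented behavior.

```elixir
now = ~U[2025-01-01 00:00:00Z]
attempt_number = 3

# Hypothetical policy: wait 2^attempt_number seconds before the retry is visible.
backoff_seconds = Integer.pow(2, attempt_number)

retry_opts = [
  # Hypothetical key format for the retry attempt.
  retry_runnable_key: "run-1:step-a:retry-#{attempt_number}",
  retry_visible_at: DateTime.add(now, backoff_seconds, :second)
]

# The options would then be passed as the final argument, for example:
# Squidie.Runtime.DispatchAgent.fail(storage, agent, runnable_key, claim_id, claim_token, error, retry_opts)
```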

heartbeat(storage, agent, runnable_key, claim_id, claim_token, opts \\ [])

@spec heartbeat(
  storage_config(),
  Jido.Agent.t(),
  String.t(),
  String.t(),
  String.t(),
  keyword()
) ::
  {:ok, lifecycle_update()} | {:error, term()}

Extends the lease for a currently claimed attempt.

The heartbeat is rejected before anything is written when the claim token is stale, the claim has expired, or the attempt is not currently claimed in the dispatch-agent projection.
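The three rejection conditions amount to a guard that runs before any journal write. The sketch below models that guard; `HeartbeatCheck`, the attempt shape, the error atoms, and the SHA-256 hash are hypothetical and are not taken from Squidie.

```elixir
defmodule HeartbeatCheck do
  # Assumption: the projection keeps a hash of the claim token, not the raw token.
  def hash(token), do: :crypto.hash(:sha256, token)

  # Returns :ok when a lease extension may be written, otherwise an error.
  # The error atoms are hypothetical.
  def check(attempt, claim_token, now) do
    cond do
      attempt.status != :claimed -> {:error, :not_claimed}
      hash(claim_token) != attempt.claim_token_hash -> {:error, :stale_claim_token}
      DateTime.compare(attempt.lease_until, now) != :gt -> {:error, :claim_expired}
      true -> :ok
    end
  end
end

claimed = %{
  status: :claimed,
  claim_token_hash: HeartbeatCheck.hash("token-1"),
  lease_until: ~U[2025-01-01 00:01:00Z]
}

ok_result = HeartbeatCheck.check(claimed, "token-1", ~U[2025-01-01 00:00:30Z])
stale_result = HeartbeatCheck.check(claimed, "token-0", ~U[2025-01-01 00:00:30Z])
expired_result = HeartbeatCheck.check(claimed, "token-1", ~U[2025-01-01 00:02:00Z])
```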

name()

@spec name() :: String.t()

Returns the agent's name.

new(opts \\ [])

@spec new(keyword() | map()) :: Jido.Agent.t()

Creates a new agent with optional initial state.

The agent is fully initialized, including strategy state. For the default Direct strategy, strategy initialization is a no-op. For custom strategies, any state initialization is applied, but directives are only processed by AgentServer.

Examples

agent = Squidie.Runtime.DispatchAgent.new()
agent = Squidie.Runtime.DispatchAgent.new(id: "custom-id")
agent = Squidie.Runtime.DispatchAgent.new(state: %{counter: 10})

plugin_config(plugin_mod)

@spec plugin_config(module() | {module(), atom()}) :: map() | nil

Returns the configuration for a specific plugin.

Accepts either a module or a {module, as_alias} tuple for multi-instance plugins.

plugin_instances()

@spec plugin_instances() :: [Jido.Plugin.Instance.t()]

Returns the list of plugin instances attached to this agent.

plugin_routes()

@spec plugin_routes() :: [{String.t(), module(), integer()}]

Returns the expanded and validated plugin routes.

plugin_schedules()

Returns the expanded plugin and agent schedules.

plugin_specs()

@spec plugin_specs() :: [Jido.Plugin.Spec.t()]

Returns the list of plugin specs attached to this agent.

plugin_state(agent, plugin_mod)

@spec plugin_state(Jido.Agent.t(), module() | {module(), atom()}) :: map() | nil

Returns the state slice for a specific plugin.

Accepts either a module or a {module, as_alias} tuple for multi-instance plugins.

plugins()

@spec plugins() :: [module()]

Returns the list of plugin modules attached to this agent (deduplicated).

For multi-instance plugins, the module appears once regardless of how many instances are mounted.

Example

MyAgent.plugins()
# => [MyApp.SlackPlugin, MyApp.OpenAIPlugin]

put_checkpoint(storage, agent, opts \\ [])

@spec put_checkpoint(storage_config(), Jido.Agent.t(), keyword()) ::
  :ok | {:error, term()}

Stores the current dispatch projection as a checkpoint for faster rebuilds.

rebuild(storage, queue)

@spec rebuild(storage_config(), queue() | atom()) ::
  {:ok, Jido.Agent.t()} | {:error, term()}

Rebuilds a dispatch agent for one queue from the durable dispatch thread.

run_ids(agent)

@spec run_ids(Jido.Agent.t()) :: MapSet.t(String.t())

Returns every run id known by the dispatch projection.

runnable_keys(agent)

@spec runnable_keys(Jido.Agent.t()) :: MapSet.t(String.t())

Returns every runnable key already known by the dispatch projection.

schedule_attempts(storage, agent, run_id, runnables, opts \\ [])

@spec schedule_attempts(
  storage_config(),
  Jido.Agent.t(),
  String.t(),
  [map()],
  keyword()
) ::
  {:ok, schedule_update()} | {:error, term()}

Appends durable scheduled attempts for planned runnables that are not already present in the dispatch-agent projection.

The append uses the dispatch thread's current revision as the optimistic fence. Duplicate callers with stale dispatch projections therefore fail at the journal boundary, while callers that already see the scheduled attempts return idempotently without writing.
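The idempotent path can be pictured as filtering planned runnables against the keys the projection already knows. This sketch assumes each runnable map carries a `:runnable_key` field, which is not confirmed by the typespec shown here.

```elixir
# Keys already present in the projection, as runnable_keys/1 would return them.
known_keys = MapSet.new(["run-1:step-a"])

# Hypothetical planned runnables.
planned = [
  %{runnable_key: "run-1:step-a"},
  %{runnable_key: "run-1:step-b"}
]

# Only runnables the projection has not seen need a journal append.
to_schedule = Enum.reject(planned, &MapSet.member?(known_keys, &1.runnable_key))

# When nothing is left to schedule, the caller can return without writing.
needs_write? = to_schedule != []
```

A second caller whose projection already contains both keys would compute an empty list and skip the append entirely, while a caller with a stale projection would attempt the append and be rejected by the revision fence.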

schema()

@spec schema() :: Zoi.schema() | keyword()

Returns the merged schema (base + plugin schemas).

set(agent, attrs)

@spec set(Jido.Agent.t(), map() | keyword()) :: Jido.Agent.agent_result()

Updates the agent's state by merging new attributes.

Uses deep merge semantics - nested maps are merged recursively.
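To make the merge semantics concrete, here is a standalone sketch of a recursive map merge. `DeepMergeSketch` is a hypothetical helper written for illustration; the library's own implementation may treat structs, lists, or keyword values differently.

```elixir
defmodule DeepMergeSketch do
  # When both sides are maps, merge key by key and recurse on key collisions.
  def merge(left, right) when is_map(left) and is_map(right) do
    Map.merge(left, right, fn _key, l, r -> merge(l, r) end)
  end

  # Otherwise the new value replaces the old one.
  def merge(_left, right), do: right
end

state = %{status: :idle, lease: %{owner: "worker-a", ttl: 30}}

# Only :ttl changes; :owner inside the nested map is preserved.
merged = DeepMergeSketch.merge(state, %{lease: %{ttl: 60}})
```

A shallow `Map.merge/2` would instead replace the whole `:lease` map and drop `:owner`.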

Examples

{:ok, agent} = Squidie.Runtime.DispatchAgent.set(agent, %{status: :running})
{:ok, agent} = Squidie.Runtime.DispatchAgent.set(agent, counter: 5)

signal_types()

@spec signal_types() :: [String.t()]

Returns all expanded route signal types from plugin routes.

These are the fully-prefixed signal types that the agent can handle.

Example

MyAgent.signal_types()
# => ["slack.post", "slack.channels.list", "openai.chat"]

strategy()

@spec strategy() :: module()

Returns the execution strategy module for this agent.

strategy_opts()

@spec strategy_opts() :: keyword()

Returns the strategy options for this agent.

strategy_snapshot(agent)

@spec strategy_snapshot(Jido.Agent.t()) :: Jido.Agent.Strategy.Snapshot.t()

Returns a stable, public view of the strategy's execution state.

Use this instead of inspecting agent.state.__strategy__ directly. Returns a Jido.Agent.Strategy.Snapshot struct with:

  • status - Coarse execution status
  • done? - Whether strategy reached terminal state
  • result - Main output if any
  • details - Additional strategy-specific metadata

tags()

@spec tags() :: [String.t()]

Returns the agent's tags.

validate(agent, opts \\ [])

@spec validate(
  Jido.Agent.t(),
  keyword()
) :: Jido.Agent.agent_result()

Validates the agent's state against its schema.

Options

  • :strict - When true, only schema-defined fields are kept (default: false)

Examples

{:ok, agent} = Squidie.Runtime.DispatchAgent.validate(agent)
{:ok, agent} = Squidie.Runtime.DispatchAgent.validate(agent, strict: true)

visible_attempts(agent, at)

Lists attempts whose visibility window has opened and can be claimed.

vsn()

@spec vsn() :: String.t() | nil

Returns the agent's version.