Squidie.Runtime.WorkflowAgent (squidie v0.1.2)


Jido-native workflow coordination state for one durable workflow run.

The agent rebuilds from run-thread journal entries and checkpoints. It does not execute workflow steps; it provides the restartable coordination state needed by the journal-backed runtime.

Types

apply_many_update()

@type apply_many_update() :: %{
  agent: Jido.Agent.t(),
  attempts: [Squidie.Runtime.DispatchProtocol.ActionAttempt.t()]
}

apply_update()

@type apply_update() :: %{
  agent: Jido.Agent.t(),
  attempt: Squidie.Runtime.DispatchProtocol.ActionAttempt.t()
}

dispatch_schedule_update()

@type dispatch_schedule_update() :: %{agent: Jido.Agent.t(), runnables: [map()]}

run_id()

@type run_id() :: String.t()

storage_config()

@type storage_config() :: Squidie.Runtime.Journal.storage_config()

Functions

actions()

@spec actions() :: [module()]

Returns the list of actions from all attached plugins.

agent_id(run_id)

@spec agent_id(run_id()) :: String.t()

Returns the stable Jido agent id for a workflow run.

applied_runnable_keys(agent)

@spec applied_runnable_keys(Jido.Agent.t()) :: MapSet.t(String.t())

Returns runnable keys whose dispatch results have been applied to the run.

apply_pending_results(storage, workflow_agent, dispatch_agent, opts \\ [])

@spec apply_pending_results(
  storage_config(),
  Jido.Agent.t(),
  Jido.Agent.t(),
  keyword()
) ::
  {:ok, apply_many_update()} | {:error, term()}

Applies every completed dispatch result that is still pending for the workflow run.

This is the restart recovery boundary for lost live wakeups: both agents can be rebuilt from durable journals, pending completed attempts can be derived again, and each missing workflow application is appended to the run thread with the current workflow-agent revision as the append fence.
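One way to picture the "derived again" step is as a filter over dispatch attempts. The sketch below is a simplified, hypothetical model and not Squidie's implementation: attempts are plain maps with assumed `:runnable_key` and `:status` fields, and the `PendingResultsSketch` module name is made up for illustration.

```elixir
defmodule PendingResultsSketch do
  # Hypothetical model: attempts are plain maps, not
  # Squidie.Runtime.DispatchProtocol.ActionAttempt structs.

  # Returns completed attempts whose runnable is planned but not yet applied.
  def pending_results(planned_keys, applied_keys, attempts) do
    planned = MapSet.new(planned_keys)

    Enum.filter(attempts, fn attempt ->
      attempt.status == :completed and
        MapSet.member?(planned, attempt.runnable_key) and
        not MapSet.member?(applied_keys, attempt.runnable_key)
    end)
  end
end

attempts = [
  %{runnable_key: "step-a", status: :completed},
  %{runnable_key: "step-b", status: :completed},
  %{runnable_key: "step-c", status: :scheduled}
]

pending =
  PendingResultsSketch.pending_results(
    ["step-a", "step-b", "step-c"],
    MapSet.new(["step-a"]),
    attempts
  )

# Only "step-b" is completed, planned, and not yet applied.
IO.inspect(Enum.map(pending, & &1.runnable_key))
```

Because the inputs come from durable state only, running the same derivation after a restart yields the same pending set, which is why a lost in-memory wakeup does not lose work.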

apply_result(storage, agent, attempt, opts \\ [])

@spec apply_result(
  storage_config(),
  Jido.Agent.t(),
  Squidie.Runtime.DispatchProtocol.ActionAttempt.t(),
  keyword()
) :: {:ok, apply_update()} | {:error, term()}

Records that a durable dispatch completion has been applied to the workflow run.

The dispatch attempt must be completed, belong to the workflow agent's run, and reference a planned runnable that has not already been applied. Stale workflow-agent callers race at the run-thread append boundary and receive {:error, :conflict} from the journal.
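The conflict behavior described above resembles optimistic concurrency on an append-only log. The following is a minimal in-memory sketch under that assumption; `FencedJournalSketch`, its revision scheme (the entry count), and the `{:result_applied, key}` entry shape are all hypothetical and do not reflect the actual journal format.

```elixir
defmodule FencedJournalSketch do
  # Hypothetical in-memory journal; the revision is simply the entry count.
  def new, do: %{entries: []}

  def revision(journal), do: length(journal.entries)

  # Appends only when the caller's expected revision matches the journal's.
  def append(journal, entry, expected_revision) do
    if revision(journal) == expected_revision do
      {:ok, %{journal | entries: journal.entries ++ [entry]}}
    else
      {:error, :conflict}
    end
  end
end

journal = FencedJournalSketch.new()
seen = FencedJournalSketch.revision(journal)

# Two callers both observed revision 0; only the first append succeeds.
{:ok, journal} = FencedJournalSketch.append(journal, {:result_applied, "step-a"}, seen)
stale = FencedJournalSketch.append(journal, {:result_applied, "step-a"}, seen)

IO.inspect(stale)
```

In this model the stale caller would need to rebuild its view of the journal before retrying, at which point it would see that the result has already been applied.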

capabilities()

@spec capabilities() :: [atom()]

Returns the union of all capabilities from all mounted plugin instances.

Capabilities are atoms describing what the agent can do based on its mounted plugins.

Example

MyAgent.capabilities()
# => [:messaging, :channel_management, :chat, :embeddings]

category()

@spec category() :: String.t() | nil

Returns the agent's category.

cmd(agent, action)

Executes actions against the agent: (agent, action) -> {agent, directives}

This is the core operation. Actions modify state and may perform required work; directives are runtime-owned external effects. Execution is delegated to the configured strategy (default: Direct).

Action Formats

  • MyAction - Action module with no params
  • {MyAction, %{param: 1}} - Action with params
  • {MyAction, %{param: 1}, %{context: data}} - Action with params and context
  • {MyAction, %{param: 1}, %{}, [timeout: 1000]} - Action with opts
  • %Instruction{} - Full instruction struct
  • [...] - List of any of the above (processed in sequence)
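To make the accepted shapes concrete, here is a hedged sketch of how such formats could be normalized with pattern matching. It is illustrative only: `ActionFormatSketch` is a hypothetical module, it returns plain maps rather than Jido instruction structs, and it omits the `%Instruction{}` case.

```elixir
defmodule ActionFormatSketch do
  # Hypothetical normalizer: returns plain maps, not Jido instruction structs.
  def normalize(actions) when is_list(actions), do: Enum.flat_map(actions, &normalize/1)
  def normalize({action, params, context, opts}), do: [build(action, params, context, opts)]
  def normalize({action, params, context}), do: [build(action, params, context, [])]
  def normalize({action, params}), do: [build(action, params, %{}, [])]
  def normalize(action) when is_atom(action), do: [build(action, %{}, %{}, [])]

  defp build(action, params, context, opts) do
    %{action: action, params: params, context: context, opts: opts}
  end
end

instructions =
  ActionFormatSketch.normalize([
    MyAction,
    {MyAction, %{param: 1}},
    {MyAction, %{param: 1}, %{}, [timeout: 1000]}
  ])

# Every accepted shape ends up as the same uniform map.
IO.inspect(Enum.map(instructions, & &1.params))
```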

Options

The optional third argument opts is a keyword list merged into all instructions:

  • :timeout - Maximum time (in ms) for each action to complete
  • :max_retries - Maximum retry attempts on failure
  • :backoff - Initial backoff time in ms (doubles with each retry)
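As a worked example of the doubling rule for :backoff, the sketch below computes the delay before each retry. It assumes the first retry waits exactly the initial backoff and that no cap or jitter is applied; neither detail is confirmed by the text above, and `BackoffSketch` is a hypothetical helper.

```elixir
defmodule BackoffSketch do
  # Delay in ms before each retry, assuming the initial backoff
  # doubles on every subsequent retry with no cap or jitter.
  def delays(initial_backoff, max_retries) do
    initial_backoff
    |> Stream.iterate(&(&1 * 2))
    |> Enum.take(max_retries)
  end
end

# With backoff: 250 and max_retries: 4 the waits are 250, 500, 1000, 2000 ms.
IO.inspect(BackoffSketch.delays(250, 4))
```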

Examples

{agent, directives} = Squidie.Runtime.WorkflowAgent.cmd(agent, MyAction)
{agent, directives} = Squidie.Runtime.WorkflowAgent.cmd(agent, {MyAction, %{value: 42}})
{agent, directives} = Squidie.Runtime.WorkflowAgent.cmd(agent, [Action1, Action2])

# With per-call options (merged into all instructions)
{agent, directives} = Squidie.Runtime.WorkflowAgent.cmd(agent, MyAction, timeout: 5000)

cmd(agent, action, opts)

description()

@spec description() :: String.t() | nil

Returns the agent's description.

name()

@spec name() :: String.t()

Returns the agent's name.

new(opts \\ [])

@spec new(keyword() | map()) :: Jido.Agent.t()

Creates a new agent with optional initial state.

The agent is fully initialized, including strategy state. For the default Direct strategy, strategy initialization is a no-op. For custom strategies, any state initialization is applied, but directives are not processed here; only AgentServer processes them.

Examples

agent = Squidie.Runtime.WorkflowAgent.new()
agent = Squidie.Runtime.WorkflowAgent.new(id: "custom-id")
agent = Squidie.Runtime.WorkflowAgent.new(state: %{counter: 10})

pending_dispatches(agent, dispatch_agent)

@spec pending_dispatches(Jido.Agent.t(), Jido.Agent.t()) :: [map()]

Lists planned runnables that still need dispatch scheduling.

pending_results(agent, dispatch_agent)

Lists completed dispatch results that still need workflow application.

planned_runnable_keys(agent)

@spec planned_runnable_keys(Jido.Agent.t()) :: [String.t()]

Returns runnable keys planned by the workflow thread.

planned_runnables(agent)

@spec planned_runnables(Jido.Agent.t()) :: [map()]

Returns planned runnable payloads in deterministic order.

plugin_config(plugin_mod)

@spec plugin_config(module() | {module(), atom()}) :: map() | nil

Returns the configuration for a specific plugin.

Accepts either a module or a {module, as_alias} tuple for multi-instance plugins.

plugin_instances()

@spec plugin_instances() :: [Jido.Plugin.Instance.t()]

Returns the list of plugin instances attached to this agent.

plugin_routes()

@spec plugin_routes() :: [{String.t(), module(), integer()}]

Returns the expanded and validated plugin routes.

plugin_schedules()

Returns the expanded plugin and agent schedules.

plugin_specs()

@spec plugin_specs() :: [Jido.Plugin.Spec.t()]

Returns the list of plugin specs attached to this agent.

plugin_state(agent, plugin_mod)

@spec plugin_state(Jido.Agent.t(), module() | {module(), atom()}) :: map() | nil

Returns the state slice for a specific plugin.

Accepts either a module or a {module, as_alias} tuple for multi-instance plugins.

plugins()

@spec plugins() :: [module()]

Returns the list of plugin modules attached to this agent (deduplicated).

For multi-instance plugins, the module appears once regardless of how many instances are mounted.

Example

MyAgent.plugins()
# => [MyApp.SlackPlugin, MyApp.OpenAIPlugin]

put_checkpoint(storage, agent, opts \\ [])

@spec put_checkpoint(storage_config(), Jido.Agent.t(), keyword()) ::
  :ok | {:error, term()}

Stores the current workflow projection as a checkpoint for faster rebuilds.

rebuild(storage, run_id)

@spec rebuild(storage_config(), run_id()) :: {:ok, Jido.Agent.t()} | {:error, term()}

Rebuilds a workflow agent for one run from its durable run thread.
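Rebuilding from a durable thread, optionally starting at a checkpoint, can be pictured as a fold over journal entries. The sketch below is a hypothetical model of that idea: the projection fields and the `:runnable_planned` and `:result_applied` entry names are assumptions for illustration, not the real run-thread entry types.

```elixir
defmodule RebuildSketch do
  # Hypothetical projection: planned and applied runnable keys plus a revision.
  def empty, do: %{revision: 0, planned: [], applied: MapSet.new()}

  # Folds only the journal entries newer than the starting projection.
  def rebuild(entries, checkpoint \\ nil) do
    start = checkpoint || empty()

    entries
    |> Enum.drop(start.revision)
    |> Enum.reduce(start, &apply_entry/2)
  end

  defp apply_entry({:runnable_planned, key}, projection) do
    %{projection | planned: projection.planned ++ [key], revision: projection.revision + 1}
  end

  defp apply_entry({:result_applied, key}, projection) do
    %{projection | applied: MapSet.put(projection.applied, key), revision: projection.revision + 1}
  end
end

entries = [
  {:runnable_planned, "step-a"},
  {:runnable_planned, "step-b"},
  {:result_applied, "step-a"}
]

full = RebuildSketch.rebuild(entries)
checkpoint = RebuildSketch.rebuild(Enum.take(entries, 2))
from_checkpoint = RebuildSketch.rebuild(entries, checkpoint)

# A checkpoint shortens the replay but must not change the result.
IO.inspect(full == from_checkpoint)
```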

schedule_pending_dispatches(storage, workflow_agent, dispatch_agent, opts \\ [])

@spec schedule_pending_dispatches(
  storage_config(),
  Jido.Agent.t(),
  Jido.Agent.t(),
  keyword()
) ::
  {:ok, dispatch_schedule_update()} | {:error, term()}

Schedules every planned runnable that is missing from the dispatch journal.

This is the restart recovery boundary for the crash window between durable workflow planning and durable dispatch scheduling. The workflow agent derives missing planned runnables from the run-thread projection, and the dispatch agent appends their :attempt_scheduled entries with its current dispatch thread revision as the append fence.
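A useful property of this kind of recovery is idempotence: running it again after it has succeeded should schedule nothing new. The sketch below models that under simplifying assumptions. The dispatch journal is a plain list of `{:attempt_scheduled, key}` tuples, runnables are maps with a `:key` field, and `DispatchRecoverySketch` is a hypothetical module; the append fence is left out for brevity.

```elixir
defmodule DispatchRecoverySketch do
  # Hypothetical model: the dispatch journal is a list of
  # {:attempt_scheduled, runnable_key} tuples.

  # Planned runnables with no scheduling entry in the dispatch journal.
  def missing(planned_runnables, dispatch_journal) do
    scheduled_keys = for {:attempt_scheduled, key} <- dispatch_journal, do: key
    scheduled = MapSet.new(scheduled_keys)
    Enum.reject(planned_runnables, &MapSet.member?(scheduled, &1.key))
  end

  # Appends one scheduling entry per missing runnable.
  def schedule_missing(planned_runnables, dispatch_journal) do
    to_schedule = missing(planned_runnables, dispatch_journal)
    new_entries = Enum.map(to_schedule, &{:attempt_scheduled, &1.key})
    {to_schedule, dispatch_journal ++ new_entries}
  end
end

planned = [%{key: "step-a"}, %{key: "step-b"}]
journal = [{:attempt_scheduled, "step-a"}]

{first_pass, journal} = DispatchRecoverySketch.schedule_missing(planned, journal)
{second_pass, _journal} = DispatchRecoverySketch.schedule_missing(planned, journal)

# The first pass schedules "step-b"; the second pass finds nothing to do.
IO.inspect({first_pass, second_pass})
```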

schema()

@spec schema() :: Zoi.schema() | keyword()

Returns the merged schema (base + plugin schemas).

set(agent, attrs)

@spec set(Jido.Agent.t(), map() | keyword()) :: Jido.Agent.agent_result()

Updates the agent's state by merging new attributes.

Uses deep merge semantics - nested maps are merged recursively.

Examples

{:ok, agent} = Squidie.Runtime.WorkflowAgent.set(agent, %{status: :running})
{:ok, agent} = Squidie.Runtime.WorkflowAgent.set(agent, counter: 5)
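The difference between a deep merge and a shallow `Map.merge/2` matters for nested state. The following standalone sketch illustrates the semantics with a hypothetical `DeepMergeSketch` helper; it is not the function the library uses internally and it ignores edge cases such as structs and keyword lists.

```elixir
defmodule DeepMergeSketch do
  # Recursively merges nested maps; for non-map values the right side wins.
  def deep_merge(left, right) when is_map(left) and is_map(right) do
    Map.merge(left, right, fn _key, l, r -> deep_merge(l, r) end)
  end

  def deep_merge(_left, right), do: right
end

state = %{status: :pending, progress: %{done: 1, total: 4}}
attrs = %{progress: %{done: 2}}

deep = DeepMergeSketch.deep_merge(state, attrs)
shallow = Map.merge(state, attrs)

# The deep merge keeps :total, while the shallow merge replaces the nested map.
IO.inspect(deep.progress)
IO.inspect(shallow.progress)
```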

signal_types()

@spec signal_types() :: [String.t()]

Returns all expanded route signal types from plugin routes.

These are the fully-prefixed signal types that the agent can handle.

Example

MyAgent.signal_types()
# => ["slack.post", "slack.channels.list", "openai.chat"]

status(agent)

@spec status(Jido.Agent.t()) :: atom()

Returns the workflow projection status.

strategy()

@spec strategy() :: module()

Returns the execution strategy module for this agent.

strategy_opts()

@spec strategy_opts() :: keyword()

Returns the strategy options for this agent.

strategy_snapshot(agent)

@spec strategy_snapshot(Jido.Agent.t()) :: Jido.Agent.Strategy.Snapshot.t()

Returns a stable, public view of the strategy's execution state.

Use this instead of inspecting agent.state.__strategy__ directly. Returns a Jido.Agent.Strategy.Snapshot struct with:

  • status - Coarse execution status
  • done? - Whether strategy reached terminal state
  • result - Main output if any
  • details - Additional strategy-specific metadata

tags()

@spec tags() :: [String.t()]

Returns the agent's tags.

validate(agent, opts \\ [])

@spec validate(
  Jido.Agent.t(),
  keyword()
) :: Jido.Agent.agent_result()

Validates the agent's state against its schema.

Options

  • :strict - When true, only schema-defined fields are kept (default: false)

Examples

{:ok, agent} = Squidie.Runtime.WorkflowAgent.validate(agent)
{:ok, agent} = Squidie.Runtime.WorkflowAgent.validate(agent, strict: true)

vsn()

@spec vsn() :: String.t() | nil

Returns the agent's version.