Agens.Job (agens v0.2.0)

A Job defines a multi-agent workflow as a map of Agens.Job.Nodes.

Concretely, a Job is a map of node_id => Agens.Job.Node plus a :starting_node_id. Each Node declares a Serving and, optionally, an objective, tools, resources, or a Sub-Job. Routing between Nodes is dynamic and can be driven by code or by the LM: based on the Node's structured outputs, the Serving's router returns a next instruction, one of {:route, node_id, count}, {:yield, node_id}, {:sub, job_id}, :end, or :retry. There is no static next field on a Node; execution is defined entirely by the routing decisions emitted at runtime.
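The routing instructions listed above can be illustrated with a small, library-independent sketch. The module name, the "action" output key, and the node ids are all hypothetical, and treating count as a number of parallel threads is an assumption; the real router callbacks live on the Serving and are not reproduced here.

```elixir
defmodule RoutingSketch do
  @moduledoc false
  # Hypothetical router: maps a Node's structured outputs to one of the
  # next instructions named in the text. Keys and node ids are invented.

  # Route to the :worker node; the count of 3 is ASSUMED to mean three threads.
  def next(%{"action" => "fan_out"}), do: {:route, :worker, 3}

  # Hand off to a node that is ASSUMED to wait on or aggregate threads.
  def next(%{"action" => "collect"}), do: {:yield, :summarizer}

  # Delegate to another Job, identified by its job_id.
  def next(%{"action" => "delegate", "job" => job_id}), do: {:sub, job_id}

  # Nothing left to do.
  def next(%{"action" => "done"}), do: :end

  # Unrecognised outputs: ask for a retry.
  def next(_outputs), do: :retry
end
```

Because each clause returns a plain term, the same decision could equally come from parsing an LM's structured output, which is what makes the routing dynamic rather than declared on the Node.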

Events

Lifecycle and per-Node activity are surfaced through the Agens.Backend behaviour. Configured backends (see Agens.backends/0 for the defaults) receive callbacks for every significant event. The default emit backend forwards them to the caller process as messages, suitable for handle_info/2 in a UI/pubsub layer; the default log backend writes structured logs. Implement your own Agens.Backend for custom persistence or side effects.

The default emit backend sends:

Job

{:job_run, job_id, run_id}
{:job_status, {run_id, status}}
{:job_complete, run_id}
{:job_ended, run_id}
{:job_error, message, error}

Node

{:node_started, message}
{:node_retry, message}
{:node_result, message}

Tool / Resource / Prompt

The following are emitted when applicable (e.g. :tool_call only when the Node has :tools set, :resource_load only when :resources are configured):

{:tool_call, message, tool_call}
{:resource_load, message, resource}
{:prompt, {system, user}}
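As a rough illustration of consuming these messages, the sketch below pattern-matches on the tuple shapes listed above and turns each into a short label. EventSketch and describe/1 are hypothetical names, and the contents of message, tool_call, and resource are treated as opaque; in a GenServer the same clauses would typically become handle_info/2 heads.

```elixir
defmodule EventSketch do
  @moduledoc false
  # Hypothetical helper: labels messages sent by the default emit backend.
  # Only the tuple shapes come from the documentation; payloads are opaque.

  def describe({:job_run, job_id, run_id}),
    do: "job #{inspect(job_id)} running as #{inspect(run_id)}"

  def describe({:job_status, {run_id, status}}),
    do: "run #{inspect(run_id)} is now #{inspect(status)}"

  def describe({:job_complete, run_id}), do: "run #{inspect(run_id)} completed"
  def describe({:job_ended, run_id}), do: "run #{inspect(run_id)} ended"
  def describe({:job_error, _message, error}), do: "job error: #{inspect(error)}"

  def describe({:node_started, _message}), do: "node started"
  def describe({:node_retry, _message}), do: "node retrying"
  def describe({:node_result, _message}), do: "node produced a result"

  def describe({:tool_call, _message, tool_call}), do: "tool call: #{inspect(tool_call)}"
  def describe({:resource_load, _message, resource}), do: "resource: #{inspect(resource)}"
  def describe({:prompt, {_system, _user}}), do: "prompt built"

  # Catch-all so unknown messages never crash the receiver.
  def describe(other), do: "unhandled: #{inspect(other)}"
end

# Simulate one backend message arriving in the current process.
send(self(), {:job_complete, "run-1"})

receive do
  event -> IO.puts(EventSketch.describe(event))
end
```

The catch-all clause is a deliberate choice: a UI process that receives events it does not recognise should ignore them rather than fail.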

Yield

Emitted while a yielding Node waits on, or aggregates, parallel threads:

{:yield_wait, {message, total_count, ready_count}}
{:yield_done, {message, total_count}}
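The two yield events carry enough information to render a progress indicator. The following is a minimal sketch, assuming total_count and ready_count are integers; YieldSketch and progress/1 are hypothetical names and the message payload is ignored.

```elixir
defmodule YieldSketch do
  @moduledoc false
  # Hypothetical helper: turns the yield events into a progress label.
  # ASSUMES the counts are integers; `message` is treated as opaque.

  def progress({:yield_wait, {_message, total_count, ready_count}}),
    do: "#{ready_count}/#{total_count} threads ready"

  def progress({:yield_done, {_message, total_count}}),
    do: "all #{total_count} threads aggregated"
end
```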

Routing and Sub-Jobs

Every Agens.Job.Node declares a :serving, even when it also declares :sub. The Node's Serving owns routing: the parent Node's next instructions are always produced by the Serving's router (Agens.Serving.handle_result/3 for normal inference, Agens.Serving.handle_sub/3 for a resolved Sub-Job). There is no static next field on Agens.Job.Node — all routing is dynamic.

Two distinct Sub flows are supported:

  • Sub Node (Agens.Job.Node with :sub set) — the Sub-Job runs in place of inference on the parent Node. When the Sub completes, the parent invokes Agens.Serving.handle_sub/3 on the Node's declared Serving to map the Sub's final Agens.Message into the parent Node's outputs and next.
  • Sub via routing instruction ({:sub, job_id} returned in a Serving's next) — the Sub-Job runs as additional work after the Node's inference has completed and routing has already been decided. The Sub's terminal message drives subsequent routing in the parent; handle_sub/3 is not invoked.

Configuration is validated on start/2. Missing :serving on any Node raises ArgumentError immediately rather than failing at runtime.
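The fail-fast check described above can be pictured with the sketch below. It is not the library's implementation: ValidationSketch and validate!/1 are hypothetical, and Nodes are modelled as plain maps rather than Agens.Job.Node structs.

```elixir
defmodule ValidationSketch do
  @moduledoc false
  # Hypothetical stand-in for the validation performed when a Job starts.
  # `nodes` is a map of node_id => node, where each node is a map or struct.

  # Returns :ok, or raises ArgumentError naming every node without :serving.
  def validate!(nodes) when is_map(nodes) do
    missing =
      for {node_id, node} <- nodes, is_nil(Map.get(node, :serving)), do: node_id

    case missing do
      [] ->
        :ok

      ids ->
        raise ArgumentError, "nodes missing :serving: #{inspect(Enum.sort(ids))}"
    end
  end
end
```

Raising at start time means a misconfigured Node is reported before any inference runs, instead of surfacing midway through a workflow.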

Functions

@spec get_config(pid() | binary()) ::
  {:ok, Agens.Job.Config.t()} | {:error, :run_not_found}

Retrieves the Agens.Job.Config of a running Job by run_id or pid.

Returns {:error, :run_not_found} if no Job is running under the given run_id.

is_status(status) (macro)

run(run_id, input, opts)

@spec run(pid() | binary(), String.t(), keyword()) ::
  :ok | {:error, :run_not_found | :job_already_running | :input_required}

Runs a Job that was previously started with start/2.

Identifies the Job by its run_id (or directly by pid) and begins execution at the Config's :starting_node_id with the given input. Returns immediately with :ok once the run is accepted; subsequent progress is surfaced through the configured Agens.Backends.

Options

  • :caller - The pid to attribute backend events to (used as the first argument of every Agens.Backend callback). Defaults to the calling process.
  • :sub - Internal use. An Agens.Job.Sub struct supplied by the runtime when this Job is being executed as a Sub-Job of a parent run.

Errors

  • {:error, :run_not_found} - No Job process is registered under the given run_id.
  • {:error, :job_already_running} - The Job has already started and has not yet completed.
  • {:error, :input_required} - input was nil.

@spec start(Agens.Job.Config.t(), binary()) :: {:ok, pid()} | {:error, term()}

Starts a new supervised Job process for the given Agens.Job.Config and run_id.

Validates the config (raises ArgumentError if any Node is missing :serving) and registers the process in Agens.Registry under run_id. The same Config can be started multiple times in parallel by passing distinct run_ids — typically obtained from Agens.generate_uid/0.

start/2 only starts the supervised process; it does not run the Job. Call run/3 to begin execution.
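The two-step start-then-run flow might be wrapped as follows. This is a sketch under stated assumptions: LifecycleSketch and run_once/4 are hypothetical, the config is taken as an already-built Agens.Job.Config, and the job argument defaults to Agens.Job but is injectable purely so the sketch can be exercised against a stub without the library installed.

```elixir
defmodule LifecycleSketch do
  @moduledoc false
  # Hypothetical wrapper around the start/2 then run/3 sequence.
  # `job` defaults to Agens.Job; pass a stub module to try this in isolation.
  # `run_id` would typically come from Agens.generate_uid/0.

  def run_once(config, run_id, input, job \\ Agens.Job) do
    # start/2 only starts the supervised process (it may raise ArgumentError
    # on an invalid config); any {:error, term} it returns falls through.
    with {:ok, _pid} <- job.start(config, run_id) do
      # run/3 returns as soon as the run is accepted; progress then arrives
      # through the configured backends, attributed to :caller.
      case job.run(run_id, input, caller: self()) do
        :ok -> {:ok, run_id}
        # reason is :run_not_found, :job_already_running or :input_required
        {:error, _reason} = error -> error
      end
    end
  end
end
```

Returning the run_id on success gives the caller the handle it needs for later get_config and stop calls on the same run.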

@spec stop(binary()) :: :ok | {:error, :run_not_found}

Stops a running Job by run_id.

Returns :ok when the Job process is found and stopped, or {:error, :run_not_found} if no Job is running under the given run_id.