Agens.Job (agens v0.2.0)
A Job defines a multi-agent workflow as a map of Agens.Job.Nodes.
A Job is a map of node_id => Agens.Job.Node plus a :starting_node_id. Each Node declares a
Serving and, optionally, an objective, tools, resources, or a Sub-Job. Routing between Nodes is
dynamic and can be driven by either code or the LM: the Serving's Router returns next instructions
({:route, node_id, count}, {:yield, node_id}, {:sub, job_id}, :end, :retry) based on the
Node's structured outputs. There is no static next field on a Node — execution is defined entirely
by the routing decisions emitted at runtime.
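To make the instruction shapes concrete, here is a minimal sketch of a routing decision written as a plain function. The module RouterSketch, the output keys (:verdict, :workers, :job_id), and the node ids are all hypothetical. The real decision is made by the Serving's Router, whose callback signatures are not shown in this section. Reading count as the number of parallel threads is also an assumption.

```elixir
defmodule RouterSketch do
  @moduledoc """
  Hypothetical helper, not part of Agens: maps a Node's structured outputs
  to one of the next instructions listed above.
  """

  # All output keys and node ids below are invented for illustration.
  def next(%{verdict: "approved"}), do: :end
  def next(%{verdict: "needs_review"}), do: {:route, "reviewer", 1}

  # Assumption: `count` is the number of parallel threads to start.
  def next(%{verdict: "fan_out", workers: n}) when is_integer(n) and n > 0,
    do: {:route, "worker", n}

  def next(%{verdict: "collect"}), do: {:yield, "aggregator"}
  def next(%{verdict: "delegate", job_id: job_id}), do: {:sub, job_id}

  # Anything unrecognised asks for the Node to be retried.
  def next(_outputs), do: :retry
end
```

Because every clause returns one of the documented instructions, the same outputs can lead to different Nodes from run to run, which is what replaces a static next field.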
Events
Lifecycle and per-Node activity are surfaced through the Agens.Backend behaviour. Configured
backends (see Agens.backends/0 for the defaults) receive callbacks for every significant event.
The default emit backend forwards them to the caller process as messages, suitable for
handle_info/2 in a UI/pubsub layer; the default log backend writes structured logs. Implement
your own Agens.Backend for custom persistence or side effects.
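A process that receives the emit backend's messages can pattern-match on the tuple shapes documented in this section. The sketch below is one possible consumer, not part of Agens: the module name EmitListenerSketch, its state layout, and the :running, :complete, and :ended labels are assumptions made for illustration. Only the message tuples come from the documentation.

```elixir
defmodule EmitListenerSketch do
  @moduledoc """
  Hypothetical consumer for messages sent by the default emit backend.
  Only the tuple shapes are documented; the state layout is invented.
  """
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  @impl true
  def init(:ok), do: {:ok, %{runs: %{}, node_results: 0}}

  # Every emitted message arrives here when this process is the caller.
  @impl true
  def handle_info(msg, state), do: {:noreply, apply_event(state, msg)}

  # Pure reducer, so the matching logic can be exercised without a running Job.
  def apply_event(state, {:job_run, _job_id, run_id}), do: put_run(state, run_id, :running)
  def apply_event(state, {:job_status, {run_id, status}}), do: put_run(state, run_id, status)
  def apply_event(state, {:job_complete, run_id}), do: put_run(state, run_id, :complete)
  def apply_event(state, {:job_ended, run_id}), do: put_run(state, run_id, :ended)

  def apply_event(state, {:node_result, _message}),
    do: %{state | node_results: state.node_results + 1}

  # Events this sketch does not track (tool calls, yields, prompts) are ignored.
  def apply_event(state, _other), do: state

  defp put_run(state, run_id, status),
    do: %{state | runs: Map.put(state.runs, run_id, status)}
end
```

Keeping the reducer separate from handle_info/2 means the same function could back a LiveView or a pubsub subscriber without change.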
The default emit backend sends:
Job
{:job_run, job_id, run_id}
{:job_status, {run_id, status}}
{:job_complete, run_id}
{:job_ended, run_id}
{:job_error, message, error}
Node
{:node_started, message}
{:node_retry, message}
{:node_result, message}
Tool / Resource / Prompt
The following are emitted when applicable (e.g. :tool_call only when the Node has :tools set,
:resource_load only when :resources are configured):
{:tool_call, message, tool_call}
{:resource_load, message, resource}
{:prompt, {system, user}}
Yield
Emitted while a yielding Node waits on, or aggregates, parallel threads:
{:yield_wait, {message, total_count, ready_count}}
{:yield_done, {message, total_count}}
Routing and Sub-Jobs
Every Agens.Job.Node declares a :serving, even when it also declares :sub. The Node's Serving owns routing: the parent Node's next instructions are always produced by the Serving's router (Agens.Serving.handle_result/3 for normal inference, Agens.Serving.handle_sub/3 for a resolved Sub-Job). There is no static next field on Agens.Job.Node — all routing is dynamic.
Two distinct Sub flows are supported:
- Sub Node (Agens.Job.Node with :sub set): the Sub-Job runs in place of inference on the parent Node. When the Sub completes, the parent invokes Agens.Serving.handle_sub/3 on the Node's declared Serving to map the Sub's final Agens.Message into the parent Node's outputs and next.
- Sub via routing instruction ({:sub, job_id} returned in a Serving's next): the Sub-Job runs as additional work after the Node's inference has completed and routing has already been decided. The Sub's terminal message drives subsequent routing in the parent; handle_sub/3 is not invoked.
Configuration is validated in start/2. A Node without :serving raises ArgumentError immediately, before any execution begins, rather than failing partway through a run.
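The check described above can be pictured with a small stand-in. This is a sketch under stated assumptions, not the library's implementation: ValidateSketch and its error message are hypothetical, and plain maps are used in place of Agens.Job.Node structs.

```elixir
defmodule ValidateSketch do
  # Hypothetical stand-in for the validation performed at start time.
  # `nodes` is a map of node_id => node, where each node is a plain map here.
  def validate_nodes!(nodes) when is_map(nodes) do
    Enum.each(nodes, fn {node_id, node} ->
      # A Node must declare :serving even when it also declares :sub.
      if is_nil(Map.get(node, :serving)) do
        raise ArgumentError, "node #{inspect(node_id)} is missing :serving"
      end
    end)

    :ok
  end
end
```

Failing at start time keeps a misconfigured Sub Node from being discovered only after earlier Nodes have already run.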
Summary
Functions
get_config/1: Retrieves the Agens.Job.Config of a running Job by run_id or pid.
run/3: Runs a Job that was previously started with start/2.
start/2: Starts a new supervised Job process for the given Agens.Job.Config and run_id.
stop/1: Stops a running Job by run_id.
Functions
@spec get_config(pid() | binary()) :: {:ok, Agens.Job.Config.t()} | {:error, :run_not_found}
Retrieves the Agens.Job.Config of a running Job by run_id or pid.
Returns {:error, :run_not_found} if no Job is running under the given run_id.
@spec run(pid() | binary(), String.t(), keyword()) :: :ok | {:error, :run_not_found | :job_already_running | :input_required}
Runs a Job that was previously started with start/2.
Identifies the Job by its run_id (or directly by pid) and begins execution at the
Config's :starting_node_id with the given input. Returns immediately with :ok once
the run is accepted; subsequent progress is surfaced through the configured Agens.Backends.
Options
- :caller - The pid to attribute backend events to (used as the first argument of every Agens.Backend callback). Defaults to the calling process.
- :sub - Internal use. An Agens.Job.Sub struct supplied by the runtime when this Job is being executed as a Sub-Job of a parent run.
Errors
- {:error, :run_not_found} - No Job process is registered under the given run_id.
- {:error, :job_already_running} - The Job has already started and has not yet completed.
- {:error, :input_required} - input was nil.
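Since run/3 returns immediately, a caller usually branches on its return value before waiting for backend events. The helper below is a hypothetical sketch: RunResultSketch and the tagged tuples it returns are invented, and only the matched return values come from the spec above.

```elixir
defmodule RunResultSketch do
  # Hypothetical helper: turns the documented return values of run/3 into
  # a caller-facing decision. The returned tags and texts are invented.
  def interpret(:ok), do: :accepted

  def interpret({:error, :run_not_found}),
    do: {:start_first, "no process is registered for this run_id; call start/2 first"}

  def interpret({:error, :job_already_running}),
    do: {:wait, "this run has started and has not yet completed"}

  def interpret({:error, :input_required}),
    do: {:fix_input, "input must not be nil"}
end
```

A call site might pipe the result straight through, for example Agens.Job.run(run_id, input) |> RunResultSketch.interpret().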
@spec start(Agens.Job.Config.t(), binary()) :: {:ok, pid()} | {:error, term()}
Starts a new supervised Job process for the given Agens.Job.Config and run_id.
Validates the config (raises ArgumentError if any Node is missing :serving) and registers
the process in Agens.Registry under run_id. The same Config can be started multiple times
in parallel by passing distinct run_ids — typically obtained from Agens.generate_uid/0.
start/2 only starts the supervised process; it does not run the Job. Call run/3 to begin
execution.
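The two-step lifecycle (start/2, then run/3) can be wrapped as follows. This is a minimal sketch, assuming config is an already-built Agens.Job.Config and run_id is a binary such as one from Agens.generate_uid/0. LifecycleSketch and the job_mod parameter are hypothetical; job_mod exists only so the flow can be exercised against a stand-in module.

```elixir
defmodule LifecycleSketch do
  # `job_mod` defaults to Agens.Job; it is a parameter only so that the flow
  # can be tried with a stand-in module. `config` is assumed to be a valid
  # Agens.Job.Config built elsewhere.
  def start_and_run(config, run_id, input, job_mod \\ Agens.Job) do
    with {:ok, pid} <- job_mod.start(config, run_id),
         # Backend events are attributed to the calling process.
         :ok <- job_mod.run(run_id, input, caller: self()) do
      {:ok, pid}
    end
  end
end
```

Any {:error, reason} from either step falls through the with unchanged. The ArgumentError raised for a Node missing :serving is deliberately not rescued here, and if run/3 is rejected the started process is left in place, so a caller may want to follow up with stop/1.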
@spec stop(binary()) :: :ok | {:error, :run_not_found}
Stops a running Job by run_id.
Returns :ok when the Job process is found and stopped, or
{:error, :run_not_found} if no Job is running under the given run_id.