Eai.Hub.Pipeline (eai v1.0.5)

Runs pre/post hook pipelines and manages the :eai_hooks persistent_term registry.

Pipeline semantics

  • Pre-hooks run in ascending priority order (lowest number first).
  • Post-hooks run in ascending priority order; each hook sees the result already modified by all prior post-hooks (pipeline/reduce semantics).
  • A :block verdict from any hook short-circuits the pipeline immediately (Enum.reduce_while — block is a veto, not just a vote).
  • Hook errors fail open: a [:eai, :hook, :error] telemetry event is emitted, and the original call continues as if the hook had returned :ok.
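The fail-open rule can be illustrated with a minimal sketch. This is not the module's actual implementation: FailOpenSketch, safe_verdict/2, and the report callback are hypothetical names, and the callback stands in for the telemetry emission so that the snippet runs without dependencies.

```elixir
defmodule FailOpenSketch do
  # Hypothetical helper: runs one hook (a zero-arity function here) and turns
  # any raise, throw, or exit into :ok, after reporting the failure.
  def safe_verdict(hook_fun, report)
      when is_function(hook_fun, 0) and is_function(report, 1) do
    try do
      hook_fun.()
    rescue
      error ->
        # Stand-in for emitting the [:eai, :hook, :error] telemetry event.
        report.({[:eai, :hook, :error], error})
        :ok
    catch
      kind, reason ->
        report.({[:eai, :hook, :error], {kind, reason}})
        :ok
    end
  end
end

parent = self()
report = fn event -> send(parent, event) end

# A crashing hook is treated as if it had returned :ok.
:ok = FailOpenSketch.safe_verdict(fn -> raise "boom" end, report)

# A well-behaved hook's verdict passes through untouched.
{:block, "no"} = FailOpenSketch.safe_verdict(fn -> {:block, "no"} end, report)
```

The design choice here is that a broken hook never takes down the call it was meant to observe; the cost is that a crashing security hook silently lets the call through, which is why the error event matters.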

Why Enum.reduce_while for post-hooks?

Post-hooks accumulate a result value through the chain. reduce_while lets a block verdict break early without processing remaining hooks, which matters for security-gating hooks that want to suppress a result entirely.
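A rough sketch of how such a chain might look with Enum.reduce_while/3 follows. It makes several assumptions that the documentation above does not confirm: hooks are modelled as plain one-argument functions instead of modules, the per-hook verdicts are :ok, {:modify, new_result}, or {:block, reason}, and PostPipelineSketch is a hypothetical name.

```elixir
defmodule PostPipelineSketch do
  # hooks: list of {fun, priority}. Each fun receives the current result and
  # returns :ok | {:modify, new_result} | {:block, reason} (assumed shapes).
  def run(hooks, result) do
    hooks
    |> Enum.sort_by(fn {_fun, priority} -> priority end)
    |> Enum.reduce_while({:ok, result}, fn {fun, _priority}, {:ok, acc} ->
      case fun.(acc) do
        # Hook left the result alone: keep going with the same accumulator.
        :ok -> {:cont, {:ok, acc}}
        # Hook transformed the result: later hooks see the new value.
        {:modify, new_result} -> {:cont, {:ok, new_result}}
        # Veto: stop immediately, remaining hooks never run.
        {:block, reason} -> {:halt, {:block, reason}}
      end
    end)
  end
end

append = {fn r -> {:modify, r <> "!"} end, 10}
upcase = {fn r -> {:modify, String.upcase(r)} end, 20}

# Priority 10 runs first, so upcase sees "hi!".
{:ok, "HI!"} = PostPipelineSketch.run([upcase, append], "hi")

veto = {fn _ -> {:block, "secret"} end, 1}
never = {fn _ -> raise "should not run" end, 2}
{:block, "secret"} = PostPipelineSketch.run([never, veto], "hi")
```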

LLM hooks

llm_pre_hooks/4 and llm_post_hooks/5 follow the same pattern as tool hooks but operate on LLM HTTP request boundaries rather than individual tool calls. The tool_name is always "LLM_REQUEST".

Graph

<<{Eai.Hub.Pipeline, required_by, Eai.Hub}.
<<{Eai.Hub.Pipeline, required_by, Eai.PTY.Session}.

Summary

Functions

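hooks()

Return the currently registered hooks, or [] if none loaded yet.

llm_post_hooks(messages, pty_session_id, chat_session_id, opts, result)

Run all post-hooks after an LLM HTTP response.

llm_pre_hooks(messages, pty_session_id, chat_session_id, opts)

Run all pre-hooks before an LLM HTTP request.

post_hooks(mod, fun, args, result)

Run all post-hooks for (mod, fun, args, result).

post_only_hooks(mod, fun, args)

Run post-hooks for a terminal lifecycle event (e.g. PTY.Session.terminate/2).

pre_hooks(mod, fun, args)

Run all pre-hooks for (mod, fun, args).

register(hook_entries)

Store the sorted hook list into :persistent_term.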

Functions

hooks()

@spec hooks() :: [{module(), non_neg_integer()}]

Return the currently registered hooks, or [] if none loaded yet.

llm_post_hooks(messages, pty_session_id, chat_session_id, opts, result)

@spec llm_post_hooks([any()], String.t(), String.t(), map(), any()) ::
  {:ok, any()} | {:block, String.t()}

Run all post-hooks after an LLM HTTP response.

The result is the raw return triple from the LLM call: {:ok, reply, history} or {:error, reason, partial_history}.

Returns:

  • {:ok, final_result} — pipeline completed; result may have been modified
  • {:block, reason} — a hook vetoed the result (caller discards)
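On the caller side, the two return shapes combine with the raw LLM triple into three cases. The sketch below shows one way a caller might unpack them; LlmPostCallerSketch and the :reply, :failed, and :discarded tags are hypothetical and only serve to make the branches visible.

```elixir
defmodule LlmPostCallerSketch do
  # Pipeline completed and the (possibly modified) LLM call succeeded.
  def handle({:ok, {:ok, reply, history}}), do: {:reply, reply, history}

  # Pipeline completed, but the underlying LLM call had failed.
  def handle({:ok, {:error, reason, partial_history}}),
    do: {:failed, reason, partial_history}

  # A hook vetoed the result: the caller discards it.
  def handle({:block, reason}), do: {:discarded, reason}
end

{:reply, "hello", []} = LlmPostCallerSketch.handle({:ok, {:ok, "hello", []}})
{:discarded, "pii"} = LlmPostCallerSketch.handle({:block, "pii"})
```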

llm_pre_hooks(messages, pty_session_id, chat_session_id, opts)

@spec llm_pre_hooks([any()], String.t(), String.t(), map()) ::
  :ok | {:block, String.t()} | {:modify, map()}

Run all pre-hooks before an LLM HTTP request.

The tool_name is "LLM_REQUEST". Payload carries the full request context.

Returns:

  • :ok — proceed with original context
  • {:block, reason} — hook vetoed; caller should abort the LLM call
  • {:modify, ctx} — hook modified the context (messages, session, opts, etc.)
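A caller might translate these three verdicts into a proceed-or-abort decision along the following lines. This is a sketch under assumptions: LlmPreCallerSketch, the :proceed and :abort tags, and the context keys are hypothetical, and {:modify, ctx} is assumed to carry a complete replacement context, not a partial patch.

```elixir
defmodule LlmPreCallerSketch do
  # :ok means the original context is used unchanged.
  def apply_verdict(:ok, ctx), do: {:proceed, ctx}

  # Assumption: the modified context fully replaces the original one.
  def apply_verdict({:modify, new_ctx}, _ctx), do: {:proceed, new_ctx}

  # A veto means the LLM call should not be made at all.
  def apply_verdict({:block, reason}, _ctx), do: {:abort, reason}
end

ctx = %{messages: ["hi"], opts: %{}}
{:proceed, ^ctx} = LlmPreCallerSketch.apply_verdict(:ok, ctx)
{:abort, "quota"} = LlmPreCallerSketch.apply_verdict({:block, "quota"}, ctx)
```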

post_hooks(mod, fun, args, result)

@spec post_hooks(module(), atom(), [any()], any()) ::
  {:ok, any()} | {:block, String.t()}

Run all post-hooks for (mod, fun, args, result).

Returns:

  • {:ok, final_result} — pipeline completed; result may have been modified
  • {:block, reason} — a hook vetoed the result

Why a pipeline (each hook sees the previous hook's modified result)? This matches the spec (decision #8): post-hooks are composable transforms, e.g. hook A sanitizes, hook B rate-limits based on the sanitized output. reduce_while provides the short-circuit on block.

post_only_hooks(mod, fun, args)

@spec post_only_hooks(module(), atom(), [any()]) ::
  {:ok, any()} | {:block, String.t()}

Run post-hooks for a terminal lifecycle event (e.g. PTY.Session.terminate/2).

The result passed to each hook is {:terminated, reason}. Hooks distinguish terminal events from normal post results by pattern-matching on the tagged tuple:

def verdict(:post, _tool, _payload, {:terminated, reason}), do: cleanup(reason)
def verdict(:post, _tool, _payload, result), do: normal(result)

Semantics

  • :block verdict aborts the remaining hook chain only; it does not prevent OTP from continuing the process shutdown (terminate/2 return is ignored by OTP).
  • Hooks must not GenServer.call the dying process, because the call would deadlock. Use Cache / PubSub / ETS for any side effects.
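As an illustration of the second point, a hook could record the termination in ETS without ever calling back into the dying process. The sketch below is hypothetical throughout: TerminalHookSketch, its table name, the init/0 and last_reason/1 helpers, the tool name string, and the :ok return value are all assumptions; only the verdict/4 clause shapes come from the example above.

```elixir
defmodule TerminalHookSketch do
  @table :terminal_hook_sketch

  # Creates the table; in a real system a long-lived process would own it.
  def init do
    :ets.new(@table, [:named_table, :public, :set])
    :ok
  end

  # Terminal event: write to ETS, never GenServer.call the dying process.
  def verdict(:post, tool, _payload, {:terminated, reason}) do
    :ets.insert(@table, {tool, reason})
    :ok
  end

  # Any other post result is ignored by this hook.
  def verdict(:post, _tool, _payload, _result), do: :ok

  def last_reason(tool) do
    case :ets.lookup(@table, tool) do
      [{^tool, reason}] -> {:ok, reason}
      [] -> :error
    end
  end
end

:ok = TerminalHookSketch.init()
# "PTY.Session.terminate" is a made-up tool name for this sketch.
:ok = TerminalHookSketch.verdict(:post, "PTY.Session.terminate", %{}, {:terminated, :shutdown})
{:ok, :shutdown} = TerminalHookSketch.last_reason("PTY.Session.terminate")
```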

pre_hooks(mod, fun, args)

@spec pre_hooks(module(), atom(), [any()]) ::
  :ok | {:block, String.t()} | {:modify, [any()]}

Run all pre-hooks for (mod, fun, args).

Returns:

  • :ok — all hooks passed, proceed with original args
  • {:block, reason} — a hook vetoed the call; caller should abort
  • {:modify, new_args} — hooks modified args; caller should use new_args

Are args threaded through as an accumulator here? Yes: pre-hooks can modify args, and the latest args are passed into each subsequent hook so that it sees the already-modified version. The pipeline still short-circuits on block (reduce_while).
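The threading of args can be sketched as follows. As with any illustration of this module's internals, the details are assumptions: hooks are modelled as plain functions over the args list, PrePipelineSketch is a hypothetical name, and the :unchanged / :modified bookkeeping is just one way to decide between returning :ok and {:modify, new_args}.

```elixir
defmodule PrePipelineSketch do
  # hooks: list of {fun, priority}. Each fun receives the current args and
  # returns :ok | {:modify, new_args} | {:block, reason} (assumed shapes).
  def run(hooks, args) do
    hooks
    |> Enum.sort_by(fn {_fun, priority} -> priority end)
    |> Enum.reduce_while({:unchanged, args}, fn {fun, _priority}, {status, current} ->
      case fun.(current) do
        :ok -> {:cont, {status, current}}
        # Later hooks receive new_args, not the original args.
        {:modify, new_args} -> {:cont, {:modified, new_args}}
        {:block, reason} -> {:halt, {:block, reason}}
      end
    end)
    |> case do
      {:unchanged, _args} -> :ok
      {:modified, new_args} -> {:modify, new_args}
      {:block, reason} -> {:block, reason}
    end
  end
end

inc = {fn [x] -> {:modify, [x + 1]} end, 1}
dbl = {fn [x] -> {:modify, [x * 2]} end, 2}

# inc runs first (priority 1), so dbl sees [2] and produces [4].
{:modify, [4]} = PrePipelineSketch.run([dbl, inc], [1])
:ok = PrePipelineSketch.run([], [1])
```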

register(hook_entries)

@spec register([{module(), non_neg_integer()}]) :: :ok

Store the sorted hook list into :persistent_term.

Called by Eai.Hub.Reloader.reload!/0 after compiling the hooks. Hooks are stored as [{module, priority}] sorted ascending by priority.
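The storage side can be sketched with :persistent_term directly. This is a minimal illustration, not the module's source: RegistrySketch is a hypothetical name, and it deliberately uses its own key instead of :eai_hooks so that running it cannot clobber a real registry.

```elixir
defmodule RegistrySketch do
  # Private key for the sketch; the real registry key is :eai_hooks.
  @key {__MODULE__, :hooks}

  # Sort ascending by priority, then store the whole list in one term.
  def register(hook_entries) do
    sorted = Enum.sort_by(hook_entries, fn {_mod, priority} -> priority end)
    :persistent_term.put(@key, sorted)
    :ok
  end

  # Falls back to [] when nothing has been registered yet.
  def hooks, do: :persistent_term.get(@key, [])
end

# HookA and HookB are placeholder module names (plain atoms here).
:ok = RegistrySketch.register([{HookB, 20}, {HookA, 10}])
[{HookA, 10}, {HookB, 20}] = RegistrySketch.hooks()
```

:persistent_term suits this access pattern because reads are very cheap and need no copying, while writes are comparatively expensive; a hook list that changes only on reload and is read on every call fits that trade-off.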