ClaudeWrapper.DuplexSession (ClaudeWrapper v0.8.0)

Long-lived claude session over the CLI's stream-json duplex protocol.

Holds a single claude subprocess open across many turns, communicating via NDJSON on stdin/stdout. Complementary to ClaudeWrapper.Query and ClaudeWrapper.Session -- those spawn one subprocess per turn and are the right fit for short-lived hosts (escripts, mix tasks, batch jobs). DuplexSession is for long-running hosts (Phoenix servers, agent runtimes, OTP applications) where keeping one claude subprocess open across turns is cheap.

This is the mode @anthropic-ai/claude-agent-sdk uses internally and that the @agentclientprotocol/claude-agent-acp bridge relies on for IDE integrations like Zed's agent panel.

See https://github.com/genagent/claude_wrapper_ex/issues/55 for the full design discussion and phased rollout.

Usage

config = ClaudeWrapper.Config.new()

# Provide a permission callback to decide on tool use mid-turn.
# The default is to deny everything.
on_permission = fn tool_name, _input ->
  if tool_name in ["Bash", "Edit"], do: {:deny, "not allowed"}, else: :allow
end

{:ok, pid} =
  ClaudeWrapper.DuplexSession.start_link(
    config: config,
    on_permission: on_permission
  )

# Subscribe the calling process to streaming events.
:ok = ClaudeWrapper.DuplexSession.subscribe(pid)

{:ok, result} = ClaudeWrapper.DuplexSession.send(pid, "Say hi.")

ClaudeWrapper.DuplexSession.stop(pid)

Permission callback

The optional :on_permission callback runs synchronously inside the GenServer when the CLI emits a can_use_tool control request. Two arities are supported and detected at call time:

  • (tool_name, input) -> decision -- when the decision can be made from the tool name and input alone (allow/deny lists, role-based policy, etc.).

  • (tool_name, input, request_id) -> decision -- when the handler may return :defer and a separate process needs to call respond_to_permission/3 later. The request_id lets the handler correlate the deferred response with the original request (e.g. broadcast {:permission_request, request_id, ...} to a UI; the UI eventually answers via respond_to_permission/3).

The decision is one of:

  • :allow -- allow the tool with the original input
  • {:allow, updated_input} -- allow the tool with a modified input map (sandbox a path, redact a secret, etc.)
  • {:deny, reason} -- deny the tool with a reason string the model will see
  • :defer -- do not respond synchronously; the caller is expected to invoke respond_to_permission/3 later
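A 2-arity handler covering the synchronous decision shapes might look like the sketch below. MyApp.ToolPolicy and the sandbox directory are hypothetical, and the "file_path" key is an assumption about the shape of the Read/Write tool input maps; inspect the inputs your handler actually receives before relying on it.

```elixir
defmodule MyApp.ToolPolicy do
  # Hypothetical policy module; pass &MyApp.ToolPolicy.decide/2 as :on_permission.
  # The "file_path" key is an assumed shape of the tool input map.

  @sandbox "/tmp/claude-sandbox"

  # Never allow shell access; the reason string is shown to the model.
  def decide("Bash", _input), do: {:deny, "shell access is disabled"}

  # Rewrite file paths so reads and writes stay inside the sandbox directory.
  def decide(tool, %{"file_path" => path} = input) when tool in ["Read", "Write"] do
    {:allow, Map.put(input, "file_path", Path.join(@sandbox, Path.basename(path)))}
  end

  # Everything else runs with its original input.
  def decide(_tool, _input), do: :allow
end
```

It would be passed as on_permission: &MyApp.ToolPolicy.decide/2 to start_link/1.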

The callback runs in the GenServer process, so synchronous decisions must be fast. For slow decisions, return :defer and answer later.
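For the deferred path, a 3-arity handler can forward each request to another process and return :defer immediately. The sketch below is hypothetical: MyApp.DeferToUI and the message shape are illustrative, modeled on the {:permission_request, request_id, ...} broadcast described above. It includes the tool input in the message so the eventual answer can carry it.

```elixir
defmodule MyApp.DeferToUI do
  # Hypothetical helper: builds a 3-arity :on_permission callback that forwards
  # every request to `ui_pid` and defers the answer.
  def handler(ui_pid) do
    fn tool_name, input, request_id ->
      # Send the input along so the UI can later answer with {:allow, input}.
      send(ui_pid, {:permission_request, request_id, tool_name, input})
      # Return without blocking the session's GenServer.
      :defer
    end
  end
end
```

The receiving process later answers with ClaudeWrapper.DuplexSession.respond_to_permission(session, request_id, decision), where decision is :allow, {:allow, input}, or {:deny, reason}.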

The default callback is &deny_all/2, which denies every tool call. Unless you supply an explicit callback or select another of the CLI's permission modes (plan, bypass_permissions, etc.), tool use will not work.

Subscriber events

Subscribers receive plain messages of the form {:claude, event}:

  • {:system_init, session_id} -- the CLI's init event
  • {:assistant, msg} -- a full assistant turn (SDKAssistantMessage)
  • {:stream_event, msg} -- a partial assistant token (SDKPartialAssistantMessage)
  • {:user, msg} -- a user message (e.g. tool results, replays)
  • {:result, %ClaudeWrapper.Result{}} -- the parsed turn boundary

Subscribers are monitored; if a subscriber crashes or exits, it is automatically removed.
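Because events are plain messages, a subscriber can consume a turn with an ordinary receive loop. The sketch below assumes the calling process has already called subscribe/1; MyApp.TurnCollector is a hypothetical name, and it matches {:result, result} without naming the %ClaudeWrapper.Result{} struct.

```elixir
defmodule MyApp.TurnCollector do
  # Hypothetical helper for a process that has already called subscribe/1.
  # Gathers {:claude, event} messages in arrival order until the
  # {:result, result} turn boundary arrives.
  def collect(idle_timeout \\ 120_000), do: do_collect([], idle_timeout)

  defp do_collect(acc, idle_timeout) do
    receive do
      {:claude, {:result, result}} ->
        {:ok, result, Enum.reverse(acc)}

      {:claude, event} ->
        do_collect([event | acc], idle_timeout)
    after
      # The timeout restarts after every event, so it bounds the gap between
      # events rather than the length of the whole turn.
      idle_timeout -> {:timeout, Enum.reverse(acc)}
    end
  end
end
```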

Subscriber delivery has no capacity bound

The Rust crate backs subscribe with a bounded tokio::sync::broadcast channel (default capacity 256), and slow consumers observe a Lagged error. The Elixir session instead delivers each event with Process.send/3 directly into every subscriber's mailbox. Process mailboxes are unbounded, so there is no subscriber_capacity knob and no lag or drop semantics: a slow subscriber simply accumulates messages in its own mailbox. If that is a concern, manage the backlog at the subscriber (drain promptly, or unsubscribe).
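One way to manage the backlog at the subscriber is to watch its own mailbox length and shed the high-volume partial events when it falls behind. This is a sketch with hypothetical names and an arbitrary policy: it drops only {:stream_event, _} partials, on the assumption that the later full {:assistant, msg} event carries the complete content.

```elixir
defmodule MyApp.Backlog do
  # Hypothetical helpers for a subscriber that wants to watch its own backlog.

  # Number of messages currently queued in the calling process's mailbox.
  def queue_len do
    {:message_queue_len, len} = Process.info(self(), :message_queue_len)
    len
  end

  # Discard queued partial-token events (leaving every other message in place)
  # and return how many were dropped. `after 0` makes this non-blocking.
  def drop_stream_events(count \\ 0) do
    receive do
      {:claude, {:stream_event, _}} -> drop_stream_events(count + 1)
    after
      0 -> count
    end
  end
end
```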

Health and liveness

alive?/1, exit_status/1, and wait_for_exit/2 give long-running service hosts non-consuming visibility into whether a session is still usable, mirroring the Rust crate's is_alive / exit_status / wait_for_exit (SessionExitStatus). See the exit_status/0 type.

wait_for_exit/2 is the authoritative source of the terminal status: it blocks until the session exits and returns :completed for a clean shutdown or {:failed, {:port_exit, code}} when the underlying claude subprocess exits with a non-zero status (or the port closes abnormally). The status is delivered live from terminate/2, so there is no persisted state and nothing to read post-mortem.

exit_status/1 is only a live snapshot: it reports :running while the session process is alive and :completed once the process is gone. It cannot distinguish a clean exit from a failed one after the fact -- use wait_for_exit/2 if you need the failure reason.
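A host might map these values onto a health-check reply as sketched below (MyApp.SessionHealth is hypothetical). Note that the {:failed, _} clauses can only be reached with a value from wait_for_exit/2; exit_status/1 alone yields just :running or :completed.

```elixir
defmodule MyApp.SessionHealth do
  # Hypothetical mapping from an exit_status() value to a health-check reply.
  # Feed it exit_status/1 for a cheap probe, or wait_for_exit/2 when the
  # failure reason matters.
  def to_health(:running), do: {:ok, "session running"}
  def to_health(:completed), do: {:error, "session exited"}

  def to_health({:failed, {:port_exit, code}}) when is_integer(code),
    do: {:error, "claude exited with status #{code}"}

  def to_health({:failed, reason}), do: {:error, "session failed: #{inspect(reason)}"}
end
```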

Summary

Types

  • exit_status() -- Terminal liveness status of a session, mirroring the Rust crate's SessionExitStatus.
  • option()
  • permission_decision()
  • permission_handler() -- Permission decision callback. Two arities are supported.
  • state()
  • tool_input()

Functions

  • alive?(server) -- Cheap, non-consuming liveness check. Mirrors the Rust is_alive.
  • child_spec(init_arg) -- Returns a specification to start this module under a supervisor.
  • close(server) -- Graceful close: shorthand for stop(server, :normal, 10_000).
  • deny_all(tool_name, input) -- Default permission handler. Denies every tool call.
  • exit_status(server) -- Live snapshot of the session's exit_status/0. Mirrors the Rust exit_status / SessionExitStatus, but is best-effort only.
  • interrupt(server, timeout \\ 10000) -- Send an interrupt control_request to the CLI. The CLI cancels any in-flight turn and emits a result with a cancel-flavored stop reason; that result still flows through the normal send/3 reply.
  • respond_to_permission(server, request_id, decision) -- Answer a deferred permission request.
  • send(server, prompt, timeout \\ 120_000) -- Send a user prompt. Blocks until the turn's result event arrives.
  • session_id(server) -- Return the session ID assigned by the CLI on system/init, or nil if init has not yet been observed.
  • start_link(opts) -- Start a duplex session.
  • stop(server, reason \\ :normal, timeout \\ 5000) -- Stop the session. Closes the port, waits for the child to exit, and shuts down the GenServer.
  • subscribe(server) -- Subscribe the calling process to streaming events.
  • unsubscribe(server) -- Stop sending events to the calling process. Idempotent.
  • wait_for_exit(server, timeout \\ 5000) -- Block until the session exits, then return its terminal exit_status/0. Mirrors the Rust wait_for_exit.

Types

exit_status()

@type exit_status() :: :running | :completed | {:failed, term()}

Terminal liveness status of a session, mirroring the Rust crate's SessionExitStatus:

  • :running -- the session process is alive and usable.
  • :completed -- the session shut down cleanly (graceful stop/3 or close/1, or the claude subprocess exited with status 0).
  • {:failed, reason} -- the claude subprocess exited abnormally (non-zero status, or the port closed with an error reason). The reason is the recorded {:port_exit, code | term} tuple.

option()

@type option() ::
  {:config, ClaudeWrapper.Config.t()}
  | {:extra_args, [String.t()]}
  | {:on_permission, permission_handler()}
  | {:name, GenServer.name()}
  | GenServer.option()

permission_decision()

@type permission_decision() ::
  :allow | {:allow, tool_input()} | {:deny, String.t()} | :defer

permission_handler()

@type permission_handler() ::
  (String.t(), tool_input() -> permission_decision())
  | (String.t(), tool_input(), String.t() -> permission_decision())

Permission decision callback. Two arities are supported:

  • (tool_name, input) -> decision -- the original signature. Use when the decision can be made from the tool name and input alone (allow/deny lists, role-based policy, etc.).

  • (tool_name, input, request_id) -> decision -- carries the request_id of the inbound can_use_tool control request. Required if the handler returns :defer and a different process needs to call respond_to_permission/3 later (chat UI: handler broadcasts the request to a LiveView, which surfaces approve/deny and answers asynchronously).

Arity is detected at call time so existing 2-arity callbacks keep working unchanged.
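Call-time arity detection can be done with Kernel.is_function/2, as in the sketch below. MyApp.PermissionDispatch is hypothetical and illustrates the technique only; it is not the library's own dispatch code.

```elixir
defmodule MyApp.PermissionDispatch do
  # Sketch of call-time arity detection for a permission handler.
  # is_function/2 checks the fun's arity at runtime, so 2-arity handlers
  # keep working while 3-arity ones also receive the request_id.
  def invoke(handler, tool_name, input, request_id) do
    cond do
      is_function(handler, 3) -> handler.(tool_name, input, request_id)
      is_function(handler, 2) -> handler.(tool_name, input)
      true -> raise ArgumentError, "permission handler must have arity 2 or 3"
    end
  end
end
```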

state()

@type state() :: %ClaudeWrapper.DuplexSession{
  buffer: binary(),
  config: ClaudeWrapper.Config.t(),
  exit_status: exit_status(),
  exit_waiters: %{required(reference()) => pid()},
  on_permission: permission_handler(),
  pending_control: %{required(String.t()) => GenServer.from()},
  pending_turn: {GenServer.from(), [map()]} | nil,
  port: port() | nil,
  session_id: String.t() | nil,
  subscribers: %{required(pid()) => reference()}
}

tool_input()

@type tool_input() :: map()

Functions

alive?(server)

@spec alive?(GenServer.server()) :: boolean()

Cheap, non-consuming liveness check. Mirrors the Rust is_alive.

Returns true while the session process is alive, false once it has exited (cleanly or with an error). Resolves registered names and pids; any non-pid name that does not resolve is treated as not alive.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

close(server)

@spec close(GenServer.server()) :: :ok

Graceful close: shorthand for stop(server, :normal, 10_000).

Closes the port (which sends SIGTERM to the child), waits up to 10 seconds for it to exit, and shuts down the GenServer.

deny_all(tool_name, input)

@spec deny_all(String.t(), tool_input()) :: permission_decision()

Default permission handler. Denies every tool call.

Public so it can be referenced as a default value (&deny_all/2).

exit_status(server)

@spec exit_status(GenServer.server()) :: exit_status()

Live snapshot of the session's exit_status/0. Mirrors the Rust exit_status / SessionExitStatus, but is best-effort only.

Returns :running while the session process is alive and :completed once it is gone. Unlike wait_for_exit/2, this cannot distinguish a clean exit from a failed one after the fact: the session keeps no persisted terminal status, so a dead process always reads back as :completed. If you need the failure reason (e.g. {:failed, {:port_exit, code}}), use wait_for_exit/2, which is the authoritative source.

interrupt(server, timeout \\ 10000)

@spec interrupt(GenServer.server(), timeout()) :: :ok | {:error, term()}

Send an interrupt control_request to the CLI. The CLI cancels any in-flight turn and emits a result with a cancel-flavored stop reason; that result still flows through the normal send/3 reply.

This call returns once the CLI acknowledges the interrupt with a matching control_response. The caller of send/3 will receive its own reply when the resulting result event arrives.

Calling interrupt/1 outside of an active turn is harmless: the CLI accepts the request, acknowledges it, and emits a synthetic result, which the GenServer discards.

respond_to_permission(server, request_id, decision)

@spec respond_to_permission(GenServer.server(), String.t(), permission_decision()) ::
  :ok | {:error, ClaudeWrapper.Error.t()}

Answer a deferred permission request.

Used after the :on_permission callback returned :defer for the given request_id. Calling this with a request_id the session has no record of is a no-op (returns :ok). The decision accepts the same shape as a synchronous handler return value, except :defer, which is rejected with {:error, %ClaudeWrapper.Error{kind: :cannot_defer_again}}.

:allow and updatedInput

A synchronous handler returning plain :allow has its updatedInput defaulted to the original tool input automatically, since the dispatch site has the input in scope. The deferred path does not: the session does not retain per-request input across the defer boundary. If you need updatedInput populated (Claude's permission protocol requires it when behavior is "allow"), capture the input when the handler defers and pass {:allow, input} here rather than plain :allow.
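The process that answers deferred requests can keep the captured input keyed by request_id, as in this sketch. MyApp.PendingApprovals and the :approve / {:reject, reason} choices are hypothetical; the decision it produces is what would be passed to respond_to_permission/3.

```elixir
defmodule MyApp.PendingApprovals do
  # Hypothetical UI-side bookkeeping: remember each deferred request's input so
  # an approval can be sent as {:allow, input} rather than plain :allow.

  def new, do: %{}

  # Call when a deferred request (request_id + tool input) arrives.
  def track(pending, request_id, input), do: Map.put(pending, request_id, input)

  # Returns {decision, remaining_pending} for the user's choice,
  # or :unknown if the request_id was never tracked.
  def decide(pending, request_id, choice) do
    case Map.fetch(pending, request_id) do
      {:ok, input} -> {decision(choice, input), Map.delete(pending, request_id)}
      :error -> :unknown
    end
  end

  defp decision(:approve, input), do: {:allow, input}
  defp decision({:reject, reason}, _input), do: {:deny, reason}
end
```

The resulting decision would then be sent with ClaudeWrapper.DuplexSession.respond_to_permission(session, request_id, decision).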

send(server, prompt, timeout \\ 120_000)

@spec send(GenServer.server(), String.t(), timeout()) ::
  {:ok, ClaudeWrapper.Result.t()} | {:error, term()}

Send a user prompt. Blocks until the turn's result event arrives.

Returns {:ok, %ClaudeWrapper.Result{}} on success, {:error, %ClaudeWrapper.Error{kind: :turn_in_flight}} if another turn is already running, or {:error, %ClaudeWrapper.Error{}} on any other failure.

The default timeout is 120 seconds because the entire turn duration must complete within it (cold start + model latency + tool calls).

session_id(server)

@spec session_id(GenServer.server()) :: String.t() | nil

Return the session ID assigned by the CLI on system/init, or nil if init has not yet been observed.

start_link(opts)

@spec start_link([option()]) :: GenServer.on_start()

Start a duplex session.

Options

  • :config -- (required) %ClaudeWrapper.Config{} struct.
  • :extra_args -- extra CLI flags to append (e.g. ["--permission-mode", "plan", "--max-turns", "1"]).
  • :on_permission -- permission callback (see permission_handler/0). Defaults to &deny_all/2.
  • :name -- register the GenServer under a name.

All other keyword options are passed through to GenServer.start_link/3.

stop(server, reason \\ :normal, timeout \\ 5000)

@spec stop(GenServer.server(), term(), timeout()) :: :ok

Stop the session. Closes the port, waits for the child to exit, and shuts down the GenServer.

See also close/1 for a short-form alias.

subscribe(server)

@spec subscribe(GenServer.server()) :: :ok

Subscribe the calling process to streaming events.

Subscribers receive plain {:claude, event} messages -- see the module doc for the event vocabulary. The subscriber is monitored; if it exits, it is automatically removed.

Subscribing the same process twice is a no-op.

unsubscribe(server)

@spec unsubscribe(GenServer.server()) :: :ok

Stop sending events to the calling process. Idempotent.

wait_for_exit(server, timeout \\ 5000)

@spec wait_for_exit(GenServer.server(), timeout()) :: exit_status()

Block until the session exits, then return its terminal exit_status/0. Mirrors the Rust wait_for_exit.

This is the authoritative source of the terminal status. It returns :completed for a clean shutdown and {:failed, {:port_exit, code}} when the underlying claude subprocess exited with a non-zero status (or the port closed abnormally). Returns immediately (:completed) if the session has already exited.

Implemented with a Process.monitor/1 plus a one-shot waiter registration on the session: terminate/2 sends each registered waiter the precise terminal status, and the monitor :DOWN is the fallback if the session dies before (or during) registration. Multiple concurrent callers are fine and the call does not consume the session.
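The monitor-plus-waiter pattern can be reproduced with a plain GenServer. ToySession below is a hypothetical stand-in with no port or CLI; it models only the terminal-status plumbing. terminate/2 pushes the precise status to registered waiters, while a :DOWN that arrives without one (for example because the process was already dead) falls back to :completed, which is why a post-mortem read cannot see a failure.

```elixir
defmodule ToySession do
  # Hypothetical stand-in for DuplexSession that models only the exit-waiter
  # plumbing; there is no port or CLI subprocess here.
  use GenServer

  # `status` is the terminal status this toy session reports when it stops.
  def start(status), do: GenServer.start(__MODULE__, status)

  # Stop the toy session cleanly; terminate/2 runs and notifies waiters.
  def finish(pid), do: GenServer.cast(pid, :finish)

  def wait_for_exit(pid, timeout \\ 5000) do
    mref = Process.monitor(pid)
    # One-shot waiter registration. If the process is already dead the cast
    # is silently dropped and the monitor's :DOWN (reason :noproc) wins.
    GenServer.cast(pid, {:register_waiter, self(), mref})

    receive do
      {:exit_status, ^mref, status} ->
        Process.demonitor(mref, [:flush])
        status

      {:DOWN, ^mref, :process, _pid, _reason} ->
        # No precise status was delivered, so report a plain :completed.
        :completed
    after
      timeout ->
        # Still alive past the deadline. The registration is left in place;
        # its later :exit_status message simply goes unmatched.
        Process.demonitor(mref, [:flush])
        :running
    end
  end

  @impl true
  def init(status), do: {:ok, %{status: status, waiters: %{}}}

  @impl true
  def handle_cast({:register_waiter, pid, mref}, state),
    do: {:noreply, %{state | waiters: Map.put(state.waiters, mref, pid)}}

  def handle_cast(:finish, state), do: {:stop, :normal, state}

  @impl true
  def terminate(_reason, state) do
    # Runs before the process exits, so each waiter receives this message
    # ahead of its :DOWN notification (signal ordering between two processes).
    Enum.each(state.waiters, fn {mref, pid} ->
      send(pid, {:exit_status, mref, state.status})
    end)
  end
end
```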

timeout (default 5 seconds) bounds the wait; on timeout this returns :running to signal "still alive past the deadline" (the analog of the Rust call simply not having resolved yet).