ClaudeWrapper.DuplexSession (ClaudeWrapper v0.6.0)


Long-lived claude session over the CLI's stream-json duplex protocol.

Holds a single claude subprocess open across many turns, communicating via NDJSON on stdin/stdout. Complementary to ClaudeWrapper.Query and ClaudeWrapper.Session -- those spawn one subprocess per turn and are the right fit for short-lived hosts (escripts, mix tasks, batch jobs). DuplexSession is for long-running hosts (Phoenix servers, agent runtimes, OTP applications) where keeping one claude subprocess open across turns is cheap.

This is the mode @anthropic-ai/claude-agent-sdk uses internally and that the @agentclientprotocol/claude-agent-acp bridge relies on for IDE integrations like Zed's agent panel.

See https://github.com/genagent/claude_wrapper_ex/issues/55 for the full design discussion and phased rollout.
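To make the NDJSON framing mentioned above concrete, here is a minimal sketch of how a byte stream from a port can be split into complete lines while keeping a partial line in a buffer. It is an illustration only, not the library's implementation: NdjsonFraming and feed/2 are hypothetical names, and JSON decoding of each line is left out.

```elixir
defmodule NdjsonFraming do
  @moduledoc "Illustrative only: splits a byte stream into complete NDJSON lines."

  # Appends `chunk` to `buffer` and returns {complete_lines, remaining_buffer}.
  @spec feed(binary(), binary()) :: {[binary()], binary()}
  def feed(buffer, chunk) do
    parts = String.split(buffer <> chunk, "\n")

    # The last element is whatever follows the final newline: an incomplete
    # line, or "" when the chunk ended exactly on a newline.
    {rest, lines} = List.pop_at(parts, -1)

    # Blank lines carry no message, so they are dropped.
    {Enum.reject(lines, &(&1 == "")), rest}
  end
end
```

Each returned line would then be decoded as one JSON object; the remaining buffer is passed back in with the next chunk.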

Usage

config = ClaudeWrapper.Config.new()

# Provide a permission callback to decide on tool use mid-turn.
# The default is to deny everything.
on_permission = fn tool_name, _input ->
  if tool_name in ["Bash", "Edit"], do: {:deny, "not allowed"}, else: :allow
end

{:ok, pid} =
  ClaudeWrapper.DuplexSession.start_link(
    config: config,
    on_permission: on_permission
  )

# Subscribe the calling process to streaming events.
:ok = ClaudeWrapper.DuplexSession.subscribe(pid)

{:ok, result} = ClaudeWrapper.DuplexSession.send(pid, "Say hi.")

ClaudeWrapper.DuplexSession.stop(pid)

Permission callback

The optional :on_permission callback runs synchronously inside the GenServer when the CLI emits a can_use_tool control request. It must return one of:

  • :allow -- allow the tool with the original input
  • {:allow, updated_input} -- allow the tool with a modified input map (sandbox a path, redact a secret, etc.)
  • {:deny, reason} -- deny the tool with a reason string the model will see
  • :defer -- do not respond synchronously; the caller is expected to invoke respond_to_permission/3 later. Use this when a human decision is required and the GenServer must not block

The callback runs in the GenServer process, so synchronous decisions must be fast. For slow decisions, return :defer and answer later via respond_to_permission/3.

The default callback is &deny_all/2, which denies every tool call. Unless you pass an explicit callback or select one of the CLI's other permission modes (plan, bypass_permissions, etc.), tool use will not work.
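The four return shapes listed above can be combined in one handler. The following is a sketch under assumptions: PermissionSketch, the /srv/workspace sandbox root, the tool names, and the "file_path" input key are all hypothetical and chosen only for illustration.

```elixir
defmodule PermissionSketch do
  # Hypothetical sandbox root; not something ClaudeWrapper defines.
  @workspace "/srv/workspace"

  # Reads are allowed with the original input.
  def decide("Read", _input), do: :allow

  # Writes are allowed, but the path is re-rooted inside the sandbox.
  # The "file_path" key is an assumption about the tool's input map.
  def decide("Write", %{"file_path" => path} = input) do
    {:allow, %{input | "file_path" => Path.join(@workspace, Path.basename(path))}}
  end

  # Shell commands need a human decision, so the handler does not block.
  def decide("Bash", _input), do: :defer

  # Everything else is denied with a reason the model will see.
  def decide(tool, _input), do: {:deny, "#{tool} is not permitted in this session"}
end
```

Such a handler would be passed as on_permission: &PermissionSketch.decide/2. Every clause is a plain pattern match, which keeps the synchronous path fast.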

Subscriber events

Subscribers receive plain messages of the form {:claude, event}:

  • {:system_init, session_id} -- the CLI's init event
  • {:assistant, msg} -- a full assistant turn (SDKAssistantMessage)
  • {:stream_event, msg} -- a partial assistant token (SDKPartialAssistantMessage)
  • {:user, msg} -- a user message (e.g. tool results, replays)
  • {:result, %ClaudeWrapper.Result{}} -- the parsed turn boundary

Subscribers are monitored; if a subscriber crashes or exits, it is automatically removed.
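A subscriber can consume the event vocabulary above with an ordinary receive loop. This is a minimal sketch, assuming the message shapes are exactly as listed; EventSketch and collect/2 are hypothetical names, and the result is matched loosely rather than as %ClaudeWrapper.Result{} so the sketch stands alone.

```elixir
defmodule EventSketch do
  # Drains {:claude, event} messages from the caller's mailbox until the
  # turn's result arrives, or until `timeout` ms pass without any event.
  def collect(timeout \\ 5_000, acc \\ []) do
    receive do
      {:claude, {:result, result}} ->
        {:ok, result, Enum.reverse(acc)}

      {:claude, {:system_init, session_id}} ->
        collect(timeout, [{:init, session_id} | acc])

      {:claude, {kind, _msg}} when kind in [:assistant, :stream_event, :user] ->
        collect(timeout, [kind | acc])

      # Skip any event shape this sketch does not know about.
      {:claude, _other} ->
        collect(timeout, acc)
    after
      timeout -> {:error, :timeout, Enum.reverse(acc)}
    end
  end
end
```

If the subscribing process is also the one that calls send/3, events should queue in its mailbox while the call blocks, so collect/2 can be run after send/3 returns. To react to partial tokens as they arrive, subscribe from a separate process instead.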

Summary

Functions

  • child_spec(init_arg) -- Returns a specification to start this module under a supervisor.
  • close(server) -- Graceful close: shorthand for stop(server, :normal, 10_000).
  • deny_all(tool_name, input) -- Default permission handler. Denies every tool call.
  • interrupt(server, timeout \\ 10000) -- Send an interrupt control_request to the CLI. The CLI cancels any in-flight turn and emits a result with a cancel-flavored stop reason; that result still flows through the normal send/3 reply.
  • respond_to_permission(server, request_id, decision) -- Answer a deferred permission request.
  • send(server, prompt, timeout \\ 120_000) -- Send a user prompt. Blocks until the turn's result event arrives.
  • session_id(server) -- Return the session ID assigned by the CLI on system/init, or nil if init has not yet been observed.
  • start_link(opts) -- Start a duplex session.
  • stop(server, reason \\ :normal, timeout \\ 5000) -- Stop the session. Closes the port, waits for the child to exit, and shuts down the GenServer.
  • subscribe(server) -- Subscribe the calling process to streaming events.
  • unsubscribe(server) -- Stop sending events to the calling process. Idempotent.

Types

option()

@type option() ::
  {:config, ClaudeWrapper.Config.t()}
  | {:extra_args, [String.t()]}
  | {:on_permission, permission_handler()}
  | {:name, GenServer.name()}
  | GenServer.option()

permission_decision()

@type permission_decision() ::
  :allow | {:allow, tool_input()} | {:deny, String.t()} | :defer

permission_handler()

@type permission_handler() :: (String.t(), tool_input() -> permission_decision())

state()

@type state() :: %ClaudeWrapper.DuplexSession{
  buffer: binary(),
  config: ClaudeWrapper.Config.t(),
  on_permission: permission_handler(),
  pending_control: %{required(String.t()) => GenServer.from()},
  pending_turn: {GenServer.from(), [map()]} | nil,
  port: port() | nil,
  session_id: String.t() | nil,
  subscribers: %{required(pid()) => reference()}
}

tool_input()

@type tool_input() :: map()

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

close(server)

@spec close(GenServer.server()) :: :ok

Graceful close: shorthand for stop(server, :normal, 10_000).

Closes the port (which sends SIGTERM to the child), waits up to 10 seconds for it to exit, and shuts down the GenServer.

deny_all(tool_name, input)

@spec deny_all(String.t(), tool_input()) :: permission_decision()

Default permission handler. Denies every tool call.

Public so it can be referenced as a default value (&deny_all/2).

interrupt(server, timeout \\ 10000)

@spec interrupt(GenServer.server(), timeout()) :: :ok | {:error, term()}

Send an interrupt control_request to the CLI. The CLI cancels any in-flight turn and emits a result with a cancel-flavored stop reason; that result still flows through the normal send/3 reply.

This call returns once the CLI acknowledges the interrupt with a matching control_response. The caller of send/3 will receive its own reply when the resulting result event arrives.

Calling interrupt/1 outside of an active turn is harmless: the CLI accepts the request, acks it, and emits a synthetic result the GenServer drops.
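Because send/3 blocks its caller, an interrupt has to come from another process. One way to arrange that is sketched below: the turn runs in a Task, and the caller interrupts it if a deadline passes. InterruptSketch and send_with_deadline/4 are hypothetical names. The mod parameter is also an assumption, added only so the sketch can be exercised against a stand-in module, and the 15-second wait for the cancelled turn's reply is an arbitrary choice.

```elixir
defmodule InterruptSketch do
  # Runs one turn in a Task and interrupts it if no reply arrives within
  # `deadline_ms`. `mod` defaults to the real module.
  def send_with_deadline(session, prompt, deadline_ms, mod \\ ClaudeWrapper.DuplexSession) do
    task = Task.async(fn -> mod.send(session, prompt) end)

    case Task.yield(task, deadline_ms) do
      {:ok, reply} ->
        reply

      {:exit, reason} ->
        {:error, reason}

      nil ->
        # Deadline passed: ask the CLI to cancel the turn. The blocked send
        # call still receives its own reply once the result event arrives.
        _ = mod.interrupt(session)
        Task.await(task, 15_000)
    end
  end
end
```

The reply returned after an interrupt is whatever send/3 yields for the cancelled turn, so callers should inspect the result rather than assume the turn completed.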

respond_to_permission(server, request_id, decision)

@spec respond_to_permission(GenServer.server(), String.t(), permission_decision()) ::
  :ok | {:error, :cannot_defer_again}

Answer a deferred permission request.

Used after the :on_permission callback returned :defer for the given request_id. Calling this with a request_id the session has no record of is a no-op (returns :ok). The decision accepts the same shape as a synchronous handler return value, except :defer, which is rejected with {:error, :cannot_defer_again}.

send(server, prompt, timeout \\ 120_000)

@spec send(GenServer.server(), String.t(), timeout()) ::
  {:ok, ClaudeWrapper.Result.t()} | {:error, term()}

Send a user prompt. Blocks until the turn's result event arrives.

Returns {:ok, %Result{}} on success, {:error, :turn_in_flight} if another turn is already running, or {:error, reason} on failure.

The default timeout is 120 seconds because the entire turn (cold start + model latency + tool calls) must complete within it.
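Since {:error, :turn_in_flight} signals a busy session rather than a failed one, a caller may want to wait and retry. The following is a sketch of that pattern, not part of the library: SendRetrySketch and with_retry/3 are hypothetical names, and the attempt count and wait interval are arbitrary defaults.

```elixir
defmodule SendRetrySketch do
  # `send_fun` is a zero-arity function wrapping the real call, e.g.
  #   fn -> ClaudeWrapper.DuplexSession.send(pid, "Say hi.") end
  def with_retry(send_fun, attempts \\ 3, wait_ms \\ 200)

  # Out of attempts: report that the session stayed busy.
  def with_retry(_send_fun, 0, _wait_ms), do: {:error, :turn_in_flight}

  def with_retry(send_fun, attempts, wait_ms) when attempts > 0 do
    case send_fun.() do
      {:error, :turn_in_flight} ->
        # Another turn is running; wait and try again.
        Process.sleep(wait_ms)
        with_retry(send_fun, attempts - 1, wait_ms)

      # {:ok, result} and every other {:error, reason} are returned as-is.
      other ->
        other
    end
  end
end
```

Only the busy case is retried; other errors are returned immediately so the caller can decide how to handle them.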

session_id(server)

@spec session_id(GenServer.server()) :: String.t() | nil

Return the session ID assigned by the CLI on system/init, or nil if init has not yet been observed.

start_link(opts)

@spec start_link([option()]) :: GenServer.on_start()

Start a duplex session.

Options

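  • :config -- (required) %ClaudeWrapper.Config{} struct.
  • :extra_args -- extra CLI flags to append (e.g. ["--permission-mode", "plan", "--max-turns", "1"]).
  • :on_permission -- permission callback invoked on each can_use_tool request. Defaults to &deny_all/2.
  • :name -- register the GenServer under a name.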

All other keyword options are passed through to GenServer.start_link/3.
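For long-running hosts, the session would typically be started under a supervisor rather than linked by hand. The sketch below builds such a child list. It assumes child_spec/1 forwards its argument to start_link/1, which is the usual GenServer default. SupervisionSketch and the registered name MyApp.Claude are hypothetical.

```elixir
defmodule SupervisionSketch do
  # Returns a child list suitable for Supervisor.start_link/2.
  # `config` is a %ClaudeWrapper.Config{}; `on_permission` is a 2-arity handler.
  def children(config, on_permission) do
    [
      {ClaudeWrapper.DuplexSession,
       config: config, on_permission: on_permission, name: MyApp.Claude}
    ]
  end
end
```

With the session registered under a name, callers could pass MyApp.Claude wherever a server argument is expected, for example ClaudeWrapper.DuplexSession.send(MyApp.Claude, "Say hi.").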

stop(server, reason \\ :normal, timeout \\ 5000)

@spec stop(GenServer.server(), term(), timeout()) :: :ok

Stop the session. Closes the port, waits for the child to exit, and shuts down the GenServer.

See also close/1, a shorthand that stops with reason :normal and a 10-second timeout.

subscribe(server)

@spec subscribe(GenServer.server()) :: :ok

Subscribe the calling process to streaming events.

Subscribers receive plain {:claude, event} messages -- see the module doc for the event vocabulary. The subscriber is monitored; if it exits, it is automatically removed.

Subscribing the same process twice is a no-op.

unsubscribe(server)

@spec unsubscribe(GenServer.server()) :: :ok

Stop sending events to the calling process. Idempotent.