Long-lived claude session over the CLI's stream-json duplex protocol.
Holds a single claude subprocess open across many turns, communicating
via NDJSON on stdin/stdout. Complementary to ClaudeWrapper.Query and
ClaudeWrapper.Session -- those spawn one subprocess per turn and are
the right fit for short-lived hosts (escripts, mix tasks, batch jobs).
DuplexSession is for long-running hosts (Phoenix servers, agent
runtimes, OTP applications) where keeping one claude subprocess open
across turns is cheap.
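Because stdout arrives in arbitrary chunks, an NDJSON reader has to carry partial lines over between reads. The sketch below illustrates that framing step only. NdjsonFraming.feed/2 is a hypothetical helper, not part of ClaudeWrapper, and the library's own buffering may differ.

```elixir
defmodule NdjsonFraming do
  # Hypothetical helper, not part of ClaudeWrapper.

  # Appends `chunk` to `buffer` and returns {complete_lines, rest}, where
  # `rest` is the trailing partial line to keep for the next read.
  @spec feed(binary(), binary()) :: {[binary()], binary()}
  def feed(buffer, chunk) when is_binary(buffer) and is_binary(chunk) do
    parts = String.split(buffer <> chunk, "\n")

    # The last element is whatever follows the final newline: an
    # incomplete line, or "" if the data ended exactly on a newline.
    {lines, [rest]} = Enum.split(parts, -1)

    # Blank lines carry no JSON document, so they are dropped.
    {Enum.reject(lines, &(&1 == "")), rest}
  end
end
```

Each complete line would then be decoded as one JSON document, and the returned rest becomes the buffer for the next chunk.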
This is the mode @anthropic-ai/claude-agent-sdk uses internally and
that the @agentclientprotocol/claude-agent-acp bridge relies on for
IDE integrations like Zed's agent panel.
See https://github.com/genagent/claude_wrapper_ex/issues/55 for the
full design discussion and phased rollout.
Usage
    config = ClaudeWrapper.Config.new()

    # Provide a permission callback to decide on tool use mid-turn.
    # The default is to deny everything.
    on_permission = fn tool_name, _input ->
      if tool_name in ["Bash", "Edit"], do: {:deny, "not allowed"}, else: :allow
    end

    {:ok, pid} =
      ClaudeWrapper.DuplexSession.start_link(
        config: config,
        on_permission: on_permission
      )

    # Subscribe the calling process to streaming events.
    :ok = ClaudeWrapper.DuplexSession.subscribe(pid)

    {:ok, result} = ClaudeWrapper.DuplexSession.send(pid, "Say hi.")

    ClaudeWrapper.DuplexSession.stop(pid)

Permission callback
The optional :on_permission callback runs synchronously inside the
GenServer when the CLI emits a can_use_tool control request. Two
arities are supported and detected at call time:
- (tool_name, input) -> decision -- when the decision can be made from the tool name and input alone (allow/deny lists, role-based policy, etc.).
- (tool_name, input, request_id) -> decision -- when the handler may return :defer and a separate process needs to call respond_to_permission/3 later. The request_id lets the handler correlate the deferred response with the original request (e.g. broadcast {:permission_request, request_id, ...} to a UI; the UI eventually answers via respond_to_permission/3).
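Call-time arity detection of this kind can be written with the is_function/2 guard. The sketch below is an assumption about how such dispatch might look, not the library's actual code. PermissionDispatch.call/4 is a hypothetical name.

```elixir
defmodule PermissionDispatch do
  # Hypothetical helper illustrating call-time arity detection.

  # 3-arity handlers also receive the request_id.
  def call(handler, tool_name, input, request_id) when is_function(handler, 3) do
    handler.(tool_name, input, request_id)
  end

  # 2-arity handlers decide from the tool name and input alone.
  def call(handler, tool_name, input, _request_id) when is_function(handler, 2) do
    handler.(tool_name, input)
  end
end
```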
The decision is one of:
- :allow -- allow the tool with the original input
- {:allow, updated_input} -- allow the tool with a modified input map (sandbox a path, redact a secret, etc.)
- {:deny, reason} -- deny the tool with a reason string the model will see
- :defer -- do not respond synchronously; the caller is expected to invoke respond_to_permission/3 later
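To illustrate the synchronous decisions, here is a sketch of a 2-arity policy. The module name, the tool names, and the "file_path" input key are assumptions made for the example; check the tool input shapes the CLI actually sends before relying on them.

```elixir
defmodule ExamplePolicy do
  # Hypothetical policy module; tool names and input keys are assumptions.
  @sandbox "/tmp/sandbox"

  # Read-only tools are allowed with their original input.
  def decide(tool, _input) when tool in ["Read", "Grep"], do: :allow

  # Writes are allowed, but the path is rewritten to live under the sandbox.
  def decide("Write", %{"file_path" => path} = input) do
    {:allow, %{input | "file_path" => Path.join(@sandbox, Path.basename(path))}}
  end

  # Everything else is denied with a reason string the model will see.
  def decide(tool, _input), do: {:deny, "#{tool} is not permitted by policy"}
end
```

Such a function could be passed as on_permission: &ExamplePolicy.decide/2.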
The callback runs in the GenServer process, so synchronous decisions
must be fast. For slow decisions, return :defer and answer later.
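One way to keep the callback fast is to hand the request to another process and return :defer right away. The sketch below assumes a hypothetical DeferringHandler.build/1 helper and a message shape modeled on the {:permission_request, request_id, ...} example above. Neither is part of ClaudeWrapper.

```elixir
defmodule DeferringHandler do
  # Hypothetical helper: builds a 3-arity handler that forwards each
  # request to `ui_pid` and returns :defer without blocking.
  def build(ui_pid) when is_pid(ui_pid) do
    fn tool_name, input, request_id ->
      # The message shape is an assumption for this example.
      send(ui_pid, {:permission_request, request_id, tool_name, input})
      :defer
    end
  end
end
```

The receiving process would later answer with respond_to_permission/3, passing the same request_id and one of :allow, {:allow, updated_input}, or {:deny, reason}.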
The default callback is &deny_all/2, which denies every tool call.
Without an explicit callback or one of the CLI's other permission
modes (plan, bypass_permissions, etc.) tool use will not work.
Subscriber events
Subscribers receive plain messages of the form {:claude, event}:
- {:system_init, session_id} -- the CLI's init event
- {:assistant, msg} -- a full assistant turn (SDKAssistantMessage)
- {:stream_event, msg} -- a partial assistant token (SDKPartialAssistantMessage)
- {:user, msg} -- a user message (e.g. tool results, replays)
- {:result, %ClaudeWrapper.Result{}} -- the parsed turn boundary
Subscribers are monitored; if a subscriber crashes or exits, it is automatically removed.
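A subscriber typically loops on receive until the turn's result event arrives. The sketch below assumes a hypothetical EventCollector module. It matches {:result, result} loosely so it can be exercised without the real %ClaudeWrapper.Result{} struct.

```elixir
defmodule EventCollector do
  # Hypothetical subscriber helper, not part of ClaudeWrapper.
  # Collects events in arrival order until a {:result, _} event is seen.
  # `timeout` bounds each individual wait, not the whole turn.
  def collect(timeout \\ 5_000), do: loop([], timeout)

  defp loop(acc, timeout) do
    receive do
      {:claude, {:result, _result} = event} ->
        {:ok, Enum.reverse([event | acc])}

      {:claude, event} ->
        loop([event | acc], timeout)
    after
      timeout -> {:error, :timeout, Enum.reverse(acc)}
    end
  end
end
```

The process calling EventCollector.collect/1 is assumed to have called subscribe/1 beforehand.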
Types
@type option() :: {:config, ClaudeWrapper.Config.t()} | {:extra_args, [String.t()]} | {:on_permission, permission_handler()} | {:name, GenServer.name()} | GenServer.option()
@type permission_decision() :: :allow | {:allow, tool_input()} | {:deny, String.t()} | :defer
@type permission_handler() :: (String.t(), tool_input() -> permission_decision()) | (String.t(), tool_input(), String.t() -> permission_decision())
Permission decision callback. Two arities are supported:
- (tool_name, input) -> decision -- the original signature. Use when the decision can be made from the tool name and input alone (allow/deny lists, role-based policy, etc.).
- (tool_name, input, request_id) -> decision -- carries the request_id of the inbound can_use_tool control request. Required if the handler returns :defer and a different process needs to call respond_to_permission/3 later (chat UI: handler broadcasts the request to a LiveView, which surfaces approve/deny and answers asynchronously).
Arity is detected at call time so existing 2-arity callbacks keep working unchanged.
@type state() :: %ClaudeWrapper.DuplexSession{
  buffer: binary(),
  config: ClaudeWrapper.Config.t(),
  on_permission: permission_handler(),
  pending_control: %{required(String.t()) => GenServer.from()},
  pending_turn: {GenServer.from(), [map()]} | nil,
  port: port() | nil,
  session_id: String.t() | nil,
  subscribers: %{required(pid()) => reference()}
}
@type tool_input() :: map()
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec close(GenServer.server()) :: :ok
Graceful close: shorthand for stop(server, :normal, 10_000).
Closes the port (which sends SIGTERM to the child), waits up to 10 seconds for it to exit, and shuts down the GenServer.
@spec deny_all(String.t(), tool_input()) :: permission_decision()
Default permission handler. Denies every tool call.
Public so it can be referenced as a default value (&deny_all/2).
@spec interrupt(GenServer.server(), timeout()) :: :ok | {:error, term()}
Send an interrupt control_request to the CLI. The CLI cancels any
in-flight turn and emits a result with a cancel-flavored stop
reason; that result still flows through the normal send/3 reply.
This call returns once the CLI acknowledges the interrupt with a
matching control_response. The caller of send/3 will receive
its own reply when the resulting result event arrives.
Calling interrupt/1 outside of an active turn is harmless: the
CLI accepts the request, acks it, and emits a synthetic result the
GenServer drops.
@spec respond_to_permission(GenServer.server(), String.t(), permission_decision()) :: :ok | {:error, :cannot_defer_again}
Answer a deferred permission request.
Used after the :on_permission callback returned :defer for the
given request_id. Calling this with a request_id the session has
no record of is a no-op (returns :ok). The decision accepts the
same shape as a synchronous handler return value, except :defer,
which is rejected with {:error, :cannot_defer_again}.
@spec send(GenServer.server(), String.t(), timeout()) :: {:ok, ClaudeWrapper.Result.t()} | {:error, term()}
Send a user prompt. Blocks until the turn's result event arrives.
Returns {:ok, %Result{}} on success, {:error, :turn_in_flight} if
another turn is already running, or {:error, reason} on failure.
The default timeout is 120 seconds because it has to cover the entire
turn: cold start, model latency, and any tool calls.
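The return shapes listed above can be handled with a plain pattern match. A small sketch follows. SendResultHandling.classify/1 and its return atoms are hypothetical names chosen for illustration.

```elixir
defmodule SendResultHandling do
  # Hypothetical helper showing one way to branch on send/3 return values.

  # The turn finished and produced a result.
  def classify({:ok, result}), do: {:done, result}

  # Another turn is already running on this session.
  def classify({:error, :turn_in_flight}), do: :retry_later

  # Any other failure reason is passed through.
  def classify({:error, reason}), do: {:failed, reason}
end
```

For a long turn, the caller might pass a larger timeout as the third argument, for example ClaudeWrapper.DuplexSession.send(pid, "Say hi.", 300_000), and pipe the return value into classify/1.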
@spec session_id(GenServer.server()) :: String.t() | nil
Return the session ID assigned by the CLI on system/init, or nil
if init has not yet been observed.
@spec start_link([option()]) :: GenServer.on_start()
Start a duplex session.
Options
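- :config -- (required) %ClaudeWrapper.Config{} struct.
- :extra_args -- extra CLI flags to append (e.g. ["--permission-mode", "plan", "--max-turns", "1"]).
- :on_permission -- permission callback; see the module doc. Defaults to &deny_all/2.
- :name -- register the GenServer under a name.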
All other keyword options are passed through to GenServer.start_link/3.
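For a long-running host, these options can be assembled into a child list for a supervisor. This sketch assumes the module's child_spec/1 passes the keyword list straight to start_link/1, as the default GenServer child spec does. SessionChildren and the registered name MyApp.Claude are hypothetical.

```elixir
defmodule SessionChildren do
  # Hypothetical helper: builds a child list for Supervisor.start_link/2.
  # `config` is expected to be a %ClaudeWrapper.Config{}; MyApp.Claude is
  # an assumed registered name.
  def children(config) do
    opts = [
      config: config,
      extra_args: ["--permission-mode", "plan"],
      name: MyApp.Claude
    ]

    [{ClaudeWrapper.DuplexSession, opts}]
  end
end
```

The list would then be handed to a supervisor, for example Supervisor.start_link(SessionChildren.children(config), strategy: :one_for_one).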
@spec stop(GenServer.server(), term(), timeout()) :: :ok
Stop the session. Closes the port, waits for the child to exit, and shuts down the GenServer.
See also close/1 for a short-form alias.
@spec subscribe(GenServer.server()) :: :ok
Subscribe the calling process to streaming events.
Subscribers receive plain {:claude, event} messages -- see the
module doc for the event vocabulary. The subscriber is monitored;
if it exits, it is automatically removed.
Subscribing the same process twice is a no-op.
@spec unsubscribe(GenServer.server()) :: :ok
Stop sending events to the calling process. Idempotent.