Long-lived claude session over the CLI's stream-json duplex protocol.
Holds a single claude subprocess open across many turns, communicating
via NDJSON on stdin/stdout. Complementary to ClaudeWrapper.Query and
ClaudeWrapper.Session -- those spawn one subprocess per turn and are
the right fit for short-lived hosts (escripts, mix tasks, batch jobs).
DuplexSession is for long-running hosts (Phoenix servers, agent
runtimes, OTP applications) where keeping one claude subprocess open
across turns is cheap.
This is the mode @anthropic-ai/claude-agent-sdk uses internally and
that the @agentclientprotocol/claude-agent-acp bridge relies on for
IDE integrations like Zed's agent panel.
See https://github.com/genagent/claude_wrapper_ex/issues/55 for the
full design discussion and phased rollout.
Usage

    config = ClaudeWrapper.Config.new()

    # Provide a permission callback to decide on tool use mid-turn.
    # The default is to deny everything.
    on_permission = fn tool_name, _input ->
      if tool_name in ["Bash", "Edit"], do: {:deny, "not allowed"}, else: :allow
    end

    {:ok, pid} =
      ClaudeWrapper.DuplexSession.start_link(
        config: config,
        on_permission: on_permission
      )

    # Subscribe the calling process to streaming events.
    :ok = ClaudeWrapper.DuplexSession.subscribe(pid)

    {:ok, result} = ClaudeWrapper.DuplexSession.send(pid, "Say hi.")

    ClaudeWrapper.DuplexSession.stop(pid)

Permission callback
The optional :on_permission callback runs synchronously inside the
GenServer when the CLI emits a can_use_tool control request. Two
arities are supported and detected at call time:
* (tool_name, input) -> decision -- when the decision can be made from the tool name and input alone (allow/deny lists, role-based policy, etc.).
* (tool_name, input, request_id) -> decision -- when the handler may return :defer and a separate process needs to call respond_to_permission/3 later. The request_id lets the handler correlate the deferred response with the original request (e.g. broadcast {:permission_request, request_id, ...} to a UI; the UI eventually answers via respond_to_permission/3).
The decision is one of:
* :allow -- allow the tool with the original input
* {:allow, updated_input} -- allow the tool with a modified input map (sandbox a path, redact a secret, etc.)
* {:deny, reason} -- deny the tool with a reason string the model will see
* :defer -- do not respond synchronously; the caller is expected to invoke respond_to_permission/3 later
The callback runs in the GenServer process, so synchronous decisions
must be fast. For slow decisions, return :defer and answer later.
The default callback is &deny_all/2, which denies every tool call.
Without an explicit callback or one of the CLI's other permission
modes (plan, bypass_permissions, etc.) tool use will not work.
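As an illustration of the two handler arities and the decision shapes described above, here is a minimal sketch. The module name PermissionPolicySketch, the tool names in the allow list, the "command" key in the Bash input, and the {:permission_request, ...} message shape are assumptions made for the example; only the return values (:allow, {:allow, input}, {:deny, reason}, :defer) come from this documentation.

```elixir
defmodule PermissionPolicySketch do
  # Hypothetical allow list; adjust to the tools your host wants to expose.
  @read_only ~w(Read Glob Grep)

  # 2-arity handler: decide from the tool name and input alone.
  def decide(tool_name, _input) when tool_name in @read_only, do: :allow

  # Assumes the Bash tool input carries a "command" string.
  def decide("Bash", %{"command" => command} = input) do
    if String.starts_with?(command, "git status") do
      {:allow, input}
    else
      {:deny, "only git status is permitted"}
    end
  end

  def decide(tool_name, _input), do: {:deny, "#{tool_name} is not on the allow list"}

  # 3-arity handler: forward the request to another process and defer.
  # That process is expected to answer later via respond_to_permission/3.
  def defer_to(ui_pid) do
    fn tool_name, input, request_id ->
      send(ui_pid, {:permission_request, request_id, tool_name, input})
      :defer
    end
  end
end
```

Either form could then be passed as the :on_permission option, for example on_permission: &PermissionPolicySketch.decide/2 or on_permission: PermissionPolicySketch.defer_to(ui_pid). Both clauses only pattern-match and send a message, so they stay fast inside the GenServer.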
Subscriber events
Subscribers receive plain messages of the form {:claude, event}:
* {:system_init, session_id} -- the CLI's init event
* {:assistant, msg} -- a full assistant turn (SDKAssistantMessage)
* {:stream_event, msg} -- a partial assistant token (SDKPartialAssistantMessage)
* {:user, msg} -- a user message (e.g. tool results, replays)
* {:result, %ClaudeWrapper.Result{}} -- the parsed turn boundary
Subscribers are monitored; if a subscriber crashes or exits, it is automatically removed.
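A subscriber can consume these messages with an ordinary receive loop. The sketch below is one possible shape, not part of the library: SubscriberSketch and collect_turn/2 are hypothetical names, and the payloads of :assistant, :stream_event and :user are treated as opaque because their structure is not specified here.

```elixir
defmodule SubscriberSketch do
  # Collects {:claude, event} messages from the caller's mailbox until the
  # turn's {:result, _} event arrives or `timeout` ms pass without a message.
  def collect_turn(acc \\ [], timeout \\ 5_000) do
    receive do
      {:claude, {:result, result}} ->
        {:ok, result, Enum.reverse(acc)}

      {:claude, {:system_init, session_id}} ->
        collect_turn([{:init, session_id} | acc], timeout)

      {:claude, {kind, msg}} when kind in [:assistant, :stream_event, :user] ->
        collect_turn([{kind, msg} | acc], timeout)
    after
      timeout -> {:error, :timeout, Enum.reverse(acc)}
    end
  end
end
```

Because send/3 blocks until the result arrives, a loop like this would typically run in a separate subscribed process, or be called after send/3 returns to drain what has already been delivered.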
Subscriber delivery has no capacity bound
The Rust crate backs subscribe with a bounded
tokio::sync::broadcast channel (default capacity 256) and slow
consumers observe a Lagged error. The Elixir session instead
delivers each event with Process.send/3 straight into every
subscriber's process mailbox, which is unbounded, so there is no
subscriber_capacity knob and no lag/drop semantics: a slow
subscriber simply accumulates messages in its own mailbox. Apply
back-pressure at the subscriber (drain promptly, or unsubscribe)
if that is a concern.
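Since the mailbox is the only buffer, a subscriber that worries about falling behind has to measure and shed load itself. The following is a rough sketch of that idea using only standard library calls; MailboxGuardSketch, its function names and the 1_000 threshold are hypothetical, and dropping partial tokens is just one possible policy.

```elixir
defmodule MailboxGuardSketch do
  # Hypothetical threshold; tune for the host.
  @max_backlog 1_000

  # Number of messages currently queued for the calling process.
  def backlog do
    {:message_queue_len, n} = Process.info(self(), :message_queue_len)
    n
  end

  def overloaded?(max \\ @max_backlog), do: backlog() > max

  # Discards queued {:claude, {:stream_event, _}} messages and leaves every
  # other message in the mailbox. Returns how many were dropped.
  def drop_stream_events(dropped \\ 0) do
    receive do
      {:claude, {:stream_event, _}} -> drop_stream_events(dropped + 1)
    after
      0 -> dropped
    end
  end
end
```

A subscriber could check overloaded?/1 between events and either call drop_stream_events/1 or unsubscribe/1 when the backlog grows too large.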
Health and liveness
alive?/1, exit_status/1, and wait_for_exit/2 give service-shaped
hosts non-consuming visibility into whether a session is still usable,
mirroring the Rust crate's is_alive / exit_status / wait_for_exit
(SessionExitStatus). See the exit_status/0 type for the possible values.
wait_for_exit/2 is the authoritative source of the terminal status:
it blocks until the session exits and returns :completed for a clean
shutdown or {:failed, {:port_exit, code}} when the underlying
claude subprocess exits with a non-zero status (or the port closes
abnormally). The status is delivered live from terminate/2, so there
is no persisted state and nothing to read post-mortem.
exit_status/1 is only a live snapshot: it reports :running while
the session process is alive and :completed once the process is
gone. It cannot distinguish a clean exit from a failed one after the
fact -- use wait_for_exit/2 if you need the failure reason.
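To make the three status shapes concrete, here is a small sketch of how a host might turn a value returned by wait_for_exit/2 into a health-check reply. HealthSketch and the reply tuples are hypothetical; only the status terms themselves come from this page.

```elixir
defmodule HealthSketch do
  # Maps an exit_status() term to a hypothetical health-check reply.
  # From wait_for_exit/2, :running means the timeout elapsed first.
  def describe(:running), do: {:ok, "session alive"}
  def describe(:completed), do: {:ok, "session exited cleanly"}

  def describe({:failed, {:port_exit, code}}) when is_integer(code),
    do: {:error, "claude exited with status #{code}"}

  def describe({:failed, reason}), do: {:error, "session failed: #{inspect(reason)}"}
end
```

Feeding this from exit_status/1 instead would never reach the {:failed, _} clauses, because that snapshot reports a dead session as :completed.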
Types
@type exit_status() :: :running | :completed | {:failed, term()}
Terminal liveness status of a session, mirroring the Rust crate's
SessionExitStatus:
* :running -- the session process is alive and usable.
* :completed -- the session shut down cleanly (graceful stop/3 / close/1, or the claude subprocess exited with status 0).
* {:failed, reason} -- the claude subprocess exited abnormally (non-zero status, or the port closed with an error reason). The reason is the recorded {:port_exit, code | term} tuple.
@type option() :: {:config, ClaudeWrapper.Config.t()} | {:extra_args, [String.t()]} | {:on_permission, permission_handler()} | {:name, GenServer.name()} | GenServer.option()
@type permission_decision() :: :allow | {:allow, tool_input()} | {:deny, String.t()} | :defer
@type permission_handler() :: (String.t(), tool_input() -> permission_decision()) | (String.t(), tool_input(), String.t() -> permission_decision())
Permission decision callback. Two arities are supported:
* (tool_name, input) -> decision -- the original signature. Use when the decision can be made from the tool name and input alone (allow/deny lists, role-based policy, etc.).
* (tool_name, input, request_id) -> decision -- carries the request_id of the inbound can_use_tool control request. Required if the handler returns :defer and a different process needs to call respond_to_permission/3 later (chat UI: handler broadcasts the request to a LiveView, which surfaces approve/deny and answers asynchronously).
Arity is detected at call time so existing 2-arity callbacks keep working unchanged.
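Call-time arity detection of this kind can be done with the is_function/2 guard. The sketch below shows one way it might look; it is not taken from the library's source, and ArityDispatchSketch and call_handler/4 are hypothetical names.

```elixir
defmodule ArityDispatchSketch do
  # Invokes a permission handler with as many arguments as it accepts.
  def call_handler(handler, tool_name, input, request_id) do
    cond do
      is_function(handler, 3) -> handler.(tool_name, input, request_id)
      is_function(handler, 2) -> handler.(tool_name, input)
      true -> raise ArgumentError, "on_permission must be a 2- or 3-arity function"
    end
  end
end
```

Checking the 3-arity case first means a handler only sees the request_id when it explicitly asks for it, which is why existing 2-arity callbacks are unaffected.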
@type state() :: %ClaudeWrapper.DuplexSession{ buffer: binary(), config: ClaudeWrapper.Config.t(), exit_status: exit_status(), exit_waiters: %{required(reference()) => pid()}, on_permission: permission_handler(), pending_control: %{required(String.t()) => GenServer.from()}, pending_turn: {GenServer.from(), [map()]} | nil, port: port() | nil, session_id: String.t() | nil, subscribers: %{required(pid()) => reference()} }
@type tool_input() :: map()
Functions
@spec alive?(GenServer.server()) :: boolean()
Cheap, non-consuming liveness check. Mirrors the Rust is_alive.
Returns true while the session process is alive, false once it has
exited (cleanly or with an error). Resolves registered names and pids;
any non-pid name that does not resolve is treated as not alive.
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec close(GenServer.server()) :: :ok
Graceful close: shorthand for stop(server, :normal, 10_000).
Closes the port (which sends SIGTERM to the child), waits up to 10 seconds for it to exit, and shuts down the GenServer.
@spec deny_all(String.t(), tool_input()) :: permission_decision()
Default permission handler. Denies every tool call.
Public so it can be referenced as a default value (&deny_all/2).
@spec exit_status(GenServer.server()) :: exit_status()
Live snapshot of the session's exit_status/0. Mirrors the Rust
exit_status / SessionExitStatus, but is best-effort only.
Returns :running while the session process is alive and :completed
once it is gone. Unlike wait_for_exit/2, this cannot distinguish a
clean exit from a failed one after the fact: the session keeps no
persisted terminal status, so a dead process always reads back as
:completed. If you need the failure reason (e.g.
{:failed, {:port_exit, code}}), use wait_for_exit/2, which is the
authoritative source.
@spec interrupt(GenServer.server(), timeout()) :: :ok | {:error, term()}
Send an interrupt control_request to the CLI. The CLI cancels any
in-flight turn and emits a result with a cancel-flavored stop
reason; that result still flows through the normal send/3 reply.
This call returns once the CLI acknowledges the interrupt with a
matching control_response. The caller of send/3 will receive
its own reply when the resulting result event arrives.
Calling interrupt/1 outside of an active turn is harmless: the
CLI accepts the request, acks it, and emits a synthetic result the
GenServer drops.
@spec respond_to_permission(GenServer.server(), String.t(), permission_decision()) :: :ok | {:error, ClaudeWrapper.Error.t()}
Answer a deferred permission request.
Used after the :on_permission callback returned :defer for the
given request_id. Calling this with a request_id the session has
no record of is a no-op (returns :ok). The decision accepts the
same shape as a synchronous handler return value, except :defer,
which is rejected with {:error, %ClaudeWrapper.Error{kind: :cannot_defer_again}}.
:allow and updatedInput
A synchronous handler returning plain :allow has its updatedInput
defaulted to the original tool input automatically, since the
dispatch site has the input in scope. The deferred path does not --
the session does not retain per-request input across the defer
boundary. If you need updatedInput populated (Claude's permission
protocol requires it for behavior: "allow"), capture the input
when the handler defers and pass {:allow, input} here rather than
plain :allow.
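One way to capture the input across the defer boundary is to park it in a small side process keyed by request_id. The sketch below uses an Agent for that; DeferredInputsSketch, its function names and the denial messages are hypothetical, and the store is deliberately minimal (no expiry, single node).

```elixir
defmodule DeferredInputsSketch do
  use Agent

  def start_link(_opts \\ []), do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  # 3-arity on_permission handler: remember the input, then defer.
  def handler(_tool_name, input, request_id) do
    Agent.update(__MODULE__, &Map.put(&1, request_id, input))
    :defer
  end

  # Builds the decision to hand to respond_to_permission/3 later.
  # Pops the stored input so each request is answered at most once.
  def decision(request_id, approved?) do
    input = Agent.get_and_update(__MODULE__, &Map.pop(&1, request_id))

    cond do
      not approved? -> {:deny, "rejected by reviewer"}
      is_map(input) -> {:allow, input}
      true -> {:deny, "unknown request"}
    end
  end
end
```

The approving process would then call something like ClaudeWrapper.DuplexSession.respond_to_permission(pid, request_id, DeferredInputsSketch.decision(request_id, true)), so the {:allow, input} form carries the original input.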
@spec send(GenServer.server(), String.t(), timeout()) :: {:ok, ClaudeWrapper.Result.t()} | {:error, term()}
Send a user prompt. Blocks until the turn's result event arrives.
Returns {:ok, %Result{}} on success,
{:error, %ClaudeWrapper.Error{kind: :turn_in_flight}} if another turn is
already running, or {:error, %ClaudeWrapper.Error{}} on any other failure.
The default timeout is 120 seconds because the whole turn (cold start,
model latency, and tool calls) must complete within it.
@spec session_id(GenServer.server()) :: String.t() | nil
Return the session ID assigned by the CLI on system/init, or nil
if init has not yet been observed.
@spec start_link([option()]) :: GenServer.on_start()
Start a duplex session.
Options
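* :config -- (required) %ClaudeWrapper.Config{} struct.
* :extra_args -- extra CLI flags to append (e.g. ["--permission-mode", "plan", "--max-turns", "1"]).
* :on_permission -- permission callback (see permission_handler()); defaults to &deny_all/2.
* :name -- register the GenServer under a name.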
All other keyword options are passed through to GenServer.start_link/3.
@spec stop(GenServer.server(), term(), timeout()) :: :ok
Stop the session. Closes the port, waits for the child to exit, and shuts down the GenServer.
See also close/1 for a short-form alias.
@spec subscribe(GenServer.server()) :: :ok
Subscribe the calling process to streaming events.
Subscribers receive plain {:claude, event} messages -- see the
module doc for the event vocabulary. The subscriber is monitored;
if it exits, it is automatically removed.
Subscribing the same process twice is a no-op.
@spec unsubscribe(GenServer.server()) :: :ok
Stop sending events to the calling process. Idempotent.
@spec wait_for_exit(GenServer.server(), timeout()) :: exit_status()
Block until the session exits, then return its terminal
exit_status/0. Mirrors the Rust wait_for_exit.
This is the authoritative source of the terminal status. It returns
:completed for a clean shutdown and {:failed, {:port_exit, code}}
when the underlying claude subprocess exited with a non-zero status
(or the port closed abnormally). Returns immediately (:completed) if
the session has already exited.
Implemented with a Process.monitor/1 plus a one-shot waiter
registration on the session: terminate/2 sends each registered
waiter the precise terminal status, and the monitor :DOWN is the
fallback if the session dies before (or during) registration. Multiple
concurrent callers are fine and the call does not consume the session.
timeout (default 5 seconds) bounds the wait; on timeout this returns
:running to signal "still alive past the deadline" (the analog of
the Rust call simply not having resolved yet).
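The monitor-plus-waiter pattern described above can be illustrated with a toy process standing in for the session. This is a sketch of the general technique only, not the library's implementation: WaitSketch, the {:register, ...} and {:exit_with, ...} messages, and the plain spawned process (instead of a GenServer with terminate/2) are all assumptions made for the example.

```elixir
defmodule WaitSketch do
  # Toy "session": reports its terminal status to registered waiters just
  # before exiting. Stands in for what terminate/2 does in a GenServer.
  def start, do: spawn(fn -> loop([]) end)

  defp loop(waiters) do
    receive do
      {:register, pid, ref} ->
        loop([{pid, ref} | waiters])

      {:exit_with, status} ->
        for {pid, ref} <- waiters, do: send(pid, {:exit_status, ref, status})
    end
  end

  def wait_for_exit(session, timeout \\ 5_000) do
    # Monitor first, so a session that is already dead (or dies before it
    # handles the registration) still produces a :DOWN message.
    mon = Process.monitor(session)
    ref = make_ref()
    send(session, {:register, self(), ref})

    result =
      receive do
        # Precise status, sent by the session right before it exits.
        {:exit_status, ^ref, status} -> status
        # Fallback: the session is gone and no status reached us.
        {:DOWN, ^mon, :process, _pid, _reason} -> :completed
      after
        # Still alive past the deadline. A late {:exit_status, ref, _}
        # for this ref may still arrive in the caller's mailbox.
        timeout -> :running
      end

    Process.demonitor(mon, [:flush])
    result
  end
end
```

Each caller uses its own reference and monitor, so concurrent waiters do not interfere with one another and none of them consumes the session.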