Real-time Ethereum subscriptions via eth_subscribe over WebSocket.
Wraps zen_websocket with Ethereum-specific subscription management. Supports three subscription types: new block headers, pending transactions, and filtered event logs.
Does
- Connect to a WebSocket-capable Ethereum endpoint (
connect/2) - Subscribe to
newHeads,newPendingTransactions, andlogs(subscribe/3) - Parse subscription notifications into normalized Elixir maps
- Deliver parsed events via handler function or process messages
- Unsubscribe and clean up resources (
unsubscribe/2,close/1)
Does Not
- Persist or index events (see rexex for durable indexing)
- Convert HTTP RPC URLs to WebSocket URLs (consumer provides
wss://directly) - Manage reconnection subscriptions (delegates reconnection to zen_websocket)
Event Delivery
Events are delivered via a handler function passed to connect/2. The default
handler sends {:subscription, event} messages to the calling process.
Event shapes:
{:new_heads, subscription_id, head_map}{:pending_transactions, subscription_id, tx_hash}{:logs, subscription_id, log_map}{:parse_error, subscription_id, reason}— malformed notification;reasonis a tagged tuple from the internal parser ({:invalid_head, _}|{:invalid_tx_hash, _}|{:invalid_log, _})
Subscribe Race Buffering
The race between eth_subscribe's RPC reply (which returns the subscription_id)
and the Agent registration of that id is closed by buffering: notifications that
arrive for an unregistered subscription_id are queued per-id (cap
100 entries; oldest dropped on overflow with a Logger.warning).
On registration, buffered notifications are flushed FIFO through the same handler
path before subscribe/3 returns.
Cross-buffer / post-registration ordering is best-effort: a notification that arrives between the atomic register-and-drain step and the synchronous flush is dispatched immediately and may interleave with buffered events. Acceptable for self-contained, independent notifications (heads, hashes, logs).
Handler exceptions during flush propagate (consistent with fire-and-forget
dispatch_event/4 semantics). Remaining buffered events for that flush are lost;
Agent state remains consistent.
Error Format
- Connection failures:
{:error, {:connection_error, reason}} - RPC errors:
{:error, {:rpc_error, %{code: integer, message: string}}} - Invalid subscription type:
{:error, {:invalid_subscription_type, type}}
Functions
| Function | Purpose |
|---|---|
connect/2 | Open WebSocket connection to Ethereum node |
connect!/2 | Same, raises on error |
subscribe/3 | Subscribe to a notification type |
subscribe!/3 | Same, raises on error |
unsubscribe/2 | Cancel a subscription by ID |
unsubscribe!/2 | Same, raises on error |
close/1 | Close connection and free resources |
API Functions
| Function | Arity | Description | Param Kinds |
|---|---|---|---|
close | 1 | Close the WebSocket connection and free resources. | sub: value |
unsubscribe! | 2 | Cancel a subscription by ID. Raises on error. | sub: value, subscription_id: value |
unsubscribe | 2 | Cancel a subscription by ID. | sub: value, subscription_id: value |
subscribe! | 3 | Subscribe to an Ethereum notification type. Raises on error. | sub: value, type: value, opts: value |
subscribe | 3 | Subscribe to an Ethereum notification type. | sub: value, type: value, opts: value |
connect! | 2 | Open a WebSocket connection. Raises on error. | ws_url: value, opts: value |
connect | 2 | Open a WebSocket connection to an Ethereum node. | ws_url: value, opts: value |
Summary
Functions
Close the WebSocket connection and free resources.
Open a WebSocket connection to an Ethereum node.
Open a WebSocket connection. Raises on error.
Subscribe to an Ethereum notification type.
Subscribe to an Ethereum notification type. Raises on error.
Cancel a subscription by ID.
Cancel a subscription by ID. Raises on error.
Types
@type head() :: %{ number: non_neg_integer(), hash: String.t(), parent_hash: String.t(), timestamp: non_neg_integer(), miner: String.t(), gas_limit: non_neg_integer(), gas_used: non_neg_integer(), base_fee_per_gas: non_neg_integer() | nil, logs_bloom: String.t(), transactions_root: String.t(), state_root: String.t(), receipts_root: String.t() }
@type log() :: %{ address: String.t(), topics: [String.t()], data: String.t(), block_number: non_neg_integer(), transaction_hash: String.t(), log_index: non_neg_integer(), transaction_index: non_neg_integer(), removed: boolean() }
@type subscription_type() :: :new_heads | :pending_transactions | {:logs, map()}
@type t() :: %Onchain.Subscription{ agent: pid(), client: ZenWebsocket.Client.t(), handler: handler() }
Functions
@spec close(t()) :: :ok
Close the WebSocket connection and free resources.
Parameters
sub- Subscription handle (value)
Returns
Always returns :ok (:ok)
# descripex:contract
%{
params: %{sub: %{description: "Subscription handle", kind: :value}},
returns: %{type: ":ok", description: "Always returns :ok"}
}
Open a WebSocket connection to an Ethereum node.
Parameters
ws_url- WebSocket URL (wss:// or ws://) (value)opts- Options: :handler (event callback fn), plus zen_websocket options (:retry_count, :retry_delay, :max_backoff) (default:[], value)
Returns
Subscription handle for subscribe/unsubscribe/close calls ({:ok, %Onchain.Subscription{}} | {:error, term()})
# descripex:contract
%{
params: %{
opts: %{
default: [],
description: "Options: :handler (event callback fn), plus zen_websocket options (:retry_count, :retry_delay, :max_backoff)",
kind: :value
},
ws_url: %{description: "WebSocket URL (wss:// or ws://)", kind: :value}
},
returns: %{
type: "{:ok, %Onchain.Subscription{}} | {:error, term()}",
description: "Subscription handle for subscribe/unsubscribe/close calls"
}
}
Open a WebSocket connection. Raises on error.
Parameters
ws_url- WebSocket URL (wss:// or ws://) (value)opts- Same options as connect/2 (default:[], value)
Returns
Subscription handle (%Onchain.Subscription{})
# descripex:contract
%{
params: %{
opts: %{default: [], description: "Same options as connect/2", kind: :value},
ws_url: %{description: "WebSocket URL (wss:// or ws://)", kind: :value}
},
returns: %{
type: "%Onchain.Subscription{}",
description: "Subscription handle"
}
}
@spec subscribe(t(), subscription_type(), keyword()) :: {:ok, String.t()} | {:error, term()}
Subscribe to an Ethereum notification type.
Parameters
sub- Subscription handle from connect/2 (value)type- Subscription type: :new_heads, :pending_transactions, or {:logs, filter_map} (value)opts- Reserved for future options (default:[], value)
Returns
Subscription ID for unsubscribe ({:ok, subscription_id} | {:error, term()})
# descripex:contract
%{
params: %{
type: %{
description: "Subscription type: :new_heads, :pending_transactions, or {:logs, filter_map}",
kind: :value
},
opts: %{
default: [],
description: "Reserved for future options",
kind: :value
},
sub: %{description: "Subscription handle from connect/2", kind: :value}
},
returns: %{
type: "{:ok, subscription_id} | {:error, term()}",
description: "Subscription ID for unsubscribe"
}
}
@spec subscribe!(t(), subscription_type(), keyword()) :: String.t()
Subscribe to an Ethereum notification type. Raises on error.
Parameters
sub- Subscription handle from connect/2 (value)type- Same types as subscribe/3 (value)opts- Reserved for future options (default:[], value)
Returns
Subscription ID (String.t())
# descripex:contract
%{
params: %{
type: %{description: "Same types as subscribe/3", kind: :value},
opts: %{
default: [],
description: "Reserved for future options",
kind: :value
},
sub: %{description: "Subscription handle from connect/2", kind: :value}
},
returns: %{type: "String.t()", description: "Subscription ID"}
}
Cancel a subscription by ID.
Parameters
sub- Subscription handle (value)subscription_id- Subscription ID from subscribe/3 (value)
Returns
true if unsubscribed successfully ({:ok, boolean()} | {:error, term()})
# descripex:contract
%{
params: %{
sub: %{description: "Subscription handle", kind: :value},
subscription_id: %{
description: "Subscription ID from subscribe/3",
kind: :value
}
},
returns: %{
type: "{:ok, boolean()} | {:error, term()}",
description: "true if unsubscribed successfully"
}
}
Cancel a subscription by ID. Raises on error.
Parameters
sub- Subscription handle (value)subscription_id- Subscription ID (value)
Returns
true if unsubscribed (boolean())
# descripex:contract
%{
params: %{
sub: %{description: "Subscription handle", kind: :value},
subscription_id: %{description: "Subscription ID", kind: :value}
},
returns: %{type: "boolean()", description: "true if unsubscribed"}
}