Onchain.Subscription (onchain v0.5.4)


Real-time Ethereum subscriptions via eth_subscribe over WebSocket.

Wraps zen_websocket with Ethereum-specific subscription management. Supports three subscription types: new block headers, pending transactions, and filtered event logs.

Does

  • Connect to a WebSocket-capable Ethereum endpoint (connect/2)
  • Subscribe to newHeads, newPendingTransactions, and logs (subscribe/3)
  • Parse subscription notifications into normalized Elixir maps
  • Deliver parsed events via handler function or process messages
  • Unsubscribe and clean up resources (unsubscribe/2, close/1)

Does Not

  • Persist or index events (see rexex for durable indexing)
  • Convert HTTP RPC URLs to WebSocket URLs (consumer provides wss:// directly)
  • Manage reconnection of subscriptions (reconnection is delegated to zen_websocket)

Event Delivery

Events are delivered via a handler function passed to connect/2. The default handler sends {:subscription, event} messages to the calling process.

Event shapes:

  • {:new_heads, subscription_id, head_map}
  • {:pending_transactions, subscription_id, tx_hash}
  • {:logs, subscription_id, log_map}
  • {:parse_error, subscription_id, reason} — malformed notification; reason is a tagged tuple from the internal parser ({:invalid_head, _} | {:invalid_tx_hash, _} | {:invalid_log, _})
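
A handler is an ordinary one-argument function, so the four event shapes above can be handled with one function clause each. The following is a minimal sketch, not part of onchain: the module name EventSummary and its describe/1 function are hypothetical, and only the tuple shapes and map keys come from this page.

```elixir
defmodule EventSummary do
  @moduledoc "Illustrative only: turns each documented event shape into a log line."

  # New block header: head_map carries :number and :hash (see the head() type).
  def describe({:new_heads, sub_id, %{number: number, hash: hash}}) do
    "[#{sub_id}] block #{number} #{hash}"
  end

  # Pending transaction: the payload is the transaction hash string.
  def describe({:pending_transactions, sub_id, tx_hash}) when is_binary(tx_hash) do
    "[#{sub_id}] pending tx #{tx_hash}"
  end

  # Log entry: :removed is true when the log was dropped by a chain reorg.
  def describe({:logs, sub_id, %{address: address, removed: removed}}) do
    suffix = if removed, do: " (removed)", else: ""
    "[#{sub_id}] log from #{address}#{suffix}"
  end

  # Malformed notification: reason is a tagged tuple from the parser.
  def describe({:parse_error, sub_id, reason}) do
    "[#{sub_id}] parse error: #{inspect(reason)}"
  end
end
```

Such a function could be wrapped and passed as the :handler option to connect/2, for example handler: fn event -> IO.puts(EventSummary.describe(event)) end. With the default handler, the same tuples arrive in the calling process wrapped as {:subscription, event}.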

Subscribe Race Buffering

The race between eth_subscribe's RPC reply (which returns the subscription_id) and the Agent registration of that id is closed by buffering: notifications that arrive for an unregistered subscription_id are queued per-id (cap 100 entries; oldest dropped on overflow with a Logger.warning). On registration, buffered notifications are flushed FIFO through the same handler path before subscribe/3 returns.

Cross-buffer / post-registration ordering is best-effort: a notification that arrives between the atomic register-and-drain step and the synchronous flush is dispatched immediately and may interleave with buffered events. This is acceptable because the notifications (heads, hashes, logs) are self-contained and independent of one another.

Handler exceptions during flush propagate (consistent with fire-and-forget dispatch_event/4 semantics). Remaining buffered events for that flush are lost; Agent state remains consistent.
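
The buffering rules described above (per-id queues, a cap with oldest-first eviction, FIFO drain on registration) can be modelled with a plain map. This is an illustrative sketch only and not the library's internal code: the module RaceBufferSketch and its push/4 and drain/2 functions are hypothetical, and the Logger.warning emitted on overflow is left out.

```elixir
defmodule RaceBufferSketch do
  @moduledoc "Illustrative model of a capped, per-subscription FIFO buffer."

  # Cap taken from the text above: 100 entries per subscription id.
  @cap 100

  # Queue a notification for an id that is not registered yet.
  # `buffers` maps subscription_id => list of notifications, oldest first.
  def push(buffers, sub_id, notification, cap \\ @cap) do
    queue = Map.get(buffers, sub_id, []) ++ [notification]
    # Keep only the newest `cap` entries, so the oldest are dropped on overflow.
    Map.put(buffers, sub_id, Enum.take(queue, -cap))
  end

  # On registration: remove the id's queue and return it oldest first,
  # together with the remaining buffers.
  def drain(buffers, sub_id) do
    Map.pop(buffers, sub_id, [])
  end
end
```

With a cap of 3, pushing 1 through 5 for one id and then draining yields the last three notifications in arrival order; draining an unknown id yields an empty list.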

Error Format

  • Connection failures: {:error, {:connection_error, reason}}
  • RPC errors: {:error, {:rpc_error, %{code: integer, message: string}}}
  • Invalid subscription type: {:error, {:invalid_subscription_type, type}}
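
Because each error is a tagged tuple, callers can branch on the tag. The sketch below shows one way to turn the three documented shapes into messages. The module SubscriptionErrorSketch and its describe/1 function are hypothetical helpers, and the final catch-all clause is an assumption that other error terms may occur.

```elixir
defmodule SubscriptionErrorSketch do
  @moduledoc "Illustrative only: maps the documented error tuples to messages."

  # Connection failures carry the underlying reason.
  def describe({:error, {:connection_error, reason}}) do
    "connection failed: #{inspect(reason)}"
  end

  # RPC errors carry an integer code and a string message.
  def describe({:error, {:rpc_error, %{code: code, message: message}}}) do
    "RPC error #{code}: #{message}"
  end

  # Returned when the requested subscription type is not supported.
  def describe({:error, {:invalid_subscription_type, type}}) do
    "unsupported subscription type: #{inspect(type)}"
  end

  # Assumption: anything else is reported generically rather than crashing.
  def describe({:error, other}) do
    "unexpected error: #{inspect(other)}"
  end
end
```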

Functions

Function        Purpose
connect/2       Open WebSocket connection to Ethereum node
connect!/2      Same, raises on error
subscribe/3     Subscribe to a notification type
subscribe!/3    Same, raises on error
unsubscribe/2   Cancel a subscription by ID
unsubscribe!/2  Same, raises on error
close/1         Close connection and free resources

API Functions

Function      Arity  Description                                                   Param Kinds
close         1      Close the WebSocket connection and free resources.            sub: value
unsubscribe!  2      Cancel a subscription by ID. Raises on error.                 sub: value, subscription_id: value
unsubscribe   2      Cancel a subscription by ID.                                  sub: value, subscription_id: value
subscribe!    3      Subscribe to an Ethereum notification type. Raises on error.  sub: value, type: value, opts: value
subscribe     3      Subscribe to an Ethereum notification type.                   sub: value, type: value, opts: value
connect!      2      Open a WebSocket connection. Raises on error.                 ws_url: value, opts: value
connect       2      Open a WebSocket connection to an Ethereum node.              ws_url: value, opts: value

Summary

Functions

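close(subscription)
Close the WebSocket connection and free resources.

connect(ws_url, opts \\ [])
Open a WebSocket connection to an Ethereum node.

connect!(ws_url, opts \\ [])
Open a WebSocket connection. Raises on error.

subscribe(sub, type, opts \\ [])
Subscribe to an Ethereum notification type.

subscribe!(sub, type, opts \\ [])
Subscribe to an Ethereum notification type. Raises on error.

unsubscribe(subscription, subscription_id)
Cancel a subscription by ID.

unsubscribe!(sub, subscription_id)
Cancel a subscription by ID. Raises on error.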

Types

event()

@type event() ::
  {:new_heads, String.t(), head()}
  | {:pending_transactions, String.t(), String.t()}
  | {:logs, String.t(), log()}
  | {:parse_error, String.t(), term()}

handler()

@type handler() :: (event() -> any())

head()

@type head() :: %{
  number: non_neg_integer(),
  hash: String.t(),
  parent_hash: String.t(),
  timestamp: non_neg_integer(),
  miner: String.t(),
  gas_limit: non_neg_integer(),
  gas_used: non_neg_integer(),
  base_fee_per_gas: non_neg_integer() | nil,
  logs_bloom: String.t(),
  transactions_root: String.t(),
  state_root: String.t(),
  receipts_root: String.t()
}

log()

@type log() :: %{
  address: String.t(),
  topics: [String.t()],
  data: String.t(),
  block_number: non_neg_integer(),
  transaction_hash: String.t(),
  log_index: non_neg_integer(),
  transaction_index: non_neg_integer(),
  removed: boolean()
}

subscription_type()

@type subscription_type() :: :new_heads | :pending_transactions | {:logs, map()}

t()

@type t() :: %Onchain.Subscription{
  agent: pid(),
  client: ZenWebsocket.Client.t(),
  handler: handler()
}

Functions

close(subscription)

@spec close(t()) :: :ok

Close the WebSocket connection and free resources.

Parameters

  • sub - Subscription handle (value)

Returns

Always returns :ok (:ok)

# descripex:contract
%{
  params: %{sub: %{description: "Subscription handle", kind: :value}},
  returns: %{type: ":ok", description: "Always returns :ok"}
}

connect(ws_url, opts \\ [])

@spec connect(
  String.t(),
  keyword()
) :: {:ok, t()} | {:error, term()}

Open a WebSocket connection to an Ethereum node.

Parameters

  • ws_url - WebSocket URL (wss:// or ws://) (value)
  • opts - Options: :handler (event callback fn), plus zen_websocket options (:retry_count, :retry_delay, :max_backoff) (default: [], value)

Returns

Subscription handle for subscribe/unsubscribe/close calls ({:ok, %Onchain.Subscription{}} | {:error, term()})

# descripex:contract
%{
  params: %{
    opts: %{
      default: [],
      description: "Options: :handler (event callback fn), plus zen_websocket options (:retry_count, :retry_delay, :max_backoff)",
      kind: :value
    },
    ws_url: %{description: "WebSocket URL (wss:// or ws://)", kind: :value}
  },
  returns: %{
    type: "{:ok, %Onchain.Subscription{}} | {:error, term()}",
    description: "Subscription handle for subscribe/unsubscribe/close calls"
  }
}

connect!(ws_url, opts \\ [])

@spec connect!(
  String.t(),
  keyword()
) :: t()

Open a WebSocket connection. Raises on error.

Parameters

  • ws_url - WebSocket URL (wss:// or ws://) (value)
  • opts - Same options as connect/2 (default: [], value)

Returns

Subscription handle (%Onchain.Subscription{})

# descripex:contract
%{
  params: %{
    opts: %{default: [], description: "Same options as connect/2", kind: :value},
    ws_url: %{description: "WebSocket URL (wss:// or ws://)", kind: :value}
  },
  returns: %{
    type: "%Onchain.Subscription{}",
    description: "Subscription handle"
  }
}

subscribe(sub, type, opts \\ [])

@spec subscribe(t(), subscription_type(), keyword()) ::
  {:ok, String.t()} | {:error, term()}

Subscribe to an Ethereum notification type.

Parameters

  • sub - Subscription handle from connect/2 (value)
  • type - Subscription type: :new_heads, :pending_transactions, or {:logs, filter_map} (value)
  • opts - Reserved for future options (default: [], value)

Returns

Subscription ID for unsubscribe ({:ok, subscription_id} | {:error, term()})

# descripex:contract
%{
  params: %{
    type: %{
      description: "Subscription type: :new_heads, :pending_transactions, or {:logs, filter_map}",
      kind: :value
    },
    opts: %{
      default: [],
      description: "Reserved for future options",
      kind: :value
    },
    sub: %{description: "Subscription handle from connect/2", kind: :value}
  },
  returns: %{
    type: "{:ok, subscription_id} | {:error, term()}",
    description: "Subscription ID for unsubscribe"
  }
}
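
Putting connect/2, subscribe/3, unsubscribe/2 and close/1 together, a typical lifecycle might look like the sketch below. It relies on the default handler, which sends {:subscription, event} to the calling process. The module HeadWatcherSketch, its first_head/2 function, the 30-second default timeout and the {:error, :timeout} return value are all hypothetical, and the code needs the onchain dependency and a reachable WebSocket endpoint to do anything useful.

```elixir
defmodule HeadWatcherSketch do
  @moduledoc "Illustrative only: waits for the first new block header, then cleans up."

  alias Onchain.Subscription

  # Returns {:ok, head_map}, {:error, :timeout} (our own convention),
  # or whatever error tuple connect/2 or subscribe/3 returned.
  def first_head(ws_url, timeout_ms \\ 30_000) do
    with {:ok, sub} <- Subscription.connect(ws_url) do
      try do
        await_head(sub, timeout_ms)
      after
        # Always release the connection, even if subscribing fails or raises.
        Subscription.close(sub)
      end
    end
  end

  defp await_head(sub, timeout_ms) do
    with {:ok, sub_id} <- Subscription.subscribe(sub, :new_heads) do
      result =
        receive do
          # Default handler wraps each event as {:subscription, event}.
          {:subscription, {:new_heads, ^sub_id, head}} -> {:ok, head}
        after
          timeout_ms -> {:error, :timeout}
        end

      # Cancel the subscription before the connection is closed.
      Subscription.unsubscribe(sub, sub_id)
      result
    end
  end
end
```

Pinning ^sub_id in the receive pattern keeps the function from consuming events that belong to other subscriptions on the same process.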

subscribe!(sub, type, opts \\ [])

@spec subscribe!(t(), subscription_type(), keyword()) :: String.t()

Subscribe to an Ethereum notification type. Raises on error.

Parameters

  • sub - Subscription handle from connect/2 (value)
  • type - Same types as subscribe/3 (value)
  • opts - Reserved for future options (default: [], value)

Returns

Subscription ID (String.t())

# descripex:contract
%{
  params: %{
    type: %{description: "Same types as subscribe/3", kind: :value},
    opts: %{
      default: [],
      description: "Reserved for future options",
      kind: :value
    },
    sub: %{description: "Subscription handle from connect/2", kind: :value}
  },
  returns: %{type: "String.t()", description: "Subscription ID"}
}

unsubscribe(subscription, subscription_id)

@spec unsubscribe(t(), String.t()) :: {:ok, boolean()} | {:error, term()}

Cancel a subscription by ID.

Parameters

  • sub - Subscription handle (value)
  • subscription_id - Subscription ID from subscribe/3 (value)

Returns

true if unsubscribed successfully ({:ok, boolean()} | {:error, term()})

# descripex:contract
%{
  params: %{
    sub: %{description: "Subscription handle", kind: :value},
    subscription_id: %{
      description: "Subscription ID from subscribe/3",
      kind: :value
    }
  },
  returns: %{
    type: "{:ok, boolean()} | {:error, term()}",
    description: "true if unsubscribed successfully"
  }
}

unsubscribe!(sub, subscription_id)

@spec unsubscribe!(t(), String.t()) :: boolean()

Cancel a subscription by ID. Raises on error.

Parameters

  • sub - Subscription handle (value)
  • subscription_id - Subscription ID (value)

Returns

true if unsubscribed (boolean())

# descripex:contract
%{
  params: %{
    sub: %{description: "Subscription handle", kind: :value},
    subscription_id: %{description: "Subscription ID", kind: :value}
  },
  returns: %{type: "boolean()", description: "true if unsubscribed"}
}