Zenohex.Session (Zenohex v0.9.1-beta.1)

Interface for managing Zenoh sessions and related operations.

This module provides functions to open and close Zenoh sessions, publish and retrieve data, and declare publishers, subscribers, and queryables.

Internally, all operations are forwarded to the native layer via NIFs.

Typical usage starts with open/0 or open/1 to create a session, followed by operations such as put/4, get/4, or declare_publisher/3.

Examples

iex> {:ok, session_id} = Zenohex.Session.open()
iex> Zenohex.Session.put(session_id, "key/expr", "payload")
iex> Zenohex.Session.close(session_id)

This module serves as the main entry point for using Zenoh in Elixir.

Summary

Types

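id()

An identifier for a session.

zenoh_timestamp_string()

A Timestamp is formatted to a String as such: "<RFC3339>/<hlc_id_hexadecimal>"

zid()

The global unique id of a Zenoh session.

Functions

close(session_id)

Closes a session.

declare_publisher(session_id, key_expr, opts \\ [])

Declares a publisher associated with the given session and key_expr.

declare_querier(session_id, key_expr, opts \\ [])

Declares a reusable querier associated with the given session and key_expr.

declare_queryable(session_id, key_expr, pid \\ self(), opts \\ [])

Declares a queryable for the specified key_expr.

declare_subscriber(session_id, key_expr, pid \\ self(), opts \\ [])

Declares a subscriber for the specified key_expr.

delete(session_id, key_expr, opts \\ [])

Deletes data matching the given key_expr.

get(session_id, selector, timeout, opts \\ [])

Queries data matching the given selector.

info(session_id)

Gets information about the Zenoh session.

new_timestamp(session_id)

Creates a new Zenoh timestamp string associated with the given session.

open()

Opens a session using the default Zenoh configuration.

open(config)

Opens a session with the given configuration.

put(session_id, key_expr, payload, opts \\ [])

Publishes a payload to the given key_expr within an open session.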

Types

congestion_control()

@type congestion_control() :: :drop | :block | :block_first

delete_opts()

@type delete_opts() :: [
  attachment: binary() | nil,
  congestion_control: congestion_control(),
  express: boolean(),
  priority: priority(),
  timestamp: zenoh_timestamp_string()
]

get_opts()

@type get_opts() :: [
  accept_replies: reply_key_expr(),
  attachment: binary() | nil,
  allowed_destination: locality(),
  congestion_control: congestion_control(),
  consolidation: query_consolidation(),
  encoding: String.t(),
  express: boolean(),
  payload: binary() | nil,
  priority: priority(),
  target: query_target(),
  query_timeout: non_neg_integer()
]

id()

@type id() :: reference()

An identifier for a session.

locality()

@type locality() :: :session_local | :remote | :any

priority()

@type priority() ::
  :real_time
  | :interactive_high
  | :interactive_low
  | :data_high
  | :data
  | :data_low
  | :background

publisher_opts()

@type publisher_opts() :: [
  allowed_destination: locality(),
  congestion_control: congestion_control(),
  encoding: String.t(),
  express: boolean(),
  priority: priority()
]

put_opts()

@type put_opts() :: [
  attachment: binary() | nil,
  congestion_control: congestion_control(),
  encoding: String.t(),
  express: boolean(),
  priority: priority(),
  timestamp: zenoh_timestamp_string()
]

querier_opts()

@type querier_opts() :: [
  accept_replies: reply_key_expr(),
  allowed_destination: locality(),
  congestion_control: congestion_control(),
  consolidation: query_consolidation(),
  express: boolean(),
  priority: priority(),
  query_timeout: non_neg_integer(),
  target: query_target()
]

query_consolidation()

@type query_consolidation() :: :auto | :none | :monotonic | :latest

query_target()

@type query_target() :: :best_matching | :all | :all_complete

queryable_opts()

@type queryable_opts() :: [allowed_origin: locality(), complete: boolean()]

reply_key_expr()

@type reply_key_expr() :: :matching_query | :any

subscriber_opts()

@type subscriber_opts() :: [{:allowed_origin, locality()}]

zenoh_timestamp_string()

@type zenoh_timestamp_string() :: String.t()

A Timestamp is formatted to a String as such: "<RFC3339>/<hlc_id_hexadecimal>"

2025-07-16T01:34:56.871273403Z/208a2ec783ec4527a39cc1d5559c70e9

See https://docs.rs/zenoh/latest/zenoh/time/struct.Timestamp.html
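The format above can be taken apart with the standard library alone. The following is a minimal sketch, assuming the string always contains exactly one "/" separator as in the sample value; TimestampExample and parse/1 are hypothetical names and not part of Zenohex. Elixir's DateTime stores at most microsecond precision, so the sub-microsecond digits of the timestamp are not preserved.

```elixir
defmodule TimestampExample do
  # Hypothetical helper, not part of Zenohex: splits a Zenoh timestamp string
  # into its RFC 3339 time and the hexadecimal HLC id.
  def parse(zenoh_timestamp) when is_binary(zenoh_timestamp) do
    with [time, hlc_id] <- String.split(zenoh_timestamp, "/", parts: 2),
         {:ok, datetime, _utc_offset} <- DateTime.from_iso8601(time) do
      {:ok, datetime, hlc_id}
    else
      # Covers a missing "/" separator as well as an unparsable time part.
      _ -> {:error, :invalid_zenoh_timestamp}
    end
  end
end

{:ok, datetime, hlc_id} =
  TimestampExample.parse("2025-07-16T01:34:56.871273403Z/208a2ec783ec4527a39cc1d5559c70e9")

IO.inspect(datetime.year)
IO.puts(hlc_id)
```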

zid()

@type zid() :: String.t()

The global unique id of a Zenoh session.

See https://docs.rs/zenoh/latest/zenoh/session/struct.ZenohId.html

Functions

close(session_id)

@spec close(session_id :: id()) :: :ok | {:error, reason :: term()}

Closes a session.

Releases all resources associated with the given session_id. After calling this function, the session_id must not be used again.

Parameters

  • session_id: The session identifier returned by open/0 or open/1.
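Because the session_id must not be used after close/1, one way to keep its lifetime explicit is to scope it to a function call. This is a minimal sketch, assuming zenohex is available as a dependency; SessionScope and with_session/1 are hypothetical names and not part of the library.

```elixir
defmodule SessionScope do
  # Hypothetical helper, not part of Zenohex: opens a session, runs `fun`
  # with it, and closes the session even if `fun` raises.
  def with_session(fun) when is_function(fun, 1) do
    with {:ok, session_id} <- Zenohex.Session.open() do
      try do
        {:ok, fun.(session_id)}
      after
        # The session_id must not be used again after this call.
        Zenohex.Session.close(session_id)
      end
    end
  end
end

# Usage (requires the zenohex dependency at runtime):
#
#   SessionScope.with_session(fn session_id ->
#     Zenohex.Session.put(session_id, "key/expr", "payload")
#   end)
```

If open/0 fails, the {:error, reason} tuple is returned unchanged and close/1 is never called.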

declare_publisher(session_id, key_expr, opts \\ [])

@spec declare_publisher(session_id :: id(), String.t(), publisher_opts()) ::
  {:ok, publisher_id :: Zenohex.Publisher.id()} | {:error, reason :: term()}

Declares a publisher associated with the given session and key_expr.

Parameters

  • session_id: Identifier of the session returned by open/0 or open/1.
  • key_expr: Key expression to publish under.
  • opts: Options for configuring the publisher.

Important

The returned publisher_id must be held for as long as the publisher is in use. If it is not held and gets garbage-collected by the BEAM, the underlying publisher in Rust will be automatically dropped.
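One common way to hold such a reference is to keep it in the state of a long-lived process. The sketch below assumes zenohex is available as a dependency; PublisherHolder and its functions are hypothetical names used only for illustration. It stores both the session_id and the publisher_id in a GenServer state so that they stay reachable for as long as the process runs.

```elixir
defmodule PublisherHolder do
  use GenServer

  # Hypothetical wrapper, not part of Zenohex.
  def start_link(key_expr) when is_binary(key_expr) do
    GenServer.start_link(__MODULE__, key_expr)
  end

  # Returns the publisher_id kept alive by this process.
  def publisher_id(server), do: GenServer.call(server, :publisher_id)

  @impl true
  def init(key_expr) do
    with {:ok, session_id} <- Zenohex.Session.open(),
         {:ok, publisher_id} <- Zenohex.Session.declare_publisher(session_id, key_expr) do
      # Both references live in the process state, so they are not
      # garbage-collected while this GenServer is running.
      {:ok, %{session_id: session_id, publisher_id: publisher_id}}
    else
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl true
  def handle_call(:publisher_id, _from, state) do
    {:reply, state.publisher_id, state}
  end
end
```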

declare_querier(session_id, key_expr, opts \\ [])

@spec declare_querier(session_id :: id(), String.t(), querier_opts()) ::
  {:ok, querier_id :: Zenohex.Querier.id()} | {:error, reason :: term()}

Declares a reusable querier associated with the given session and key_expr.

Unlike get/4, which performs a one-shot query, a querier keeps the declaration alive and can execute multiple Zenohex.Querier.get/3 calls with different per-request options.

Parameters

  • session_id: Identifier of the session returned by open/0 or open/1.
  • key_expr: Key expression to query under.
  • opts: Options for configuring the reusable querier.

Important

The returned querier_id must be held for as long as the querier is in use. If it is not held and gets garbage-collected by the BEAM, the underlying querier in Rust will be automatically dropped.

declare_queryable(session_id, key_expr, pid \\ self(), opts \\ [])

@spec declare_queryable(session_id :: id(), String.t(), pid(), queryable_opts()) ::
  {:ok, queryable_id :: Zenohex.Queryable.id()} | {:error, reason :: term()}

Declares a queryable for the specified key_expr.

Parameters

  • session_id: Identifier of the session returned by open/0 or open/1.
  • key_expr: Key expression that the queryable will handle.
  • pid: Process to receive query messages. Defaults to the calling process.
  • opts: Options for configuring the queryable.

Important

The returned queryable_id must be held for as long as the queryable is in use. If it is not held and gets garbage-collected by the BEAM, the underlying queryable in Rust will be automatically dropped.

declare_subscriber(session_id, key_expr, pid \\ self(), opts \\ [])

@spec declare_subscriber(session_id :: id(), String.t(), pid(), subscriber_opts()) ::
  {:ok, subscriber_id :: Zenohex.Subscriber.id()} | {:error, reason :: term()}

Declares a subscriber for the specified key_expr.

Parameters

  • session_id: Identifier of the session returned by open/0 or open/1.
  • key_expr: Key expression to subscribe to.
  • pid: Process to receive subscription messages. Defaults to the calling process.
  • opts: Options for configuring the subscriber.

Important

The returned subscriber_id must be held for as long as the subscriber is in use. If it is not held and gets garbage-collected by the BEAM, the underlying subscriber in Rust will be automatically dropped.

delete(session_id, key_expr, opts \\ [])

@spec delete(session_id :: id(), String.t(), delete_opts()) ::
  :ok | {:error, reason :: term()}

Deletes data matching the given key_expr.

Parameters

  • session_id: The session identifier returned by open/0 or open/1.
  • key_expr: The key expression to delete.
  • opts: Options for the delete operation.

Examples

iex> {:ok, session_id} = Zenohex.Session.open()
iex> Zenohex.Session.delete(session_id, "key/expr")
:ok

get(session_id, selector, timeout, opts \\ [])

@spec get(session_id :: id(), String.t(), non_neg_integer(), get_opts()) ::
  {:ok, [Zenohex.Sample.t() | Zenohex.Query.ReplyError.t()]}
  | {:error, :timeout}
  | {:error, reason :: term()}

Queries data matching the given selector.

Parameters

  • session_id: The session identifier returned by open/0 or open/1.
  • selector: The selector to query.
  • timeout: Timeout in milliseconds to wait for query replies.
  • opts: Options for the get operation.

Examples

iex> {:ok, session_id} = Zenohex.Session.open()
iex> Zenohex.Session.get(session_id, "key/expr", 100)
{:ok, [%Zenohex.Sample{}]}
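The @spec above allows three result shapes: a list that may mix samples and reply errors, a timeout, and any other error. The following sketch shows one way to handle all of them; GetResult and summarize/1 are hypothetical names and not part of Zenohex. Structs are matched through their :__struct__ key so that the module compiles even without the library loaded.

```elixir
defmodule GetResult do
  # Hypothetical helper, not part of Zenohex: summarizes the return value of
  # Zenohex.Session.get/4 according to its @spec.
  def summarize({:ok, replies}) when is_list(replies) do
    # Separate Zenohex.Sample replies from everything else in the list,
    # which per the @spec are Zenohex.Query.ReplyError structs.
    {samples, errors} =
      Enum.split_with(replies, &match?(%{__struct__: Zenohex.Sample}, &1))

    {:ok, %{samples: length(samples), reply_errors: length(errors)}}
  end

  def summarize({:error, :timeout}), do: {:error, :timeout}
  def summarize({:error, reason}), do: {:error, {:query_failed, reason}}
end

# Usage (requires the zenohex dependency and an open session):
#
#   session_id
#   |> Zenohex.Session.get("key/expr", 1_000)
#   |> GetResult.summarize()
```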

info(session_id)

@spec info(session_id :: id()) ::
  {:ok, Zenohex.Session.Info.t()} | {:error, reason :: term()}

Gets information about the Zenoh session.

Parameters

  • session_id: The session identifier returned by open/0 or open/1.

Examples

iex> {:ok, session_id} = Zenohex.Session.open()
iex> {:ok, %Zenohex.Session.Info{}} = Zenohex.Session.info(session_id)

new_timestamp(session_id)

@spec new_timestamp(session_id :: id()) ::
  {:ok, zenoh_timestamp_string()} | {:error, reason :: term()}

Creates a new Zenoh timestamp string associated with the given session.

Parameters

  • session_id: The session identifier returned by open/0 or open/1.

Examples

iex> {:ok, session_id} = Zenohex.Session.open()
iex> {:ok, zenoh_timestamp} = Zenohex.Session.new_timestamp(session_id)
iex> [timestamp, zenoh_id_string] = String.split(zenoh_timestamp, "/")
iex> {:ok, %DateTime{}, 0} = DateTime.from_iso8601(timestamp)

open()

@spec open() :: {:ok, session_id :: id()} | {:error, reason :: term()}

Opens a session using the default Zenoh configuration.

Internally opens a session via open/1, using Zenohex.Config.default/0.

open(config)

@spec open(String.t() | map()) ::
  {:ok, session_id :: id()} | {:error, reason :: term()}

Opens a session with the given configuration.

You can pass either:

  • a JSON5 string (JSON is supported as a subset of JSON5), or
  • an Elixir map.

When a map is provided, it is first normalized and validated via Zenohex.ConfigMap.merge/2, then encoded to JSON and passed to the NIF.

Parameters

  • config: A JSON5 string or Elixir map representing the Zenoh configuration.

Important

The returned session_id must be held for as long as the session is in use. If it is not held and gets garbage-collected by the BEAM, the underlying session in Rust will be automatically dropped and closed.

Examples

iex> {:ok, session_id} =
...> File.read!("test/support/fixtures/DEFAULT_CONFIG.json5") |>
...> Zenohex.Session.open()

iex> {:ok, session_id} = Zenohex.ConfigMap.default() |> Zenohex.Session.open()

put(session_id, key_expr, payload, opts \\ [])

@spec put(session_id :: id(), String.t(), binary(), put_opts()) ::
  :ok | {:error, reason :: term()}

Publishes a payload to the given key_expr within an open session.

This function sends a value (as a binary) to the specified key expression.

Parameters

  • session_id: The session identifier returned by open/0 or open/1.
  • key_expr: The key expression to publish to.
  • payload: The value to publish, as a binary.
  • opts: Options for the publish operation.

Examples

iex> {:ok, session_id} = Zenohex.Session.open()
iex> Zenohex.Session.put(session_id, "key/expr", "payload")
:ok
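The opts argument accepts the keys listed in put_opts(). As a minimal sketch, assuming zenohex is available as a dependency, the hypothetical helper below (PutWithOptions.put_urgent/3 is not part of the library) combines new_timestamp/1 with put/4. The option values are chosen only for illustration.

```elixir
defmodule PutWithOptions do
  # Hypothetical helper, not part of Zenohex: publishes a payload with an
  # explicit timestamp and non-default delivery options from put_opts().
  def put_urgent(session_id, key_expr, payload) when is_binary(payload) do
    with {:ok, timestamp} <- Zenohex.Session.new_timestamp(session_id) do
      Zenohex.Session.put(session_id, key_expr, payload,
        congestion_control: :block,
        express: true,
        priority: :real_time,
        timestamp: timestamp
      )
    end
  end
end

# Usage (requires the zenohex dependency at runtime):
#
#   {:ok, session_id} = Zenohex.Session.open()
#   PutWithOptions.put_urgent(session_id, "key/expr", "payload")
```

If new_timestamp/1 returns {:error, reason}, that tuple is returned as-is and nothing is published.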