Parrhesia.API.Sync (parrhesia v0.16.1)

Sync control-plane and bulk event copy API.

This module manages outbound relay sync definitions, exposes runtime status for each configured sync worker, and provides local bulk clone/fetch helpers over stored events.

The main control-plane entrypoint is put_server/2. Accepted server maps are normalized into a stable internal shape and persisted by the sync manager. The expected input shape is:

%{
  "id" => "tribes-primary",
  "url" => "wss://relay-a.example/relay",
  "enabled?" => true,
  "auth_pubkey" => "...64 hex chars...",
  "filters" => [%{"kinds" => [5000]}],
  "mode" => "req_stream",
  "overlap_window_seconds" => 300,
  "relay_info_mode" => "required",
  "auth" => %{"type" => "nip42", "mode" => "on_challenge"},
  "tls" => %{
    "mode" => "required",
    "hostname" => "relay-a.example",
    "ca_certfile" => "/etc/tribes/cluster-ca.pem",
    "client_certfile" => "/etc/tribes/node.crt",
    "client_keyfile" => "/etc/tribes/node.key",
    "pins" => [%{"type" => "spki_sha256", "value" => "..."}]
  },
  "metadata" => %{}
}
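
As a rough sketch of how a definition in this shape might be registered, the following builds a reduced server map and passes it to put_server/2. The module SyncSetup and its function names are hypothetical, and the choice of keys in the reduced map is an assumption: the documentation above does not say which keys are optional. The sync module is taken as an argument so the snippet can also be run against a stub.

```elixir
defmodule SyncSetup do
  # Hypothetical helper module, not part of Parrhesia.

  # Builds a server map from keys shown in the documented input shape.
  # Which keys may be omitted is an assumption; pass the full map if in doubt.
  def server_definition(id, url, auth_pubkey) do
    %{
      "id" => id,
      "url" => url,
      "enabled?" => true,
      "auth_pubkey" => auth_pubkey,
      "filters" => [%{"kinds" => [5000]}],
      "mode" => "req_stream",
      "metadata" => %{}
    }
  end

  # Registers the definition and returns the normalized server on success.
  # `sync` defaults to the real API module; a stub can be passed instead.
  def register(server, opts \\ [], sync \\ Parrhesia.API.Sync) do
    case sync.put_server(server, opts) do
      {:ok, normalized} -> {:ok, normalized}
      {:error, reason} -> {:error, {:put_server_failed, reason}}
    end
  end
end
```

To target a non-default manager, the :manager or :name option would go in opts, for example `SyncSetup.register(server, name: some_manager)`, where `some_manager` is a placeholder.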

Control-plane functions accept :manager or :name in opts to target a non-default manager. Bulk helper functions use Parrhesia.API.RequestContext and the same read/write policy checks as Parrhesia.API.Events.

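Summary

Types

  • copy_result() - Summary returned by copy_events/3.
  • event_page() - A local event page suitable for clone/fetch loops.
  • server() - Normalized sync server configuration returned by the sync manager.

Functions

  • copy_events(source, target, opts \\ []) - Copies events from a local source page stream into a target.
  • get_server(server_id, opts \\ []) - Fetches a single normalized sync server definition.
  • import_events(events, opts \\ []) - Imports a batch of events through Parrhesia.API.Events.publish_many/2.
  • list_servers(opts \\ []) - Lists all configured sync servers, including their runtime state.
  • page_events(filter, opts \\ []) - Pages local stored events with the same policy checks as relay reads.
  • put_server(server, opts \\ []) - Creates or replaces a sync server definition.
  • remove_server(server_id, opts \\ []) - Removes a stored sync server definition and stops its worker if it is running.
  • server_stats(server_id, opts \\ []) - Returns runtime counters and timestamps for a single sync server.
  • start_server(server_id, opts \\ []) - Marks a sync server as running and reconciles its worker state.
  • stop_server(server_id, opts \\ []) - Stops a sync server and records a disconnect timestamp in runtime state.
  • stream_events(filter, opts \\ []) - Returns a lazy stream over page_events/2 using ascending keyset pagination.
  • sync_health(opts \\ []) - Returns a health summary for the sync subsystem.
  • sync_now(server_id, opts \\ []) - Triggers an immediate sync run for a server.
  • sync_stats(opts \\ []) - Returns aggregate counters across all configured sync servers.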

Types

copy_result()

@type copy_result() :: %{
  pages: non_neg_integer(),
  events: non_neg_integer(),
  accepted: non_neg_integer(),
  duplicates: non_neg_integer(),
  rejected: non_neg_integer(),
  last_cursor: {non_neg_integer(), String.t()} | nil
}

Summary returned by copy_events/3.

event_page()

@type event_page() :: %{
  events: [map()],
  next_cursor: {non_neg_integer(), String.t()} | nil,
  complete?: boolean(),
  raw_count: non_neg_integer()
}

A local event page suitable for clone/fetch loops.

server()

@type server() :: map()

Normalized sync server configuration returned by the sync manager.

Functions

copy_events(source, target, opts \\ [])

@spec copy_events(term(), term(), keyword()) ::
  {:ok, copy_result()} | {:error, term()}

Copies events from a local source page stream into a target.

Supported sources:

  • {:local, filter} - page local storage using page_events/2
  • filter - shorthand for {:local, filter}
  • :local - use opts[:filter]

Supported targets:

  • :local - import each page through import_events/2
  • a one-arity function - called once per non-empty page
  • {:fun, function} - same as a one-arity function

Options are the same as for page_events/2. In addition, :progress may be a two-arity callback that receives events such as :page, :imported, and :completed.
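
The source and target forms above might be combined as in the following sketch, which copies into a one-arity function target and counts the events it receives. CopyExample is a hypothetical module. The return value of the target function (:ok here) and the argument order of the progress callback (event name, then payload) are assumptions, since neither is specified above. Any options that page_events/2 requires are expected to be supplied by the caller in opts.

```elixir
defmodule CopyExample do
  # Hypothetical wrapper, not part of Parrhesia.
  # Counts the events handed to a function target, using an Agent as the counter.
  def count_matching(filter, opts \\ [], sync \\ Parrhesia.API.Sync) do
    {:ok, counter} = Agent.start_link(fn -> 0 end)

    # One-arity target: documented as being called once per non-empty page.
    # Returning :ok is an assumption about what copy_events/3 expects.
    target = fn events ->
      Agent.update(counter, &(&1 + length(events)))
      :ok
    end

    # Two-arity progress callback. The (event, payload) order is an assumption.
    progress = fn event, _payload ->
      if event == :completed, do: IO.puts("copy finished")
      :ok
    end

    result = sync.copy_events({:local, filter}, target, Keyword.put(opts, :progress, progress))
    seen = Agent.get(counter, & &1)
    Agent.stop(counter)

    case result do
      {:ok, summary} -> {:ok, summary, seen}
      {:error, reason} -> {:error, reason}
    end
  end
end
```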

get_server(server_id, opts \\ [])

@spec get_server(
  String.t(),
  keyword()
) :: {:ok, server()} | :error | {:error, term()}

Fetches a single normalized sync server definition.

Returns :error when the server id is unknown.
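
Because the spec allows three return shapes, callers may want to collapse them into one tagged result. The sketch below does this; ServerLookup and the :not_found and :lookup_failed tags are hypothetical names chosen for illustration.

```elixir
defmodule ServerLookup do
  # Hypothetical helper, not part of Parrhesia.
  # Collapses the three documented return shapes into {:ok, _} | {:error, _}.
  def fetch(server_id, opts \\ [], sync \\ Parrhesia.API.Sync) do
    case sync.get_server(server_id, opts) do
      # Known server: pass the normalized definition through.
      {:ok, server} -> {:ok, server}
      # Bare :error means the server id is unknown.
      :error -> {:error, :not_found}
      # Any other failure is wrapped with its reason.
      {:error, reason} -> {:error, {:lookup_failed, reason}}
    end
  end
end
```

The same pattern applies to server_stats/2, whose spec has the same three return shapes.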

import_events(events, opts \\ [])

@spec import_events(
  [map()],
  keyword()
) :: {:ok, map()} | {:error, term()}

Imports a batch of events through Parrhesia.API.Events.publish_many/2.

The caller controls validation/authentication through the supplied request context. Sync callers normally pass %RequestContext{caller: :sync} for already-validated remote relay pages.

list_servers(opts \\ [])

@spec list_servers(keyword()) :: {:ok, [server()]} | {:error, term()}

Lists all configured sync servers, including their runtime state.

page_events(filter, opts \\ [])

@spec page_events(
  map(),
  keyword()
) :: {:ok, event_page()} | {:error, term()}

Pages local stored events with the same policy checks as relay reads.

This is the in-process counterpart to the SYNC-PAGE wire frame. It is intended for application-level clone/fetch flows that first narrow access with their own policy layer and then need a stable keyset cursor over matching Nostr events.

Required options:

Supported options:

  • :limit or :page_size - positive page size, defaults to 500
  • :order - :asc (default) or :desc
  • :after_key / :until_key - {created_at, event_id_hex} keyset bounds
  • :authorize_event - optional callback for app-level per-event filtering
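
A manual clone/fetch loop over these options could look like the sketch below. PageLoop is a hypothetical module. It assumes that next_cursor from one page can be passed back as :after_key for the next, and that a nil cursor or complete?: true marks the final page; the documentation above does not spell out either point, so treat both as assumptions to verify.

```elixir
defmodule PageLoop do
  # Hypothetical helper, not part of Parrhesia.
  # Walks pages in ascending order and collects every event into one list.
  def collect(filter, opts \\ [], sync \\ Parrhesia.API.Sync) do
    do_collect(filter, Keyword.put(opts, :order, :asc), sync, [])
  end

  defp do_collect(filter, opts, sync, acc) do
    case sync.page_events(filter, opts) do
      {:ok, %{events: events, next_cursor: cursor, complete?: complete?}} ->
        acc = [events | acc]

        # Assumed stop condition: no cursor, or the page reports completion.
        if complete? or is_nil(cursor) do
          {:ok, acc |> Enum.reverse() |> Enum.concat()}
        else
          # Assumed continuation: feed the cursor back as the lower keyset bound.
          do_collect(filter, Keyword.put(opts, :after_key, cursor), sync, acc)
        end

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

For large result sets, stream_events/2 avoids holding every page in memory at once.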

put_server(server, opts \\ [])

@spec put_server(
  map(),
  keyword()
) :: {:ok, server()} | {:error, term()}

Creates or replaces a sync server definition.

remove_server(server_id, opts \\ [])

@spec remove_server(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Removes a stored sync server definition and stops its worker if it is running.

server_stats(server_id, opts \\ [])

@spec server_stats(
  String.t(),
  keyword()
) :: {:ok, map()} | :error | {:error, term()}

Returns runtime counters and timestamps for a single sync server.

Returns :error when the server id is unknown.

start_server(server_id, opts \\ [])

@spec start_server(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Marks a sync server as running and reconciles its worker state.

stop_server(server_id, opts \\ [])

@spec stop_server(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Stops a sync server and records a disconnect timestamp in runtime state.

stream_events(filter, opts \\ [])

@spec stream_events(
  map(),
  keyword()
) :: {:ok, Enumerable.t()} | {:error, term()}

Returns a lazy stream over page_events/2 using ascending keyset pagination.

Stream pages are fetched only as the enumerable is consumed. If a later page fails, the stream exits with {:parrhesia_sync_page_error, reason}.
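
Since a failure on a later page surfaces while the stream is being consumed, the consumer is where it has to be handled. The following sketch converts that exit into an error tuple. StreamConsume is a hypothetical module, and the code assumes "exits" means a process exit raised with exit/1, which is caught here with `catch :exit`.

```elixir
defmodule StreamConsume do
  # Hypothetical helper, not part of Parrhesia.
  # Consumes an event stream and turns a page-fetch exit into {:error, reason}.
  def count_events(stream) do
    {:ok, Enum.count(stream)}
  catch
    # Assumes the failure is signalled via exit/1 with this tuple.
    :exit, {:parrhesia_sync_page_error, reason} -> {:error, reason}
  end
end
```

A caller would typically obtain the stream first, for example `with {:ok, stream} <- Parrhesia.API.Sync.stream_events(filter, opts), do: StreamConsume.count_events(stream)`.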

sync_health(opts \\ [])

@spec sync_health(keyword()) :: {:ok, map()} | {:error, term()}

Returns a health summary for the sync subsystem.

sync_now(server_id, opts \\ [])

@spec sync_now(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Triggers an immediate sync run for a server.

sync_stats(opts \\ [])

@spec sync_stats(keyword()) :: {:ok, map()} | {:error, term()}

Returns aggregate counters across all configured sync servers.