ExAtlas.Fly.Dispatcher (ExAtlas v0.5.0)

Copy Markdown View Source

Framework-agnostic broadcast for Fly log and deploy events.

ExAtlas cannot hard-depend on Phoenix.PubSub because many consumers do not use Phoenix. This dispatcher picks one of three backends based on application config (config :ex_atlas, :fly, dispatcher: ...):

  • :registry (default) — atlas starts its own Registry with duplicate keys. Subscribers register the caller pid; dispatch/2 sends the message via send(pid, message).

  • :phoenix_pubsub — requires phoenix_pubsub to be present in the host app's deps and config :ex_atlas, :fly, pubsub: MyApp.PubSub. Uses Phoenix.PubSub.subscribe/2 + broadcast/3.

  • {:mfa, {mod, fun, extra_args}} — on dispatch, calls apply(mod, fun, [topic, message | extra_args]). Subscription is a no-op (the host owns delivery).

Topics & messages

  • Logs: topic "ex_atlas_fly_logs:#{app}", message {:ex_atlas_fly_logs, app, entries}
  • Deploy: topic "ex_atlas_fly_deploy:#{ticket_id}", message {:ex_atlas_fly_deploy, ticket_id, line}

These shapes are stable — hosts match on them in handle_info/2.

Dispatch semantics (H7)

In :registry mode, dispatch/2 uses Registry.dispatch/3 which runs the fan-out send/2 loop inside the caller's process while holding the registry partition lock. send/2 itself is async (no back-pressure from the receiver), so dispatch time scales linearly with the number of subscribers. For typical log-streaming / deploy workloads this is negligible (a handful of subscribers per topic). Hosts with large fan-out should use :phoenix_pubsub mode.

Subscribers with full mailboxes are not removed automatically — use subscribe_with_backpressure/2 if you need auto-eviction of slow consumers (see E6).

Summary

Functions

Child spec for the dispatcher's own registry (only used in :registry mode).

Dispatch message to all subscribers of topic.

Whether this dispatcher needs its own supervised child (the atlas Registry).

Subscribe the calling pid to topic.

Subscribe the calling pid to topic with a background watchdog that auto-unsubscribes the caller if their message_queue_len/1 stays above :threshold for :cooldown_ms of consecutive polls.

Unsubscribe the calling pid from topic.

Types

mode()

@type mode() :: :registry | :phoenix_pubsub | {:mfa, {module(), atom(), list()}}

Functions

child_spec(opts)

Child spec for the dispatcher's own registry (only used in :registry mode).

dispatch(topic, message)

@spec dispatch(String.t(), term()) :: :ok

Dispatch message to all subscribers of topic.

needs_registry?()

@spec needs_registry?() :: boolean()

Whether this dispatcher needs its own supervised child (the atlas Registry).

Returns true only in :registry mode.

subscribe(topic)

@spec subscribe(String.t()) :: :ok | {:error, term()}

Subscribe the calling pid to topic.

In :registry mode this registers the pid with the atlas registry. In :phoenix_pubsub mode this calls Phoenix.PubSub.subscribe/2. In :mfa mode this is a no-op — the host is expected to handle routing.

subscribe_with_backpressure(topic, opts \\ [])

@spec subscribe_with_backpressure(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Subscribe the calling pid to topic with a background watchdog that auto-unsubscribes the caller if their message_queue_len/1 stays above :threshold for :cooldown_ms of consecutive polls.

Protects the Streamer / Deploy producer from a slow consumer that would otherwise bloat its mailbox and eventually OOM the VM. Typical consumers (fast LiveView renderers) never trigger this; it's insurance for a hung subscriber.

Options

  • :threshold — message queue length that triggers eviction. Default 10000.
  • :poll_ms — how often to check the subscriber's queue length. Default 1000.

Caveats

Only works in :registry mode. In :phoenix_pubsub or :mfa mode atlas is not the subscription authority and cannot evict subscribers — subscribe_with_backpressure/2 falls back to a plain subscribe/1 and logs a warning.

The watchdog process is unlinked and monitors the subscriber pid — if the subscriber dies, the watchdog exits cleanly. Only :registry-mode subscribers get the protection.

unsubscribe(topic)

@spec unsubscribe(String.t()) :: :ok

Unsubscribe the calling pid from topic.