ExAtlas.Fly.Logs.Streamer (ExAtlas v0.5.0)

Copy Markdown View Source

Per-app GenServer that polls the Fly log API and dispatches new entries.

Each Streamer:

  • owns a single app's log cursor,
  • polls every :poll_interval ms,
  • dispatches {:ex_atlas_fly_logs, app_name, entries} on topic "ex_atlas_fly_logs:#{app_name}" via ExAtlas.Fly.Dispatcher,
  • monitors its subscribers and stops once all have disconnected.

Uses ExAtlas.Fly.Logs.Client.fetch_logs_with_retry/2 for automatic 401 retry.

Options

  • :app_name (required)
  • :project_dir — optional; carried in state for introspection and future use (e.g. if a deploy-vs-logs correlation is added). Safe to omit.
  • :poll_interval — ms between polls, default 2000.
  • :retry_fetch_fn — injection point for tests.
  • :registry / :dynamic_sup — set by StreamerSupervisor.

Subscription lifecycle

Subscribers register with the dispatcher topic "ex_atlas_fly_logs:#{app}". When the Streamer terminates (all subscribers gone, or StreamerSupervisor.stop_streamer/1), the Streamer emits a final {:ex_atlas_fly_logs_stopped, app_name} message on the same topic. Subscribers that want to clean up should match on this and ExAtlas.Fly.unsubscribe_logs/1 themselves. (The Dispatcher itself is framework-agnostic — atlas cannot unsubscribe other processes from it.)

Summary

Functions

Returns a specification to start this module under a supervisor.

Subscribes the calling pid to log events for app_name, starting a Streamer if one isn't already running.

Register subscriber_pid as a subscriber of streamer_pid.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

start_link(opts)

subscribe(app_name, opts)

@spec subscribe(
  String.t(),
  keyword()
) :: :ok | {:error, :no_streamer}

Subscribes the calling pid to log events for app_name, starting a Streamer if one isn't already running.

Returns {:error, :no_streamer} when neither a running Streamer nor the means to start one (registry + dynamic_sup) is available — this was previously a silent :ok that delivered no messages. ExAtlas.Fly.subscribe_logs/3 always plumbs both, so the error branch is primarily a programmer-error guard for direct callers / test utilities.

subscribe(app_name, project_dir, opts)

@spec subscribe(String.t(), String.t() | nil, keyword()) ::
  :ok | {:error, :no_streamer}

subscribe_pid(streamer_pid, subscriber_pid)

Register subscriber_pid as a subscriber of streamer_pid.