lightspeed/agent/session

Minimal typed session actor model for Phase 2.

Types

Typed inbox events handled by one live-session actor.

pub type InboxEvent {
  Connect(route: String, csrf_token: String, now_ms: Int)
  Reconnect(route: String, now_ms: Int)
  Increment
  Decrement
  Ack(ref: String)
  Heartbeat(now_ms: Int)
  Tick(now_ms: Int)
  Crash(reason: String)
  Restart(now_ms: Int)
  Shutdown(reason: String)
}

Constructors

  • Connect(route: String, csrf_token: String, now_ms: Int)
  • Reconnect(route: String, now_ms: Int)
  • Increment
  • Decrement
  • Ack(ref: String)
  • Heartbeat(now_ms: Int)
  • Tick(now_ms: Int)
  • Crash(reason: String)
  • Restart(now_ms: Int)
  • Shutdown(reason: String)

Typed inbox envelope that pairs an event with the process id of its sender.

pub type InboxMessage {
  InboxMessage(owner: String, event: InboxEvent)
}

Constructors

  • InboxMessage(owner: String, event: InboxEvent)

Typed outbox records emitted from the actor.

pub type OutboxMessage {
  OutboxPatch(PatchEnvelope)
  OutboxTelemetry(TelemetryEvent)
}

Constructors

  • OutboxPatch(PatchEnvelope)
  • OutboxTelemetry(TelemetryEvent)

Patch queued by the session actor.

pub type PatchEnvelope {
  PatchEnvelope(ref: String, patch: diff.Patch)
}

Constructors

  • PatchEnvelope(ref: String, patch: diff.Patch)

Reconnect strategy applied after a disconnect or crash.

pub type ReconnectPolicy {
  Rehydrate
  Remount
}

Constructors

  • Rehydrate
  • Remount

Session

opaque

Internal state of one session actor.

pub opaque type Session

Telemetry emitted by one session actor.

pub type TelemetryEvent {
  SessionMounted(session_id: String, route: String)
  SessionRehydrated(session_id: String, route: String)
  SessionRemounted(session_id: String, route: String)
  CounterUpdated(session_id: String, value: Int)
  PatchQueued(session_id: String, ref: String)
  PatchAcked(session_id: String, ref: String)
  HeartbeatReceived(session_id: String, deadline_ms: Int)
  HeartbeatTimedOut(session_id: String, now_ms: Int)
  SessionCrashed(session_id: String, reason: String)
  SessionRestarted(session_id: String)
  SessionShutdown(session_id: String, reason: String)
  OwnershipRejected(session_id: String, owner: String)
  EventIgnored(session_id: String, event: String)
}

Constructors

  • SessionMounted(session_id: String, route: String)
  • SessionRehydrated(session_id: String, route: String)
  • SessionRemounted(session_id: String, route: String)
  • CounterUpdated(session_id: String, value: Int)
  • PatchQueued(session_id: String, ref: String)
  • PatchAcked(session_id: String, ref: String)
  • HeartbeatReceived(session_id: String, deadline_ms: Int)
  • HeartbeatTimedOut(session_id: String, now_ms: Int)
  • SessionCrashed(session_id: String, reason: String)
  • SessionRestarted(session_id: String)
  • SessionShutdown(session_id: String, reason: String)
  • OwnershipRejected(session_id: String, owner: String)
  • EventIgnored(session_id: String, event: String)

Aggregated telemetry counters for hot-path consumers.

pub type TelemetrySummary {
  TelemetrySummary(
    patch_queued: Int,
    patch_acked: Int,
    session_crashed: Int,
    session_rehydrated: Int,
    session_remounted: Int,
    total_events: Int,
  )
}

Constructors

  • TelemetrySummary(
      patch_queued: Int,
      patch_acked: Int,
      session_crashed: Int,
      session_rehydrated: Int,
      session_remounted: Int,
      total_events: Int,
    )

Values

pub fn counter(session: Session) -> Int

Current counter model used by this minimal actor.

pub fn crashed(session: Session) -> Bool

True when the session has crashed and is waiting for a restart.

pub fn flush_outbox(
  session: Session,
) -> #(Session, List(OutboxMessage))

Drain the outbox and clear it from state.

pub fn handle(session: Session, message: InboxMessage) -> Session

Handle one typed inbox message.

pub fn heartbeat_deadline_ms(session: Session) -> Int

Heartbeat deadline timestamp in milliseconds.

pub fn id(session: Session) -> String

Session id.

pub fn lifecycle(session: Session) -> typestate.Lifecycle

Current lifecycle label of the session.

pub fn oldest_pending_patch(
  session: Session,
) -> option.Option(PatchEnvelope)

Oldest pending patch waiting for ack, when present.

pub fn owner(session: Session) -> String

Owning server process id.

pub fn patch(patch: PatchEnvelope) -> diff.Patch

Extract patch payload.

pub fn patch_ref(patch: PatchEnvelope) -> String

Extract patch reference.

pub fn pending_patch_count(session: Session) -> Int

Number of pending patches waiting for ack.

pub fn pending_patches(session: Session) -> List(PatchEnvelope)

Pending patches waiting for ack, in emit order.

pub fn reconnect_policy(session: Session) -> ReconnectPolicy

Current reconnect policy.

pub fn start(
  id: String,
  owner: String,
  reconnect_policy: ReconnectPolicy,
  now_ms: Int,
  timeout_ms: Int,
) -> Session

Create the state for a new session actor, which starts out disconnected.

pub fn summary_patch_acked(summary: TelemetrySummary) -> Int

Number of patch_acked events in the summary.

pub fn summary_patch_queued(summary: TelemetrySummary) -> Int

Number of patch_queued events in the summary.

pub fn summary_session_crashed(summary: TelemetrySummary) -> Int

Number of session_crashed events in the summary.

pub fn summary_session_rehydrated(
  summary: TelemetrySummary,
) -> Int

Number of session_rehydrated events in the summary.

pub fn summary_session_remounted(
  summary: TelemetrySummary,
) -> Int

Number of session_remounted events in the summary.

pub fn summary_total_events(summary: TelemetrySummary) -> Int

Total telemetry event count.

pub fn telemetry(session: Session) -> List(TelemetryEvent)

Telemetry records in emit order.

pub fn telemetry_count(session: Session) -> Int

Number of telemetry events currently held by session state.

pub fn telemetry_label(event: TelemetryEvent) -> String

Stable telemetry label for assertions and logs.

pub fn telemetry_summary(session: Session) -> TelemetrySummary

Aggregated telemetry counters for hot-path extraction.

Search Document