lightspeed/pipeline/orchestrator

Connector-aware pipeline orchestration runtime for M32.

Types

One queued pipeline batch.

pub type Batch {
  Batch(
    key: String,
    stage: String,
    records: Int,
    lag_ms: Int,
    idempotency_key: String,
    sequence: Int,
  )
}

Constructors

  • Batch(
      key: String,
      stage: String,
      records: Int,
      lag_ms: Int,
      idempotency_key: String,
      sequence: Int,
    )

One dead-letter record.

pub type DeadLetterRecord {
  DeadLetterRecord(
    batch_key: String,
    destination: String,
    reason: String,
    at_ms: Int,
  )
}

Constructors

  • DeadLetterRecord(
      batch_key: String,
      destination: String,
      reason: String,
      at_ms: Int,
    )

The outcome of one orchestration operation.

pub type ProcessOutcome {
  Processed(result: pipeline.ProcessResult)
  RetryScheduled(record: RetryRecord)
  DeadLettered(record: DeadLetterRecord)
  FailureRejected(reason: String)
}

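Constructors

  • Processed(result: pipeline.ProcessResult)
  • RetryScheduled(record: RetryRecord)
  • DeadLettered(record: DeadLetterRecord)
  • FailureRejected(reason: String)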

One retry record.

pub type RetryRecord {
  RetryRecord(
    batch_key: String,
    next_attempt: Int,
    reason: String,
    retry_at_ms: Int,
  )
}

Constructors

  • RetryRecord(
      batch_key: String,
      next_attempt: Int,
      reason: String,
      retry_at_ms: Int,
    )

Runtime

opaque

Connector-aware orchestration runtime.

pub opaque type Runtime

Values

pub fn ack_batch(
  runtime: Runtime,
  key: String,
  now_ms: Int,
) -> #(Runtime, Result(ProcessOutcome, String))

Acknowledge one batch after its work has completed.

pub fn apply_action(
  runtime: Runtime,
  action: operator.Action,
  now_ms: Int,
) -> #(Runtime, Result(Nil, String))

Apply one operator control action.

pub fn batches(runtime: Runtime) -> List(Batch)

Batches in enqueue order.

pub fn complete_run(runtime: Runtime, now_ms: Int) -> Runtime

Complete one pipeline run.

pub fn dead_letters(runtime: Runtime) -> List(DeadLetterRecord)

Dead-letter records in emit order.

pub fn enqueue_batch(
  runtime: Runtime,
  key: String,
  stage: String,
  records: Int,
  lag_ms: Int,
  idempotency_key: String,
  sequence: Int,
) -> #(Runtime, Result(Nil, String))

Enqueue one batch with connector, operator, and backpressure checks.

pub fn fail_batch(
  runtime: Runtime,
  key: String,
  attempt: Int,
  reason: String,
  now_ms: Int,
) -> #(Runtime, Result(ProcessOutcome, String))

Handle one failed batch according to the retry/dead-letter policy.

pub fn in_flight_count(runtime: Runtime) -> Int

Current in-flight count.

pub fn new(
  pipeline_runtime: pipeline.Runtime,
  connector_plan: connector.ConnectorPlan,
  boundary: backpressure.Boundary,
) -> Runtime

Build one orchestration runtime.

pub fn process_outcome_label(outcome: ProcessOutcome) -> String

Return the string label for a process outcome.

pub fn queued_count(runtime: Runtime) -> Int

Current queued count.

pub fn retries(runtime: Runtime) -> List(RetryRecord)

Retry records in emit order.

pub fn runtime_connector_plan(
  runtime: Runtime,
) -> connector.ConnectorPlan

Access connector plan.

pub fn runtime_operator(runtime: Runtime) -> operator.Runtime

Access operator runtime.

pub fn runtime_pipeline(runtime: Runtime) -> pipeline.Runtime

Access inner pipeline runtime.

pub fn signature(runtime: Runtime) -> String

Stable runtime signature.

pub fn start_available(
  runtime: Runtime,
  now_ms: Int,
) -> #(Runtime, List(String))

Start queued work up to the backpressure in-flight limit.

pub fn start_run(runtime: Runtime, now_ms: Int) -> Runtime

Start one pipeline run.

pub fn valid(runtime: Runtime) -> Bool

Validate runtime invariants.

Search Document