lightspeed/pipeline

Integrated data-pipeline core contracts for M31.

Types

Stage boundary contracts.

pub type Boundary {
  Boundary(input_contract: String, output_contract: String)
}

Constructors

  • Boundary(input_contract: String, output_contract: String)

Pipeline lifecycle.

pub type Lifecycle {
  Idle
  Scheduled(at_ms: Int)
  Triggered(reason: String, at_ms: Int)
  Running(run_id: String, started_ms: Int)
  Failed(run_id: String, reason: String)
  Crashed(run_id: String, reason: String)
  Replaying(run_id: String, from_stage: String, started_ms: Int)
  Completed(run_id: String, finished_ms: Int)
}

Constructors

  • Idle
  • Scheduled(at_ms: Int)
  • Triggered(reason: String, at_ms: Int)
  • Running(run_id: String, started_ms: Int)
  • Failed(run_id: String, reason: String)
  • Crashed(run_id: String, reason: String)
  • Replaying(run_id: String, from_stage: String, started_ms: Int)
  • Completed(run_id: String, finished_ms: Int)

Pipeline contract.

pub type Pipeline {
  Pipeline(name: String, schedule: Schedule, stages: List(Stage))
}

Constructors

  • Pipeline(name: String, schedule: Schedule, stages: List(Stage))

Stage processing result.

pub type ProcessResult {
  Applied(checkpoint: checkpoint.Checkpoint)
  DuplicateSuppressed(stage: String, idempotency_key: String)
  Rejected(reason: String)
}

Constructors

  • Applied(checkpoint: checkpoint.Checkpoint)
  • DuplicateSuppressed(stage: String, idempotency_key: String)
  • Rejected(reason: String)

Runtime

opaque

Pipeline runtime.

pub opaque type Runtime

Pipeline schedule profile.

pub type Schedule {
  Manual
  Interval(interval_ms: Int)
  Cron(expression: String)
}

Constructors

  • Manual
  • Interval(interval_ms: Int)
  • Cron(expression: String)

One pipeline stage.

pub type Stage {
  Stage(name: String, kind: StageKind, boundary: Boundary)
}

Constructors

  • Stage(name: String, kind: StageKind, boundary: Boundary)

Stage category.

pub type StageKind {
  Source
  Transform
  Sink
}

Constructors

  • Source
  • Transform
  • Sink

Values

pub fn checkpoints(
  runtime: Runtime,
) -> List(checkpoint.Checkpoint)

Stable checkpoints in emit order.

pub fn complete(runtime: Runtime, now_ms: Int) -> Runtime

Complete the active run.

pub fn crash(runtime: Runtime, reason: String) -> Runtime

Mark the active run as crashed.

pub fn fail(runtime: Runtime, reason: String) -> Runtime

Mark the active run as failed.

pub fn lifecycle(runtime: Runtime) -> Lifecycle

Runtime lifecycle accessor.

pub fn lifecycle_label(lifecycle: Lifecycle) -> String

Lifecycle label.

pub fn new(pipeline: Pipeline) -> Runtime

Build one runtime.

pub fn pipeline(
  name: String,
  schedule: Schedule,
  stages: List(Stage),
) -> Pipeline

Build one pipeline contract.

pub fn pipeline_signature(pipeline: Pipeline) -> String

Stable pipeline signature.

pub fn process(
  runtime: Runtime,
  stage_name: String,
  processed_records: Int,
  lag_ms: Int,
  idempotency_key: String,
  now_ms: Int,
) -> #(Runtime, ProcessResult)

Process one stage event.

pub fn process_result_label(result: ProcessResult) -> String

Stable process-result label.

pub fn record_dead_letter(
  runtime: Runtime,
  count: Int,
) -> Runtime

Record dead-letter increments for the active run.

pub fn record_retry(runtime: Runtime, count: Int) -> Runtime

Record retries for the active run.

pub fn resume_from_latest_checkpoint(
  runtime: Runtime,
  now_ms: Int,
) -> Result(Runtime, String)

Resume from the latest checkpoint.

pub fn runtime_pipeline(runtime: Runtime) -> Pipeline

Runtime pipeline accessor.

pub fn runtime_telemetry(runtime: Runtime) -> telemetry.Snapshot

Runtime telemetry accessor.

pub fn schedule(runtime: Runtime, at_ms: Int) -> Runtime

Schedule one runtime.

pub fn schedule_label(schedule: Schedule) -> String

Schedule label.

pub fn signature(runtime: Runtime) -> String

Stable runtime signature.

pub fn sink_idempotency_keys(runtime: Runtime) -> List(String)

Sink idempotency keys in application order.

pub fn sink_stage(name: String, input_contract: String) -> Stage

Build a sink stage.

pub fn source_stage(
  name: String,
  output_contract: String,
) -> Stage

Build a source stage.

pub fn stage_kind_label(kind: StageKind) -> String

Stage kind label.

pub fn start(runtime: Runtime, now_ms: Int) -> Runtime

Start one run.

pub fn transform_stage(
  name: String,
  input_contract: String,
  output_contract: String,
) -> Stage

Build a transform stage.

pub fn trigger(
  runtime: Runtime,
  reason: String,
  at_ms: Int,
) -> Runtime

Trigger one runtime.

pub fn valid(runtime: Runtime) -> Bool

Validate pipeline and runtime invariants.

pub fn valid_pipeline(pipeline: Pipeline) -> Bool

Validate one pipeline contract.

Search Document