lightspeed/pipeline/orchestrator
Connector-aware pipeline orchestration runtime for M32.
Types
One queued pipeline batch.
pub type Batch {
Batch(
key: String,
stage: String,
records: Int,
lag_ms: Int,
idempotency_key: String,
sequence: Int,
)
}
Constructors
-
Batch( key: String, stage: String, records: Int, lag_ms: Int, idempotency_key: String, sequence: Int, )
One dead-letter record.
pub type DeadLetterRecord {
DeadLetterRecord(
batch_key: String,
destination: String,
reason: String,
at_ms: Int,
)
}
Constructors
-
DeadLetterRecord( batch_key: String, destination: String, reason: String, at_ms: Int, )
One orchestration operation outcome.
pub type ProcessOutcome {
Processed(result: pipeline.ProcessResult)
RetryScheduled(record: RetryRecord)
DeadLettered(record: DeadLetterRecord)
FailureRejected(reason: String)
}
Constructors
-
Processed(result: pipeline.ProcessResult) -
RetryScheduled(record: RetryRecord) -
DeadLettered(record: DeadLetterRecord) -
FailureRejected(reason: String)
One retry record.
pub type RetryRecord {
RetryRecord(
batch_key: String,
next_attempt: Int,
reason: String,
retry_at_ms: Int,
)
}
Constructors
-
RetryRecord( batch_key: String, next_attempt: Int, reason: String, retry_at_ms: Int, )
Runtime
opaqueConnector-aware orchestration runtime.
pub opaque type Runtime
Values
pub fn ack_batch(
runtime: Runtime,
key: String,
now_ms: Int,
) -> #(Runtime, Result(ProcessOutcome, String))
Acknowledge one batch after work completion.
pub fn apply_action(
runtime: Runtime,
action: operator.Action,
now_ms: Int,
) -> #(Runtime, Result(Nil, String))
Apply one operator control action.
pub fn dead_letters(runtime: Runtime) -> List(DeadLetterRecord)
Dead-letter records in emit order.
pub fn enqueue_batch(
runtime: Runtime,
key: String,
stage: String,
records: Int,
lag_ms: Int,
idempotency_key: String,
sequence: Int,
) -> #(Runtime, Result(Nil, String))
Enqueue one batch with connector, operator, and backpressure checks.
pub fn fail_batch(
runtime: Runtime,
key: String,
attempt: Int,
reason: String,
now_ms: Int,
) -> #(Runtime, Result(ProcessOutcome, String))
Handle one failed batch according to retry/dead-letter policy.
pub fn new(
pipeline_runtime: pipeline.Runtime,
connector_plan: connector.ConnectorPlan,
boundary: backpressure.Boundary,
) -> Runtime
Build one orchestration runtime.
pub fn process_outcome_label(outcome: ProcessOutcome) -> String
Process-outcome label.
pub fn runtime_connector_plan(
runtime: Runtime,
) -> connector.ConnectorPlan
Access connector plan.
pub fn runtime_operator(runtime: Runtime) -> operator.Runtime
Access operator runtime.
pub fn runtime_pipeline(runtime: Runtime) -> pipeline.Runtime
Access inner pipeline runtime.