lightspeed/pipeline/connector

Connector and retry/dead-letter contracts for M32 orchestration parity.

Types

Full connector plan contract.

pub type ConnectorPlan {
  ConnectorPlan(
    name: String,
    source: SourceConnector,
    sink: SinkConnector,
    retry_policy: RetryPolicy,
    dead_letter_policy: DeadLetterPolicy,
    reprocess_window: option.Option(ReprocessWindow),
    max_batch_records: Int,
  )
}

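Constructors

  • ConnectorPlan(
      name: String,
      source: SourceConnector,
      sink: SinkConnector,
      retry_policy: RetryPolicy,
      dead_letter_policy: DeadLetterPolicy,
      reprocess_window: option.Option(ReprocessWindow),
      max_batch_records: Int,
    )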

Dead-letter policy after retry exhaustion.

pub type DeadLetterPolicy {
  DeadLetterPolicy(
    enabled: Bool,
    destination: String,
    max_payload_bytes: Int,
  )
}

Constructors

  • DeadLetterPolicy(
      enabled: Bool,
      destination: String,
      max_payload_bytes: Int,
    )

Deterministic failure action.

pub type FailureAction {
  Retry(next_attempt: Int, at_ms: Int)
  DeadLetter(destination: String, reason: String)
  Reject(reason: String)
}

Constructors

  • Retry(next_attempt: Int, at_ms: Int)
  • DeadLetter(destination: String, reason: String)
  • Reject(reason: String)

Optional reprocess-window contract.

pub type ReprocessWindow {
  ReprocessWindow(
    start_sequence: Int,
    end_sequence: Int,
    reason: String,
  )
}

Constructors

  • ReprocessWindow(
      start_sequence: Int,
      end_sequence: Int,
      reason: String,
    )

Retry policy for connector failures.

pub type RetryPolicy {
  RetryPolicy(
    max_attempts: Int,
    base_backoff_ms: Int,
    max_backoff_ms: Int,
  )
}

Constructors

  • RetryPolicy(
      max_attempts: Int,
      base_backoff_ms: Int,
      max_backoff_ms: Int,
    )

Sink-side connector abstraction.

pub type SinkConnector {
  DatabaseSink(name: String, table: String, upsert_key: String)
  PubSubSink(name: String, topic: String, ordering_key: String)
}

Constructors

  • DatabaseSink(name: String, table: String, upsert_key: String)
  • PubSubSink(name: String, topic: String, ordering_key: String)

Source-side connector abstraction.

pub type SourceConnector {
  DatabaseSource(name: String, query: String, batch_size: Int)
  FileSource(
    name: String,
    path: String,
    format: String,
    batch_size: Int,
  )
  QueueSource(name: String, topic: String, prefetch: Int)
}

Constructors

  • DatabaseSource(name: String, query: String, batch_size: Int)
  • FileSource(
      name: String,
      path: String,
      format: String,
      batch_size: Int,
    )
  • QueueSource(name: String, topic: String, prefetch: Int)

Values

pub fn classify_failure(
  plan: ConnectorPlan,
  attempt: Int,
  reason: String,
  now_ms: Int,
) -> FailureAction

Determine the failure action for an attempt from the plan's retry and dead-letter policies.

pub fn connector_plan(
  name: String,
  source: SourceConnector,
  sink: SinkConnector,
  retry_policy: RetryPolicy,
  dead_letter_policy: DeadLetterPolicy,
  reprocess_window: option.Option(ReprocessWindow),
  max_batch_records: Int,
) -> ConnectorPlan

Build a connector plan.

pub fn database_sink(
  name: String,
  table: String,
  upsert_key: String,
) -> SinkConnector

Build a database sink connector.

pub fn database_source(
  name: String,
  query: String,
  batch_size: Int,
) -> SourceConnector

Build a database source connector.

pub fn dead_letter_policy(
  plan: ConnectorPlan,
) -> DeadLetterPolicy

Access the plan's dead-letter policy.

pub fn dead_letter_policy_label(
  policy: DeadLetterPolicy,
) -> String

Produce a String label for a dead-letter policy.

pub fn default_plan() -> ConnectorPlan

Build the default M32 connector plan.

pub fn failure_action_label(action: FailureAction) -> String

Produce a String label for a failure action.

pub fn file_source(
  name: String,
  path: String,
  format: String,
  batch_size: Int,
) -> SourceConnector

Build a file source connector.

pub fn max_batch_records(plan: ConnectorPlan) -> Int

Access the plan's maximum number of records per batch (`max_batch_records`).

pub fn pubsub_sink(
  name: String,
  topic: String,
  ordering_key: String,
) -> SinkConnector

Build a pubsub sink connector.

pub fn queue_source(
  name: String,
  topic: String,
  prefetch: Int,
) -> SourceConnector

Build a queue source connector.

pub fn retry_policy(plan: ConnectorPlan) -> RetryPolicy

Access the plan's retry policy.

pub fn retry_policy_label(policy: RetryPolicy) -> String

Produce a String label for a retry policy.

pub fn signature(plan: ConnectorPlan) -> String

Produce a stable String signature for a connector plan.

pub fn sink(plan: ConnectorPlan) -> SinkConnector

Access the plan's sink connector.

pub fn sink_label(sink: SinkConnector) -> String

Produce a String label for a sink connector.

pub fn source(plan: ConnectorPlan) -> SourceConnector

Access the plan's source connector.

pub fn source_label(source: SourceConnector) -> String

Produce a String label for a source connector.

pub fn valid(plan: ConnectorPlan) -> Bool

Check whether a connector plan satisfies its invariants.

pub fn within_reprocess_window(
  plan: ConnectorPlan,
  sequence: Int,
) -> Bool

Check whether a sequence number is allowed by the plan's optional reprocess window.

Search Document