lightspeed/pipeline/connector
Connector and retry/dead-letter contracts for M32 orchestration parity.
Types
Full connector plan contract.
pub type ConnectorPlan {
ConnectorPlan(
name: String,
source: SourceConnector,
sink: SinkConnector,
retry_policy: RetryPolicy,
dead_letter_policy: DeadLetterPolicy,
reprocess_window: option.Option(ReprocessWindow),
max_batch_records: Int,
)
}
Constructors
-
ConnectorPlan( name: String, source: SourceConnector, sink: SinkConnector, retry_policy: RetryPolicy, dead_letter_policy: DeadLetterPolicy, reprocess_window: option.Option(ReprocessWindow), max_batch_records: Int, )
Dead-letter policy after retry exhaustion.
pub type DeadLetterPolicy {
DeadLetterPolicy(
enabled: Bool,
destination: String,
max_payload_bytes: Int,
)
}
Constructors
-
DeadLetterPolicy( enabled: Bool, destination: String, max_payload_bytes: Int, )
Deterministic failure action.
pub type FailureAction {
Retry(next_attempt: Int, at_ms: Int)
DeadLetter(destination: String, reason: String)
Reject(reason: String)
}
Constructors
-
Retry(next_attempt: Int, at_ms: Int) -
DeadLetter(destination: String, reason: String) -
Reject(reason: String)
Optional reprocess-window contract.
pub type ReprocessWindow {
ReprocessWindow(
start_sequence: Int,
end_sequence: Int,
reason: String,
)
}
Constructors
-
ReprocessWindow( start_sequence: Int, end_sequence: Int, reason: String, )
Retry policy for connector failures.
pub type RetryPolicy {
RetryPolicy(
max_attempts: Int,
base_backoff_ms: Int,
max_backoff_ms: Int,
)
}
Constructors
-
RetryPolicy( max_attempts: Int, base_backoff_ms: Int, max_backoff_ms: Int, )
Sink-side connector abstraction.
pub type SinkConnector {
DatabaseSink(name: String, table: String, upsert_key: String)
PubSubSink(name: String, topic: String, ordering_key: String)
}
Constructors
-
DatabaseSink(name: String, table: String, upsert_key: String) -
PubSubSink(name: String, topic: String, ordering_key: String)
Source-side connector abstraction.
pub type SourceConnector {
DatabaseSource(name: String, query: String, batch_size: Int)
FileSource(
name: String,
path: String,
format: String,
batch_size: Int,
)
QueueSource(name: String, topic: String, prefetch: Int)
}
Constructors
-
DatabaseSource(name: String, query: String, batch_size: Int) -
FileSource( name: String, path: String, format: String, batch_size: Int, ) -
QueueSource(name: String, topic: String, prefetch: Int)
Values
pub fn classify_failure(
plan: ConnectorPlan,
attempt: Int,
reason: String,
now_ms: Int,
) -> FailureAction
Determine failure action from retry and dead-letter policy.
pub fn connector_plan(
name: String,
source: SourceConnector,
sink: SinkConnector,
retry_policy: RetryPolicy,
dead_letter_policy: DeadLetterPolicy,
reprocess_window: option.Option(ReprocessWindow),
max_batch_records: Int,
) -> ConnectorPlan
Build a connector plan.
pub fn database_sink(
name: String,
table: String,
upsert_key: String,
) -> SinkConnector
Build a database sink connector.
pub fn database_source(
name: String,
query: String,
batch_size: Int,
) -> SourceConnector
Build a database source connector.
pub fn dead_letter_policy(
plan: ConnectorPlan,
) -> DeadLetterPolicy
Access dead-letter policy.
pub fn dead_letter_policy_label(
policy: DeadLetterPolicy,
) -> String
Dead-letter policy label.
pub fn failure_action_label(action: FailureAction) -> String
Failure action label.
pub fn file_source(
name: String,
path: String,
format: String,
batch_size: Int,
) -> SourceConnector
Build a file source connector.
pub fn pubsub_sink(
name: String,
topic: String,
ordering_key: String,
) -> SinkConnector
Build a pubsub sink connector.
pub fn queue_source(
name: String,
topic: String,
prefetch: Int,
) -> SourceConnector
Build a queue source connector.
pub fn within_reprocess_window(
plan: ConnectorPlan,
sequence: Int,
) -> Bool
Check whether one sequence is allowed by the optional reprocess window.