Continuum.Runtime.Journal.Postgres (continuum v0.5.0)

Copy Markdown View Source

Durable journal adapter backed by Postgres via Ecto.

Implements the Continuum.Runtime.Journal behaviour. Every append operation is transactional and CAS-guarded by the lease state on the run row. Appends lock the run row, validate the lease token, assign a sequence number, and insert the event in one transaction.

The replay loop and engine code are identical whether this adapter or InMemory is in use — the only difference is durability and the fencing-token enforcement on writes.

Summary

Functions

Resolve a child's terminal state into the parent's history.

Complete the current run as {:continued, next_run_id} and insert the fresh continuation run, in one lease-CAS-guarded transaction.

Start a child workflow run and journal child_started to the parent.

Functions

await_child_terminal!(instance, parent_run_id, child_run_id, command_id, seq, lease_token)

Resolve a child's terminal state into the parent's history.

Locks the parent (CAS by lease). If the child run is terminal, appends the matching child_completed/child_failed/child_cancelled event to the parent and returns the decoded outcome; otherwise returns :pending.

cancel_run!(instance, run_id, lease_token)

clear_next_wakeup!(instance, run_id, lease_token)

complete_activity_task!(instance, task, result, lease_token, opts \\ [])

complete_compensation_task!(instance, task, result, lease_token, opts \\ [])

consume_signal(instance, run_id, name, lease_token)

continue_as_new!(instance, run_id, next_run_id, next_input, event, lease_token)

Complete the current run as {:continued, next_run_id} and insert the fresh continuation run, in one lease-CAS-guarded transaction.

The new run carries continued_from_run_id, the chain's correlation_id (the chain root's id), and any parent_run_id/parent_command_id so a continued child stays a child.

deliver_signal!(instance, run_id, name, payload)

fail_activity_task!(instance, task, error, lease_token)

fail_compensation_task!(instance, task, error, lease_token)

fire_timer!(instance, run_id, timer_id, lease_token)

get_activity_result(instance, activity_module, idempotency_key)

mark_unknown_version!(instance, run_id, error, lease_token)

resolve_signal_await(instance, run_id, await_event, lease_token)

retry_activity_task!(instance, task, error, retry_at, lease_token)

schedule_activity!(instance, run_id, event, task, lease_token)

schedule_compensation!(instance, run_id, event, task, lease_token)

Schedule a compensation activity task.

Reuses the activity-task append path: the compensation_scheduled event and the worker task are inserted under the run lease in one transaction. The task carries kind: :compensation and target_activity_id so the worker journals compensation_completed/compensation_failed on completion.

schedule_compensations!(instance, run_id, scheduled, lease_token)

schedule_signal_await!(instance, run_id, event, lease_token)

schedule_timer!(instance, run_id, event, timer, lease_token)

start_child!(instance, parent_run_id, child, lease_token)

Start a child workflow run and journal child_started to the parent.

In one transaction (CAS-guarded by the parent's lease): insert the child run row with parent_run_id/parent_command_id/correlation_id set and append the child_started event to the parent's history. The child run is left runnable for the dispatcher to claim.