factos/factos_pog

PostgreSQL backend for Factos using the pog package.

This backend stores accepted facts in an append-only factos_events table. The event history is the source of truth; projections and stream-shaped reads are derived views over that history.

The context dispatch flow follows the Command Context Consistency idea from “Simply Event Sourcing”: a command selects the facts required for its decision, folds them into temporary state, decides new facts, and appends those facts only when no relevant facts appeared after the observed context position.
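The flow above can be sketched in memory. This is a minimal illustration, not the backend's implementation: `Fact`, `Outcome`, `observe`, and `claim` are hypothetical names, the `"username:..."` tag format is invented for the example, and positions are assumed to be contiguous and to start at 1.

```gleam
import gleam/int
import gleam/list

// Hypothetical stand-ins for illustration; not the factos or factos_pog types.
pub type Fact {
  Fact(position: Int, type_: String, tags: List(String))
}

pub type Outcome {
  Appended(log: List(Fact))
  Rejected(reason: String)
  ContextChanged
}

// A fact belongs to the command's context when type and tag both match.
fn relevant(fact: Fact, tag: String) -> Bool {
  fact.type_ == "UsernameClaimed" && list.contains(fact.tags, tag)
}

// Select the relevant facts and fold them into temporary state:
// whether the name is taken, and the highest relevant position seen.
pub fn observe(log: List(Fact), tag: String) -> #(Bool, Int) {
  list.fold(log, #(False, 0), fn(acc, fact) {
    case relevant(fact, tag) {
      True -> #(True, int.max(acc.1, fact.position))
      False -> acc
    }
  })
}

// `snapshot` is what the command read; `current` is the log at append time.
pub fn claim(
  snapshot: List(Fact),
  current: List(Fact),
  name: String,
) -> Outcome {
  let tag = "username:" <> name
  let #(taken, observed) = observe(snapshot, tag)
  case taken {
    True -> Rejected("username already claimed")
    False -> {
      // Append only if no relevant fact appeared after the observed position.
      let changed =
        list.any(current, fn(fact) {
          fact.position > observed && relevant(fact, tag)
        })
      case changed {
        True -> ContextChanged
        False -> {
          let next = list.length(current) + 1
          Appended(list.append(current, [Fact(next, "UsernameClaimed", [tag])]))
        }
      }
    }
  }
}
```

In this sketch, a concurrent fact for a different username does not invalidate the decision, because it falls outside the command's context; only a fact matching the same type and tag does.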

The query contract is intentionally tag-based. PostgreSQL stores opaque event bytes, an event type, and tags. This keeps domain serialization outside the backend, but it means any payload value needed for a selective consistency query must be written as a tag.
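A small sketch of what the tag-based contract implies. `TaggedRow` and `matches` are hypothetical stand-ins, tags are modelled as plain strings, and the `"course:c1"` format is an assumption made for the example; the real `factos.Tag` and `factos.Query` types may look different.

```gleam
import gleam/list

// Hypothetical stand-in for a stored row: the store can see the event
// type and the tags, while `data` stays opaque bytes.
pub type TaggedRow {
  TaggedRow(type_: String, tags: List(String), data: BitArray)
}

// A tag-based match never looks inside `data`.
pub fn matches(row: TaggedRow, type_: String, tag: String) -> Bool {
  row.type_ == type_ && list.contains(row.tags, tag)
}

pub fn example() -> #(Bool, Bool) {
  let payload = <<"{\"course\":\"c1\"}":utf8>>
  // Same payload in both rows, but only the first repeats the course id as a tag.
  let tagged = TaggedRow("StudentEnrolled", ["course:c1"], payload)
  let untagged = TaggedRow("StudentEnrolled", [], payload)
  #(
    matches(tagged, "StudentEnrolled", "course:c1"),
    matches(untagged, "StudentEnrolled", "course:c1"),
  )
}
```

Here the second row carries the course id only inside its payload, so a query for that course cannot select it.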

Types

pub type Append {
  Append(
    current_revision: Int,
    position: factos.SequencePosition,
  )
}

Constructors

  • Append(current_revision: Int, position: factos.SequencePosition)

    Result of a successful append.

    current_revision is the latest revision of the target stream after the append. position is the global position of the last inserted event, or NoPosition when no events were produced.

pub type DecodeError {
  UnknownEvent
  InvalidData
}

Constructors

  • UnknownEvent
  • InvalidData

pub type Dispatch(event) {
  Dispatch(append: Append, events: List(factos.Recorded(event)))
}

Constructors

  • Dispatch(append: Append, events: List(factos.Recorded(event)))

    Result of a successful dispatch.

    append has the stream revision and final global position. events are the committed events recorded by this dispatch, suitable for pure Factos reactors or backend-specific durable effect adapters.

pub type Error(domain_error) {
  DomainError(domain_error)
  StoreError(pog.QueryError)
  AppendConditionFailed(factos.AppendCondition)
  DecodeError(DecodeError)
}

Constructors

  • DomainError(domain_error)

    The decider rejected the command with a domain error.

  • StoreError(pog.QueryError)

    PostgreSQL or pog returned an error while running a query.

  • AppendConditionFailed(factos.AppendCondition)

    A stream revision or context append condition failed.

  • DecodeError(DecodeError)

    The codec could not decode a stored event.

pub type EventCodec(event) {
  EventCodec(
    encode: fn(event) -> Proposed(event),
    decode: fn(StoredEvent) -> Result(
      factos.Decoded(event),
      DecodeError,
    ),
  )
}

Constructors

  • EventCodec(
      encode: fn(event) -> Proposed(event),
      decode: fn(StoredEvent) -> Result(
        factos.Decoded(event),
        DecodeError,
      ),
    )

    Application-owned PostgreSQL event codec.

    encode converts a domain event into bytes and metadata. decode converts a stored row back into a factos.Decoded domain event. Decode failures are reported as this module’s DecodeError (UnknownEvent or InvalidData) and reach callers wrapped in the DecodeError variant of Error.

pub type Proposed(event) {
  Proposed(
    id: String,
    event: event,
    type_: factos.EventType,
    version: Int,
    tags: List(factos.Tag),
    metadata: factos.Metadata,
    data: BitArray,
  )
}

Constructors

  • Proposed(
      id: String,
      event: event,
      type_: factos.EventType,
      version: Int,
      tags: List(factos.Tag),
      metadata: factos.Metadata,
      data: BitArray,
    )

    A domain event prepared for PostgreSQL persistence.

    The application codec creates this value. id should identify the event for the application. type_ and tags are store-visible query metadata. data is opaque bytes owned by the application codec.

pub type StoredEvent {
  StoredEvent(
    position: Int,
    id: String,
    stream: String,
    revision: Int,
    type_: factos.EventType,
    version: Int,
    tags: List(factos.Tag),
    metadata: factos.Metadata,
    data: BitArray,
  )
}

Constructors

  • StoredEvent(
      position: Int,
      id: String,
      stream: String,
      revision: Int,
      type_: factos.EventType,
      version: Int,
      tags: List(factos.Tag),
      metadata: factos.Metadata,
      data: BitArray,
    )

    A raw event row read from PostgreSQL before domain decoding.

    Decoders receive this value so they can inspect stored metadata and bytes. position is the global append order. revision is the per-stream revision.
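The difference between position and revision can be illustrated with an in-memory log. `OrderedRow` and `append` are hypothetical, and starting both counters at 1 is an assumption of this sketch; the backend's actual starting values are not stated here.

```gleam
import gleam/list

// Hypothetical stand-in carrying only the ordering fields of a stored row.
pub type OrderedRow {
  OrderedRow(position: Int, stream: String, revision: Int)
}

// Append one event to `stream`. The position counts every row in the log;
// the revision counts only the rows that belong to the same stream.
pub fn append(log: List(OrderedRow), stream: String) -> List(OrderedRow) {
  let position = list.length(log) + 1
  let in_stream = list.filter(log, fn(row) { row.stream == stream })
  let revision = list.length(in_stream) + 1
  list.append(log, [OrderedRow(position, stream, revision)])
}
```

Appending to streams "a", "b", then "a" again yields positions 1, 2, 3 and revisions 1, 1, 2 under these assumptions.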

Values

pub fn codec(
  encode encode: fn(event) -> Proposed(event),
  decode decode: fn(StoredEvent) -> Result(
    factos.Decoded(event),
    DecodeError,
  ),
) -> EventCodec(event)

Create a new codec.

encode turns a domain event into a Proposed event ready for persistence. decode turns a stored row back into a domain event. Decode failures are returned as DecodeError and stop load/read flows rather than panicking.
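A minimal sketch of the encode/decode pair's logic, assuming a single event whose payload is a UTF-8 string. `Event`, `Encoded`, and `Failure` are local stand-ins that mirror only part of Proposed, StoredEvent, and DecodeError; a real codec would build `Proposed` values and return `factos.Decoded(event)`, whose construction is not shown here.

```gleam
import gleam/bit_array

pub type Event {
  UsernameClaimed(name: String)
}

// Stand-in for the store-visible part of an event: type, version, bytes.
pub type Encoded {
  Encoded(type_: String, version: Int, data: BitArray)
}

// Stand-in mirroring the two DecodeError variants.
pub type Failure {
  UnknownEvent
  InvalidData
}

pub fn encode(event: Event) -> Encoded {
  case event {
    UsernameClaimed(name) ->
      Encoded("UsernameClaimed", 1, bit_array.from_string(name))
  }
}

pub fn decode(stored: Encoded) -> Result(Event, Failure) {
  case stored.type_ {
    "UsernameClaimed" ->
      // Bytes that are not valid UTF-8 become a value, not a panic.
      case bit_array.to_string(stored.data) {
        Ok(name) -> Ok(UsernameClaimed(name))
        Error(_) -> Error(InvalidData)
      }
    _ -> Error(UnknownEvent)
  }
}
```

Returning a `Result` from decode is what lets a load or read flow stop with an error value when it meets an unknown type or malformed bytes.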

pub fn dispatch(
  connection: pog.Connection,
  stream stream_name: String,
  decider decider: factos.Decider(
    command,
    state,
    event,
    domain_error,
  ),
  codec codec: EventCodec(event),
  command command: command,
) -> Result(Dispatch(event), Error(domain_error))

Run a stream-based read-decide-append command flow.

Use this when one stream is intentionally the consistency boundary. Per-stream consistency remains useful, but event sourcing does not require it. For command-specific rules, prefer dispatch_with_query so the protected boundary follows the decision.

pub fn dispatch_with_query(
  connection: pog.Connection,
  stream stream_name: String,
  query query: factos.Query,
  decider decider: factos.Decider(
    command,
    state,
    event,
    domain_error,
  ),
  codec codec: EventCodec(event),
  command command: command,
) -> Result(Dispatch(event), Error(domain_error))

Run a full context-first read-decide-append command flow.

PostgreSQL does not have a native primitive for “append if no row matching this arbitrary event-type/tag query appeared after position N”. This backend runs the read, context check, and append inside one transaction that holds LOCK TABLE factos_events IN EXCLUSIVE MODE, which makes the three steps atomic for all Factos queries.

That lock is the main throughput tradeoff: unrelated writers queue behind each other even if their contexts do not overlap. It is deliberately simple and correct. A future backend can use advisory locks or query-specific lock keys, but only if it keeps the same context-stability guarantee.

pub fn error_to_string(
  error: Error(domain_error),
  domain_error_to_string: fn(domain_error) -> String,
) -> String

pub fn load_stream(
  connection: pog.Connection,
  stream stream_name: String,
  decider decider: factos.Decider(
    command,
    state,
    event,
    domain_error,
  ),
  codec codec: EventCodec(event),
) -> Result(
  factos.LoadedStream(event, state),
  Error(domain_error),
)

Load and fold one stream.

This supports classic stream-revision consistency. The returned factos.LoadedStream contains the folded state, decoded recorded events, and current stream revision.

pub fn migrate(
  connection: pog.Connection,
) -> Result(Nil, Error(domain_error))

Create the PostgreSQL schema used by this backend.

The schema is an append-only factos_events table with a global identity position, per-stream revision, event type, newline-encoded tags, and opaque data bytes. Queryable tags are also mirrored into factos_event_tags, so event-type/tag context reads can use indexed SQL plans instead of loading the whole event table into the application.

pub fn read_context(
  connection: pog.Connection,
  query query: factos.Query,
  decider decider: factos.Decider(
    command,
    state,
    event,
    domain_error,
  ),
  codec codec: EventCodec(event),
) -> Result(factos.Context(event, state), Error(domain_error))

Read and fold the facts selected by a command-context query.

The backend reads stored rows, decodes them with the supplied codec, filters them with factos.matches_query, folds matching events with the decider’s evolve function, and returns a factos.Context with a FailIfEventsMatch(query, after) append condition.
