Threadline.Query (Threadline v0.5.0)

Ecto query implementations for the Threadline public API.

Query functions require an explicit :repo option and, unless a function documents a different return shape, return plain lists of Ecto structs. DB errors propagate as exceptions, consistent with Ecto.Repo.all/2.

Timeline filters

timeline/2, timeline_query/1, and Threadline.Export accept the same filter keyword list. Only these keys are allowed: :repo, :table, :actor_ref, :from, :to, :correlation_id. Unknown keys raise ArgumentError. This is a breaking change for pre-1.0 callers that relied on unknown keys being silently ignored; see the CHANGELOG when upgrading.

When :correlation_id is set to a non-empty string (after trimming), results are limited to changes whose transaction is linked to an audit_actions row with that correlation_id (strict inner-join semantics; see CHANGELOG). Omit the key to leave correlation out of the filter.

Use timeline_repo!/2 to resolve :repo from filters and opts with the same messages as export entrypoints.

Functions

actor_history(actor_ref, opts)

Returns a keyset page of AuditTransaction records for a given actor, ordered by occurred_at descending, then id descending.

For anonymous actors, returns all anonymous transactions (no actor_id distinction — all anonymous transactions are equivalent by design, per ACTR-03).

Options

  • :repo — required Ecto.Repo module
  • :limit — integer, maximum number of records to return (default 50)
  • :after — cursor to fetch older records
  • :before — cursor to fetch newer records
  • :from — inclusive lower bound on occurred_at
  • :to — inclusive upper bound on occurred_at

Example

Threadline.actor_history(actor_ref, repo: MyApp.Repo)

as_of(schema_module, id, timestamp, opts)

Returns the latest stored row snapshot for a schema record at or before timestamp.

Returns {:ok, map} when the latest matching snapshot is an insert/update row, {:error, :deleted_record} when the latest snapshot is a delete, and {:error, :before_audit_horizon} when the row has no snapshot at or before the requested timestamp.
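The three return shapes lend themselves to pattern matching at the call site. The following is a minimal sketch, not part of Threadline: the module name AsOfResultSketch and the atoms it returns are hypothetical, and only the three tuple shapes are taken from the description above.

```elixir
defmodule AsOfResultSketch do
  # Hypothetical helper for illustration; only the matched tuple shapes
  # come from the as_of documentation.

  # Latest snapshot at or before the timestamp is an insert/update row.
  def describe({:ok, snapshot}) when is_map(snapshot), do: {:found, snapshot}

  # Latest snapshot at or before the timestamp is a delete.
  def describe({:error, :deleted_record}), do: :deleted

  # No snapshot exists at or before the requested timestamp.
  def describe({:error, :before_audit_horizon}), do: :no_history_yet
end
```

A caller could pass the result of as_of/4 straight into describe/1 and branch on the simplified value.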

audit_changes_for_transaction(transaction_id, opts)

@spec audit_changes_for_transaction(
  term(),
  keyword()
) :: [Threadline.Capture.AuditChange.t()]

Returns every %AuditChange{} for a single capture transaction.

transaction_id is audit_transactions.id — the same column as AuditChange.transaction_id (:binary_id). Accepts UUID strings or 16-byte binaries per Ecto.UUID.cast/1.

Ordering

Same total order as timeline/2: captured_at descending, then id descending (via timeline_order/1). Random binary_id values are not a monotonic sequence; ordering is only defined on (captured_at, id).
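To make the (captured_at, id) ordering concrete, here is a small in-memory sketch. It is an assumption-laden stand-in, not the library's timeline_order/1: the module TimelineOrderSketch is hypothetical, rows are plain maps, and ids are compared as Elixir terms rather than by the database.

```elixir
defmodule TimelineOrderSketch do
  # Hypothetical in-memory illustration of the documented ordering:
  # captured_at descending, then id descending.
  def sort(changes) do
    Enum.sort(changes, &precedes?/2)
  end

  # True when `a` should appear before (or level with) `b`.
  defp precedes?(a, b) do
    case DateTime.compare(a.captured_at, b.captured_at) do
      :gt -> true
      :lt -> false
      # Same timestamp: the larger id comes first.
      :eq -> a.id >= b.id
    end
  end
end
```

The id tie-break is what makes the order total when several changes share a captured_at value.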

Empty results

Returns [] when the UUID is well-formed but no audit_changes rows match (whether the parent transaction row exists or not).

Options

  • :repo — required Ecto.Repo module (read with Keyword.fetch!/2, so a missing key raises KeyError).
  • :preload — optional association list for repo.preload/3 when non-empty (intended: [:transaction]).

Errors

Raises ArgumentError with a message containing "invalid audit transaction id" when transaction_id fails the UUID cast (before the query reaches Postgrex).

audit_transaction(transaction_id, opts)

@spec audit_transaction(
  term(),
  keyword()
) :: Threadline.Capture.AuditTransaction.t() | nil

Returns one AuditTransaction by id or nil when the row does not exist.

Raises ArgumentError when transaction_id is not a valid UUID.

export_changes_query(filters)

@spec export_changes_query(keyword()) :: Ecto.Query.t()

Query returning one row per matching change with change + transaction columns for export (Threadline.Export).

Validates filters, then builds the same predicate stack as timeline/2, adds an optional LEFT JOIN to audit_actions when :correlation_id is absent (so JSON can surface linked action metadata without changing filter semantics), and selects export column maps.

export_changes_query(filters, opts)

@spec export_changes_query(keyword(), keyword()) :: Ecto.Query.t()

history(schema_module, id, opts)

Returns AuditChange records for a given schema record, ordered by captured_at descending.

Options

  • :repo — required Ecto.Repo module

Example

Threadline.history(MyApp.User, user.id, repo: MyApp.Repo)

Each AuditChange loads all table columns mapped on the schema, including changed_from when the database column is populated (no narrowing select).

row_history(schema_module, id, filters \\ [], opts \\ [])

@spec row_history(module(), term(), keyword(), keyword()) :: [
  Threadline.Capture.AuditChange.t()
]

Returns AuditChange records for one schema row using timeline ordering.

The helper fixes table_name and primary-key containment internally so callers do not need to construct low-level row predicates.

row_history_page(schema_module, id, filters \\ [], opts \\ [])

@spec row_history_page(module(), term(), keyword(), keyword()) ::
  Threadline.Query.TimelinePage.t()

Returns one keyset page of row history for a single schema row.

timeline(filters \\ [], opts \\ [])

Returns AuditChange records across tables, filtered by the given options, ordered by captured_at descending, then id descending.

Options

  • :table — string or atom; filters by table_name
  • :actor_ref — %ActorRef{}; filters by actor via joined audit_transactions
  • :from — DateTime; inclusive lower bound on captured_at
  • :to — DateTime; inclusive upper bound on captured_at
  • :correlation_id — binary; strict filter on linked AuditAction.correlation_id (see moduledoc / CHANGELOG)
  • :repo — required Ecto.Repo module (in filters or opts; see Threadline.Export)

Example

Threadline.timeline(table: "users", from: ~U[2026-01-01 00:00:00Z], repo: MyApp.Repo)

timeline_page(filters \\ [], opts \\ [])

@spec timeline_page(keyword(), keyword()) :: Threadline.Query.TimelinePage.t()

Returns one keyset page of AuditChange records in timeline order.

Paging controls live in opts:

  • :page_size — positive integer, defaults to 1000
  • :cursor — %{captured_at: %DateTime{}, id: binary} or nil
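Keyset paging over the timeline order can be modelled in memory as follows. This is a sketch under stated assumptions, not the library's implementation: KeysetPageSketch is a hypothetical module, the cursor is assumed to name the last row of the previous page, and the return value is a plain {page, next_cursor} tuple because the fields of TimelinePage are not described here.

```elixir
defmodule KeysetPageSketch do
  # Hypothetical in-memory model of keyset paging over
  # (captured_at descending, id descending).

  # Returns {page, next_cursor}; next_cursor is nil when no rows remain.
  def page(rows, cursor, page_size) when is_integer(page_size) and page_size > 0 do
    remaining =
      rows
      |> Enum.sort(&precedes?/2)
      |> Enum.filter(&after_cursor?(&1, cursor))

    {page, rest} = Enum.split(remaining, page_size)

    next_cursor =
      case {rest, List.last(page)} do
        {[], _} -> nil
        {_, last} -> %{captured_at: last.captured_at, id: last.id}
      end

    {page, next_cursor}
  end

  # Newer rows first; ties on captured_at broken by larger id first.
  defp precedes?(a, b) do
    case DateTime.compare(a.captured_at, b.captured_at) do
      :gt -> true
      :lt -> false
      :eq -> a.id >= b.id
    end
  end

  # A nil cursor means "start from the newest row".
  defp after_cursor?(_row, nil), do: true

  # Otherwise keep only rows strictly after the cursor in timeline order.
  defp after_cursor?(row, %{captured_at: ts, id: id}) do
    case DateTime.compare(row.captured_at, ts) do
      :lt -> true
      :gt -> false
      :eq -> row.id < id
    end
  end
end
```

Because the cursor carries both captured_at and id, rows that share a timestamp are neither skipped nor repeated across pages.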

timeline_query(filters)

@spec timeline_query(keyword()) :: Ecto.Query.t()

Builds the shared AuditChange query used by timeline/2 and export.

Does not call validate_timeline_filters!/1 — callers must validate first when accepting external filter lists.

timeline_repo!(filters \\ [], opts \\ [])

@spec timeline_repo!(keyword(), keyword()) :: module()

Resolves Ecto.Repo for timeline/2, export, and related APIs.

Checks opts first, then filters. Raises ArgumentError if :repo is missing or is not a module atom.
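The lookup precedence can be sketched in a few lines. This is an illustration only: RepoResolveSketch is a hypothetical module, and its error messages are placeholders rather than the messages Threadline raises.

```elixir
defmodule RepoResolveSketch do
  # Hypothetical illustration of the documented precedence:
  # :repo in opts wins, then :repo in filters.
  def resolve!(filters \\ [], opts \\ []) do
    repo = Keyword.get(opts, :repo) || Keyword.get(filters, :repo)

    cond do
      is_nil(repo) ->
        # Placeholder message, not the library's wording.
        raise ArgumentError, "missing :repo in filters or opts"

      not is_atom(repo) ->
        raise ArgumentError, "expected :repo to be a module, got: #{inspect(repo)}"

      true ->
        repo
    end
  end
end
```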

validate_row_history_filters!(filters)

@spec validate_row_history_filters!(keyword()) :: :ok

Validates row-history helper filters.

Allowed keys: :repo, :from, :to.

validate_timeline_filters!(filters)

@spec validate_timeline_filters!(keyword()) :: :ok

Validates that filters contains only timeline filter keys.

Allowed keys: :repo, :table, :actor_ref, :from, :to, :correlation_id.

Returns :ok or raises ArgumentError.
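An allow-list check of this kind can be sketched as below. The module FilterKeysSketch and its error message are hypothetical and stand in for the real function; only the list of allowed keys and the :ok / ArgumentError contract are taken from the text above.

```elixir
defmodule FilterKeysSketch do
  # Hypothetical stand-in for illustration; not Threadline's implementation.
  @allowed [:repo, :table, :actor_ref, :from, :to, :correlation_id]

  # Returns :ok when every key is allowed, raises ArgumentError otherwise.
  def validate!(filters) when is_list(filters) do
    unknown =
      filters
      |> Keyword.keys()
      |> Enum.reject(&(&1 in @allowed))
      |> Enum.uniq()

    if unknown == [] do
      :ok
    else
      # Placeholder message, not the library's wording.
      raise ArgumentError,
            "unknown filter keys: #{inspect(unknown)} (allowed: #{inspect(@allowed)})"
    end
  end
end
```

Failing loudly on unknown keys catches typos such as :tabel that would otherwise widen a query without any warning.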