CSV and JSON export for audited row changes.
Uses the same filters and opts as Threadline.Query.timeline/2, including
:repo resolution: Keyword.get(opts, :repo) || Keyword.fetch!(filters, :repo).
Filter keys are validated via Threadline.Query.validate_timeline_filters!/1
(:repo, :table, :actor_ref, :from, :to, :correlation_id). Unknown keys
raise ArgumentError.
CSV columns
Fixed column order: id, transaction_id, table_schema, table_name, op,
captured_at, table_pk, data_after, changed_fields, changed_from,
transaction_json. The last column is a JSON object with transaction
id, occurred_at, actor_ref, and source. Datetimes are ISO 8601 UTC.
Pass include_action_metadata: true in opts to append trailing columns
correlation_id and action_id (the linked audit_actions row, when present).
Default CSV shape is unchanged when this option is absent or false.
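A representative value for the trailing transaction_json column might look like the following (all field values are illustrative, and the exact key names are assumed from the description above):

```json
{"id": 8421, "occurred_at": "2024-03-05T09:12:44Z", "actor_ref": "user:17", "source": "admin_ui"}
```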
JSON
Wrapped format (default) is one object with format_version, generated_at,
and changes. Each change may include an "action" object with "id" and
"correlation_id" when the transaction is linked to an audit_actions row.
Pass json_format: :ndjson for one JSON object per line (no outer wrapper).
Row limits
Default max_rows is 10_000. Exports use limit: max_rows + 1
to detect truncation; successful results include truncated, returned_count,
and max_rows. Empty matches return header-only CSV (one header row) and
changes: [] in JSON.
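The limit: max_rows + 1 technique above can be sketched as follows. This is an illustrative reconstruction, not the library's internal code; `query_rows/1` is a hypothetical helper standing in for the real query:

```elixir
# Fetch one row past the cap; its presence signals truncation,
# and it is dropped before building the result.
max_rows = 10_000
rows = query_rows(limit: max_rows + 1)   # query_rows/1 is a hypothetical helper

truncated? = length(rows) > max_rows
returned = Enum.take(rows, max_rows)

%{truncated: truncated?, returned_count: length(returned), max_rows: max_rows}
```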
Streaming
stream_changes/2 pages by (captured_at, id) keyset and does not apply
max_rows — cap with Stream.take/2 or use to_csv_iodata/2 / to_json_document/2
for bounded exports.
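Capping with Stream.take/2 as suggested above might look like this sketch (the module name `Threadline.Export`, `MyApp.Repo`, and the filter values are assumptions for illustration):

```elixir
# stream_changes/2 has no built-in row cap, so bound it explicitly.
Threadline.Export.stream_changes([repo: MyApp.Repo, table: "orders"])
|> Stream.take(50_000)
|> Stream.each(&IO.inspect(&1.id))
|> Stream.run()
```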
Database errors from Ecto.Repo are raised, as with timeline/2.
Summary
Functions
count_matching/2 - Counts changes matching filters without loading row payloads.

csv_header/1 - Returns the canonical CSV header row (one line ending in \r\n) for the operator-surface export controller's chunked path.

format_changes_iodata/3 - Formats a pre-fetched list of %AuditChange{} structs into iodata for the requested format.

stream_changes/2 - Lazily enumerates AuditChange structs in timeline order using keyset pages.

stream_export_rows/2 - Lazily enumerates the join-projected export-row maps (the same shape that to_csv_iodata/2 and to_json_document/2 consume) in timeline order using keyset pages.

to_csv_iodata/2 - Returns CSV as iodata plus truncation metadata.

to_json_document/2 - Returns JSON (wrapped object or NDJSON lines) as iodata plus truncation metadata.
Functions
@spec count_matching(keyword(), keyword()) :: {:ok, %{count: non_neg_integer()}}
Counts changes matching filters without loading row payloads.
Same validation and join semantics as Threadline.Query.timeline/2.
Options
  * `:repo` - optional if `:repo` is present in `filters`
  * `:cap` - when set to a positive integer, the count short-circuits at that
    value via a windowed subquery (`SELECT count(*) FROM (... LIMIT ^cap)`), so
    multi-million-row tables return immediately at the cap rather than waiting
    for a full aggregate scan. The default (`nil`) preserves the existing
    unbounded behavior. The Mix task `mix threadline.export` does NOT pass
    `:cap` and is unaffected; `Threadline.OperatorSurface.Live.TimelineLive`
    and the export controller pass `cap: 10_001` so the LV can render
    "10,000+ matches" without hitting `statement_timeout`.
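A capped count along the lines the operator surface uses might look like this sketch (the module name `Threadline.Export`, `MyApp.Repo`, and the filters are assumptions):

```elixir
# cap: 10_001 lets us distinguish "exactly N" from "more than 10,000"
# without an unbounded aggregate scan.
{:ok, %{count: count}} =
  Threadline.Export.count_matching([repo: MyApp.Repo, table: "orders"], cap: 10_001)

label = if count > 10_000, do: "10,000+ matches", else: "#{count} matches"
```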
Returns the canonical CSV header row (one line ending in \r\n) for the
operator-surface export controller's chunked path.
Same column order as to_csv_iodata/2. The chunked path emits this as the
first chunk before streaming data rows so the byte-equality parity test
holds against the iodata path.
Options
  * `:include_action_metadata` - when `true`, append `correlation_id` and `action_id` columns (same shape as `to_csv_iodata/2`).
Formats a pre-fetched list of %AuditChange{} structs into iodata for the
requested format.
Used by the operator-surface export controller to format streamed batches; CSV header and JSON envelopes are NOT emitted by this function and remain the caller's responsibility (the chunked path emits the header / envelope as its first / last chunk).
  * `:csv` - CSV data rows (each terminated by `\r\n` per RFC 4180); the caller MUST emit `csv_header/1` as the first chunk.
  * `:json_wrapped` - each row as `Jason.encode!/1` output (a JSON object). The caller emits the surrounding `{"format_version": ..., "generated_at": ..., "changes": [` prefix and `]}` suffix as separate chunks, plus the inter-row comma separators.
  * `:ndjson` - each row as `Jason.encode!/1` output followed by `\n` (no envelope; pure line-delimited JSON).
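Assembling the chunked CSV path might look like this sketch. The module name `Threadline.Export`, `MyApp.Repo`, the filters, and the argument order of `format_changes_iodata/3` (inferred from its arity) are assumptions:

```elixir
# Header first, then each streamed batch formatted into CSV data rows.
header = Threadline.Export.csv_header([])

data_chunks =
  Threadline.Export.stream_export_rows([repo: MyApp.Repo, table: "orders"])
  |> Stream.chunk_every(500)
  |> Stream.map(&Threadline.Export.format_changes_iodata(&1, :csv, []))

File.write!("orders.csv", [header | Enum.to_list(data_chunks)])
```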
Options
  * `:include_action_metadata` (default `false`) - same shape as `to_csv_iodata/2`.
@spec stream_changes(keyword(), keyword()) :: Enumerable.t()
Lazily enumerates AuditChange structs in timeline order using keyset pages.
Does not enforce max_rows — combine with Stream.take/2 if needed.
Options
  * `:repo` - optional if present in `filters`
  * `:page_size` - defaults to `1000`
@spec stream_export_rows(keyword(), keyword()) :: Enumerable.t()
Lazily enumerates the join-projected export-row maps (the same shape that
to_csv_iodata/2 and to_json_document/2 consume) in timeline order using
keyset pages.
Each emitted item is a map with the keys :id, :transaction_id,
:table_schema, :table_name, :op, :captured_at, :table_pk,
:data_after, :changed_fields, :changed_from, :tx_occurred_at,
:tx_actor_ref, :tx_source, :aa_id, :aa_correlation_id — exactly the
projection from Threadline.Query.export_changes_query/1. This matches what
format_changes_iodata/3 expects, so the operator-surface export
controller's chunked path produces byte-identical output to the iodata path.
Does not enforce max_rows — combine with Stream.take/2 if needed.
Options
  * `:repo` - optional if present in `filters`
  * `:page_size` - defaults to `1000`
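Inspecting the first projected map might look like this sketch (module name `Threadline.Export`, `MyApp.Repo`, and the filters are assumptions):

```elixir
# Take a single export-row map from the keyset-paged stream.
[row] =
  [repo: MyApp.Repo, table: "orders"]
  |> Threadline.Export.stream_export_rows(page_size: 200)
  |> Enum.take(1)

row.aa_correlation_id   # nil unless the transaction links to an audit_actions row
```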
Returns CSV as iodata plus truncation metadata.
See module documentation for filters, opts, and column layout.
Options
  * `:repo` - optional if `:repo` is present in `filters`
  * `:max_rows` - defaults to `10_000`
  * `:include_action_metadata` - when `true`, append `correlation_id` and `action_id` columns
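A capped export that reads back the documented truncation metadata might look like this sketch. The module name `Threadline.Export`, `MyApp.Repo`, the filters, and the exact result shape (`{:ok, map}` with an `:iodata` key) are assumptions based on the fields documented above:

```elixir
# The result carries truncated / returned_count / max_rows alongside the iodata.
{:ok, result} =
  Threadline.Export.to_csv_iodata([repo: MyApp.Repo, table: "orders"], max_rows: 1_000)

if result.truncated do
  IO.puts("export capped at #{result.max_rows} rows (#{result.returned_count} returned)")
end

File.write!("orders.csv", result.iodata)
```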
Returns JSON (wrapped object or NDJSON lines) as iodata plus truncation metadata.
Options
  * `:repo`, `:max_rows` - same as `to_csv_iodata/2`
  * `:json_format` - `:wrapped` (default) or `:ndjson`
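Requesting NDJSON instead of the wrapped document might look like this sketch (module name `Threadline.Export`, `MyApp.Repo`, the filters, and the `:iodata` result key are assumptions):

```elixir
# json_format: :ndjson emits one JSON object per line, no outer wrapper.
{:ok, result} =
  Threadline.Export.to_json_document(
    [repo: MyApp.Repo, from: ~U[2024-01-01 00:00:00Z]],
    json_format: :ndjson
  )

File.write!("changes.ndjson", result.iodata)
```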