Threadline.Export (Threadline v0.5.0)


CSV and JSON export for audited row changes.

Uses the same filters and opts as Threadline.Query.timeline/2, including :repo resolution (Keyword.get(opts, :repo) || Keyword.fetch!(filters, :repo)).

Filter keys are validated via Threadline.Query.validate_timeline_filters!/1 (:repo, :table, :actor_ref, :from, :to, :correlation_id). Unknown keys raise ArgumentError.
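For instance, a minimal sketch of a validated call (MyApp.Repo and the filter values are hypothetical placeholders):

```elixir
# MyApp.Repo and the filter values below are placeholders for illustration.
filters = [
  repo: MyApp.Repo,
  table: "accounts",
  from: ~U[2024-01-01 00:00:00Z],
  to: ~U[2024-02-01 00:00:00Z]
]

# An unknown key such as :tablename would raise ArgumentError at validation.
{:ok, %{count: count}} = Threadline.Export.count_matching(filters)
```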

CSV columns

Fixed column order: id, transaction_id, table_schema, table_name, op, captured_at, table_pk, data_after, changed_fields, changed_from, transaction_json. The last column is a JSON object with transaction id, occurred_at, actor_ref, and source. Datetimes are ISO 8601 UTC.

Pass include_action_metadata: true in opts to append trailing columns correlation_id and action_id (the linked audit_actions row, when present). Default CSV shape is unchanged when this option is absent or false.

JSON

Wrapped format (default) is one object with format_version, generated_at, and changes. Each change may include an "action" object with "id" and "correlation_id" when the transaction is linked to an audit_actions row. Pass json_format: :ndjson for one JSON object per line (no outer wrapper).
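A sketch of requesting each format (filters abbreviated; MyApp.Repo is a placeholder):

```elixir
filters = [repo: MyApp.Repo, table: "accounts"]

# Default: one wrapped object with format_version, generated_at, and changes.
{:ok, wrapped} = Threadline.Export.to_json_document(filters)

# NDJSON: one JSON object per line, no outer wrapper.
{:ok, ndjson} = Threadline.Export.to_json_document(filters, json_format: :ndjson)
```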

Row limits

Default max_rows is 10_000. Exports use limit: max_rows + 1 to detect truncation; successful results include truncated, returned_count, and max_rows. Empty matches return header-only CSV (one header row) and changes: [] in JSON.
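Truncation can be detected from the returned metadata, e.g. (placeholder filters):

```elixir
filters = [repo: MyApp.Repo, table: "accounts"]

{:ok, result} = Threadline.Export.to_csv_iodata(filters, max_rows: 500)

# truncated, returned_count, and max_rows are the documented metadata keys.
if result.truncated do
  IO.puts("returned #{result.returned_count} rows; more than #{result.max_rows} matched")
end
```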

Streaming

stream_changes/2 pages by (captured_at, id) keyset and does not apply max_rows — cap with Stream.take/2 or use to_csv_iodata/2 / to_json_document/2 for bounded exports.
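For a bounded stream, the cap can be applied with Stream.take/2; handle_change/1 below is a hypothetical consumer:

```elixir
filters = [repo: MyApp.Repo, table: "accounts"]

filters
|> Threadline.Export.stream_changes(page_size: 2_000)
|> Stream.take(50_000)
|> Enum.each(&handle_change/1)
```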

Database errors from Ecto.Repo are raised, as with timeline/2.

Summary

Functions

Counts changes matching filters without loading row payloads.

Returns the canonical CSV header row (one line ending in \r\n) for the operator-surface export controller's chunked path.

Formats a pre-fetched list of %AuditChange{} structs into iodata for the requested format.

Lazily enumerates AuditChange structs in timeline order using keyset pages.

Lazily enumerates the join-projected export-row maps (the same shape that to_csv_iodata/2 and to_json_document/2 consume) in timeline order using keyset pages.

Returns CSV as iodata plus truncation metadata.

Returns JSON (wrapped object or NDJSON lines) as iodata plus truncation metadata.

Functions

count_matching(filters, opts \\ [])

@spec count_matching(keyword(), keyword()) :: {:ok, %{count: non_neg_integer()}}

Counts changes matching filters without loading row payloads.

Same validation and join semantics as Threadline.Query.timeline/2.

Options

  • :repo — optional if :repo is present in filters
  • :cap — when set to a positive integer, the count short-circuits at that value via a windowed subquery (SELECT count(*) FROM (... LIMIT ^cap)), so multi-million-row tables return immediately at the cap instead of waiting for a full aggregate scan. The default (nil) preserves the existing unbounded behavior. The Mix task mix threadline.export does not pass :cap and is unaffected; Threadline.OperatorSurface.Live.TimelineLive and the export controller pass cap: 10_001 so the LiveView can render "10,000+ matches" without hitting statement_timeout.
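A sketch of the capped-count pattern described above (placeholder filters):

```elixir
filters = [repo: MyApp.Repo, table: "accounts"]

# With cap: 10_001, any count above 10_000 comes back as 10_001.
{:ok, %{count: count}} = Threadline.Export.count_matching(filters, cap: 10_001)

label = if count > 10_000, do: "10,000+ matches", else: "#{count} matches"
```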

csv_header(opts \\ [])

@spec csv_header(keyword()) :: iodata()

Returns the canonical CSV header row (one line ending in \r\n) for the operator-surface export controller's chunked path.

Same column order as to_csv_iodata/2. The chunked path emits this as the first chunk before streaming data rows so the byte-equality parity test holds against the iodata path.

Options

  • :include_action_metadata — when true, append correlation_id and action_id columns (same shape as to_csv_iodata/2).

format_changes_iodata(rows, format, opts \\ [])

@spec format_changes_iodata([struct()], :csv | :json_wrapped | :ndjson, keyword()) ::
  iodata()

Formats a pre-fetched list of %AuditChange{} structs into iodata for the requested format.

Used by the operator-surface export controller to format streamed batches; CSV header and JSON envelopes are NOT emitted by this function and remain the caller's responsibility (the chunked path emits the header / envelope as its first / last chunk).

  • :csv — CSV data rows (each terminated by \r\n per RFC 4180); the caller MUST emit csv_header/1 as the first chunk.
  • :json_wrapped — each row as Jason.encode!/1 output (a JSON object). The caller emits the surrounding {"format_version": ..., "generated_at": ..., "changes": [ prefix and ]} suffix as separate chunks plus the inter-row comma separators.
  • :ndjson — each row as Jason.encode!/1 output followed by \n (no envelope; pure line-delimited JSON).

Options

  • :include_action_metadata (default false) — same shape as to_csv_iodata/2.
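A sketch of the chunked CSV path these pieces compose; send_chunk/1 stands in for the controller's chunk-emitting function:

```elixir
filters = [repo: MyApp.Repo, table: "accounts"]
opts = [include_action_metadata: true]

# Header first — format_changes_iodata/3 never emits it.
send_chunk(Threadline.Export.csv_header(opts))

# Then each batch of export rows, formatted as CSV data rows.
filters
|> Threadline.Export.stream_export_rows()
|> Stream.chunk_every(500)
|> Enum.each(fn batch ->
  send_chunk(Threadline.Export.format_changes_iodata(batch, :csv, opts))
end)
```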

stream_changes(filters, opts \\ [])

@spec stream_changes(keyword(), keyword()) :: Enumerable.t()

Lazily enumerates AuditChange structs in timeline order using keyset pages.

Does not enforce max_rows — combine with Stream.take/2 if needed.

Options

  • :repo — optional if present in filters
  • :page_size — defaults to 1000

stream_export_rows(filters, opts \\ [])

@spec stream_export_rows(keyword(), keyword()) :: Enumerable.t()

Lazily enumerates the join-projected export-row maps (the same shape that to_csv_iodata/2 and to_json_document/2 consume) in timeline order using keyset pages.

Each emitted item is a map with the keys :id, :transaction_id, :table_schema, :table_name, :op, :captured_at, :table_pk, :data_after, :changed_fields, :changed_from, :tx_occurred_at, :tx_actor_ref, :tx_source, :aa_id, :aa_correlation_id — exactly the projection from Threadline.Query.export_changes_query/1. This matches what format_changes_iodata/3 expects, so the operator-surface export controller's chunked path produces byte-identical output to the iodata path.

Does not enforce max_rows — combine with Stream.take/2 if needed.

Options

  • :repo — optional if present in filters
  • :page_size — defaults to 1000
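For example, an NDJSON file export built on this stream (the path and filters are placeholders):

```elixir
filters = [repo: MyApp.Repo, table: "accounts"]

File.open!("changes.ndjson", [:write], fn file ->
  filters
  |> Threadline.Export.stream_export_rows(page_size: 1_000)
  |> Stream.chunk_every(1_000)
  |> Enum.each(fn batch ->
    # NDJSON needs no envelope, so batches can be written as-is.
    IO.binwrite(file, Threadline.Export.format_changes_iodata(batch, :ndjson))
  end)
end)
```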

to_csv_iodata(filters, opts \\ [])

@spec to_csv_iodata(keyword(), keyword()) :: {:ok, map()}

Returns CSV as iodata plus truncation metadata.

See module documentation for filters, opts, and column layout.

Options

  • :repo — optional if :repo is present in filters
  • :max_rows — defaults to 10_000
  • :include_action_metadata — when true, append correlation_id and action_id columns
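A sketch of writing the bounded export to disk; the :csv key on the result map is an assumption here, so check the actual return shape:

```elixir
filters = [repo: MyApp.Repo, table: "accounts"]

{:ok, result} = Threadline.Export.to_csv_iodata(filters, max_rows: 5_000)

# Assumed key — the docs state only that CSV iodata plus truncation
# metadata are returned.
File.write!("changes.csv", result.csv)
```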

to_json_document(filters, opts \\ [])

@spec to_json_document(keyword(), keyword()) :: {:ok, map()}

Returns JSON (wrapped object or NDJSON lines) as iodata plus truncation metadata.

Options

  • :repo, :max_rows — same as to_csv_iodata/2
  • :json_format — :wrapped (default) or :ndjson