DustEcto.Repo (DustEcto v0.1.2)


Ecto-shaped facade over the active DustEcto.Transport. Functions mirror the parts of Ecto.Repo that map cleanly onto Dust's flat KV model: all/1, get/2 + get!/2, stream/1, exists?/2, plus insert/1, update/1, delete/1 + delete/2, and delete_all/1.

There is no where, from, preload, or insert_all, and no transaction yet. The upstream entries.batch_write primitive that transaction support depends on has now shipped; a corresponding SDK transport implementation is still pending.

Honest contract

Dust writes are upserts. There's no atomic insert-or-fail or read-modify-write at the wire level (capver 2 has leaf-only CAS). insert/1 and update/1 are both validated upserts: they run the changeset, then write. If you need INSERT-or-fail semantics, do a Repo.exists?/2 check first and accept that another writer can race you.
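
The check-then-write pattern described above could be wrapped roughly as follows. This is a minimal sketch, not part of DustEcto: the InsertIfAbsent module, the :already_exists atom, and the injectable repo argument are assumptions made for illustration. Only exists?/2 and insert/1 come from this page.

```elixir
defmodule InsertIfAbsent do
  @moduledoc false

  # Hypothetical helper. `repo` defaults to DustEcto.Repo and is injectable
  # so the flow can be exercised against a stub.
  def insert_if_absent(schema, slug, changeset, repo \\ DustEcto.Repo) do
    case repo.exists?(schema, slug) do
      {:ok, false} ->
        # Not atomic: another writer can create the record between the
        # probe above and this write, and the upsert will overwrite it.
        repo.insert(changeset)

      {:ok, true} ->
        # :already_exists is this sketch's own atom, not a DustEcto value.
        {:error, :already_exists}

      {:error, _reason} = error ->
        error
    end
  end
end
```

The race window stays open no matter how the helper is written, so this only narrows accidental overwrites; it does not prevent them.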

Functions

all(schema)

@spec all(module()) :: {:ok, [struct()]} | {:error, DustEcto.Error.t()}

Loads every record of schema. Walks every page of the underlying enum until next_cursor is nil — no silent truncation.

Records that fail the required-fields guard (schema.__dust_required_fields__/0) are dropped from the result rather than returned as errors. A Logger.warning records the slug, the missing fields, and any unrecognized fields so developers can grep the logs.
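
The "walk until next_cursor is nil" behaviour can be pictured with a generic cursor loop. This is a sketch of the technique only, not DustEcto's implementation: the PageWalk module, the fetch_page function, and the items key are assumptions; next_cursor is the only name taken from the description above.

```elixir
defmodule PageWalk do
  @moduledoc false

  # fetch_page is a hypothetical function:
  #   (cursor | nil) -> {:ok, %{items: list, next_cursor: cursor | nil}} | {:error, term}
  def collect_all(fetch_page, cursor \\ nil, acc \\ []) do
    case fetch_page.(cursor) do
      {:ok, %{items: items, next_cursor: nil}} ->
        # Last page: acc holds earlier items in reverse order, so reverse it
        # and append the final page.
        {:ok, Enum.reverse(acc, items)}

      {:ok, %{items: items, next_cursor: next}} ->
        # More pages remain: push this page onto the reversed accumulator.
        collect_all(fetch_page, next, Enum.reverse(items, acc))

      {:error, _reason} = error ->
        # Any failed page fails the whole walk; no partial result is returned.
        error
    end
  end
end
```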

batch_write(ops)

@spec batch_write([tuple()]) ::
  {:ok, %{store_seq: integer(), ops: list()}}
  | {:error, Ecto.Changeset.t() | DustEcto.Error.t()}

Atomic multi-record write. Accepts a list of operation tuples:

Repo.batch_write([
  {:insert, Link.changeset(%Link{}, attrs1)},
  {:insert, Link.changeset(%Link{}, attrs2)},
  {:update, existing_cs, if_match: 7},
  {:delete, Link, "stale-slug"},
  {:delete, Link, "old", if_match: 4}
])

Returns {:ok, %{store_seq:, ops: [...]}} on a server-side commit, {:error, %Ecto.Changeset{}} if any changeset fails validation (short-circuits before the batch is sent), or {:error, %DustEcto.Error{}} on transport failure.

Mode interaction

  • :map-mode schemas produce one wire op per record (PUT at <prefix>.<slug>). :if_match applies to that single op.
  • :flat-mode schemas produce N wire ops per record (one PUT per non-nil field). :if_match in :flat mode raises ArgumentError — per-field CAS requires per-field revisions, which this v1 API doesn't surface. Open an issue if you need it.

All ops in a single batch_write/1 commit atomically server-side — either every op lands or none of them does.
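
A caller will usually want to tell the three documented return shapes apart. The sketch below shows one way to do that; the BatchExample module, its result tuples, and the injectable repo argument are illustrative assumptions, while the matched shapes follow the @spec above.

```elixir
defmodule BatchExample do
  @moduledoc false

  # Hypothetical wrapper. `repo` defaults to DustEcto.Repo.
  def run(ops, repo \\ DustEcto.Repo) do
    case repo.batch_write(ops) do
      {:ok, %{store_seq: seq, ops: results}} ->
        # Every op in the batch landed in the same commit.
        {:committed, seq, length(results)}

      {:error, reason} when is_struct(reason, Ecto.Changeset) ->
        # Validation failed before anything was sent to the server.
        {:invalid, reason.errors}

      {:error, reason} ->
        # Anything else is a %DustEcto.Error{} from the transport.
        {:transport_error, reason}
    end
  end
end
```

Because a changeset failure short-circuits before the batch is sent, the :invalid branch implies nothing was written.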

delete(struct_or_cs)

@spec delete(struct() | Ecto.Changeset.t()) ::
  {:ok, %{store_seq: integer()}} | {:error, DustEcto.Error.t()}

Removes a record. Accepts a struct or changeset; delete/2 also accepts a (schema, slug) shape. Always issues a single DELETE against <prefix>.<slug> — server clears the leaf and every descendant.

Options

  • :if_match - optimistic-concurrency revision. The server enforces leaf-only CAS in capver 2, so the option is meaningful for :map-mode records, where the slug path itself is a leaf. On a subtree delete the server may ignore the revision or reject the request; a rejection surfaces as :conflict.

delete(schema, opts)

@spec delete(struct() | Ecto.Changeset.t() | module(), keyword() | String.t()) ::
  {:ok, %{store_seq: integer()}} | {:error, DustEcto.Error.t()}

delete(schema, slug, opts)

@spec delete(module(), String.t(), keyword()) ::
  {:ok, %{store_seq: integer()}} | {:error, DustEcto.Error.t()}

Three-arg convenience form accepting (schema, slug, opts). Equivalent to delete(struct(schema, slug: slug), opts).
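
A conditional delete through the three-arg form might look like the sketch below. The DeleteExample module and its return atoms are hypothetical, and matching on kind: :conflict is an assumption: this page says a rejected :if_match surfaces as :conflict and shows a kind field on DustEcto.Error, but it does not spell out the exact struct shape for conflicts.

```elixir
defmodule DeleteExample do
  @moduledoc false

  # Hypothetical wrapper. `revision` is whatever revision the caller last
  # observed; how it is obtained is outside this sketch.
  def delete_at_revision(schema, slug, revision, repo \\ DustEcto.Repo) do
    case repo.delete(schema, slug, if_match: revision) do
      {:ok, %{store_seq: seq}} ->
        {:deleted, seq}

      {:error, %{kind: :conflict}} ->
        # Assumed shape: the record changed since `revision` was read.
        :stale_revision

      {:error, _reason} = error ->
        error
    end
  end
end
```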

delete_all(schema)

@spec delete_all(module()) ::
  {:ok, %{store_seq: integer()}} | {:error, DustEcto.Error.t()}

Removes every record of schema. Returns {:ok, %{store_seq: n}} — note the server's DELETE doesn't report a row count.

exists?(schema, slug)

@spec exists?(module(), String.t()) :: {:ok, boolean()} | {:error, DustEcto.Error.t()}

Cheap existence probe. SDK mode: in-process cache lookup. HTTP mode: one HEAD round-trip (no body).

get(schema, slug)

@spec get(module(), String.t()) ::
  {:ok, struct()} | {:error, :not_found | DustEcto.Error.t()}

Fetches a single record by slug. Returns {:ok, struct} on a hit, {:error, :not_found} on a miss, {:error, %DustEcto.Error{}} on a transport failure.
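
Since a miss and a transport failure are distinct error shapes, callers can treat them differently. The following sketch falls back to a default on a miss but propagates transport errors; the GetExample module and the injectable repo argument are assumptions for illustration.

```elixir
defmodule GetExample do
  @moduledoc false

  # Hypothetical helper. `repo` defaults to DustEcto.Repo.
  def fetch_or_default(schema, slug, default, repo \\ DustEcto.Repo) do
    case repo.get(schema, slug) do
      {:ok, record} ->
        {:ok, record}

      {:error, :not_found} ->
        # A miss is an expected outcome here, so substitute the default.
        {:ok, default}

      {:error, _transport_error} = error ->
        # A transport failure says nothing about whether the record exists.
        error
    end
  end
end
```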

get!(schema, slug)

@spec get!(module(), String.t()) :: struct()

Like get/2 but raises if the record is missing or the transport returns an error.

insert(cs)

@spec insert(Ecto.Changeset.t() | struct()) ::
  {:ok, struct()} | {:error, Ecto.Changeset.t() | DustEcto.Error.t()}

Validated upsert. Runs the changeset; on success, dumps the struct and writes it to the store.

Returns {:ok, struct}, {:error, %Ecto.Changeset{}} on validation failure, or {:error, %DustEcto.Error{}} on transport failure.

insert!(input)

@spec insert!(Ecto.Changeset.t() | struct()) :: struct()

Like insert/1 but raises on changeset/transport errors.

stream(schema)

@spec stream(module()) :: Enumerable.t()

Returns a Stream of records of schema, lazy across pages. Useful when the prefix could match more than a few hundred records.

The stream still applies the required-fields guard with the same warning behaviour as all/1.
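
Because the result is an Enumerable, it composes with the standard Stream and Enum functions. The sketch below takes the first few matching records and stops consuming after that; the StreamExample module and its arguments are hypothetical, and whether stopping early actually avoids fetching later pages depends on the transport.

```elixir
defmodule StreamExample do
  @moduledoc false

  # Hypothetical helper. `repo` defaults to DustEcto.Repo.
  def first_matching(schema, predicate, limit, repo \\ DustEcto.Repo) do
    schema
    |> repo.stream()
    # Filtering stays lazy; nothing is pulled until Enum.take/2 runs.
    |> Stream.filter(predicate)
    # Enum.take/2 halts the stream once `limit` records have matched.
    |> Enum.take(limit)
  end
end
```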

subscribe(schema, callback)

@spec subscribe(module(), (term() -> any())) ::
  {:ok, reference()} | {:error, DustEcto.Error.t()}

Subscribes the given callback to record-level changes for schema. The callback receives {:upserted, struct} or {:deleted, slug} events in the Dust SDK's :committed mode: exactly one delivery per write, including for the writer's own changes, with store_seq durably attached.

HTTP mode: returns {:error, %DustEcto.Error{kind: :not_supported}}.

The returned ref can be passed to unsubscribe/1.
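
One common use is to forward events to a process as messages. The sketch below does that and maps the HTTP-mode error to a simpler atom; the SubscribeExample module, the message tuples, and the :http_mode atom are assumptions made for illustration.

```elixir
defmodule SubscribeExample do
  @moduledoc false

  # Hypothetical helper: forwards record-level events to `pid` as messages.
  def forward_changes(schema, pid, repo \\ DustEcto.Repo) do
    callback = fn
      {:upserted, record} -> send(pid, {:record_upserted, record})
      {:deleted, slug} -> send(pid, {:record_deleted, slug})
    end

    case repo.subscribe(schema, callback) do
      {:ok, ref} ->
        # Keep the ref; it is what unsubscribe/1 expects later.
        {:ok, ref}

      {:error, %{kind: :not_supported}} ->
        # Subscriptions are unavailable in HTTP mode.
        {:error, :http_mode}

      {:error, _reason} = error ->
        error
    end
  end
end
```

Keeping the callback to a single send/2 means it does little work wherever the SDK happens to invoke it.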

subscribe_raw(schema, callback)

@spec subscribe_raw(module(), (map() -> any())) ::
  {:ok, reference()} | {:error, DustEcto.Error.t()}

Subscribes to the raw underlying op events for schema — no reassembly into structs. Callback receives the SDK's event map %{op:, path:, value:, store_seq:, ...} exactly.

Useful for users who need per-leaf provenance or want to run their own assembly. HTTP mode: same :not_supported as subscribe/2.
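
As an illustration of per-leaf provenance, the sketch below records the latest store_seq seen for each path. The RawTail module and its use of an Agent are assumptions; only the path and store_seq keys of the event map are relied on, since the other keys and their values are not specified here.

```elixir
defmodule RawTail do
  @moduledoc false

  # Hypothetical helper: tracks the most recent store_seq per leaf path.
  def start(schema, repo \\ DustEcto.Repo) do
    {:ok, agent} = Agent.start_link(fn -> %{} end)

    callback = fn %{path: path, store_seq: seq} ->
      Agent.update(agent, &Map.put(&1, path, seq))
    end

    case repo.subscribe_raw(schema, callback) do
      {:ok, ref} ->
        {:ok, agent, ref}

      {:error, _reason} = error ->
        # Nothing will ever call the callback, so release the Agent.
        Agent.stop(agent)
        error
    end
  end

  # Returns the current %{path => store_seq} map.
  def latest(agent), do: Agent.get(agent, & &1)
end
```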

unsubscribe(ref)

@spec unsubscribe(reference()) :: :ok

Removes a subscription previously registered via subscribe/2 or subscribe_raw/2.

update(cs, opts \\ [])

@spec update(
  Ecto.Changeset.t(),
  keyword()
) :: {:ok, struct()} | {:error, Ecto.Changeset.t() | DustEcto.Error.t()}

Validated upsert. Runs the changeset; on success, dumps the struct and writes only the changed fields (in flat mode) or the full record (in map mode). Returns the same shapes as insert/1.

Options

  • :if_match - optimistic-concurrency revision. Only supported on :map-mode schemas (single leaf write at <prefix>.<slug>). In :flat mode the update is N PUTs, none of which has a meaningful whole-record revision, so passing :if_match raises ArgumentError. For atomic multi-field CAS use batch_write/1.