Bulkinup (Bulkinup v0.6.0)


Bulk inserts and upserts for nested Ecto schemas.

Ecto's Ecto.Repo.insert_all/3 writes one flat table at a time and skips changeset validation. Bulkinup fills that gap: it validates attrs maps through your schemas' changeset functions, then writes a parent and its nested associations across multiple tables in one call, wrapped in a single transaction by default.

Two sibling verbs share the same engine, options, and semantics:

  • upsert/4 — bulk upsert: existing rows are updated on conflict.
  • insert/4 — pure bulk insert: writing a row that already exists raises.

use Bulkinup (see __using__/1) injects repo-scoped bulk_insert/3 and bulk_upsert/3 into your repo module, with app-wide defaults declared once at use time.

For a walkthrough, see the guides: Getting Started, Nested Associations, Recipes, and Migrating from bulk_upsert.

Types

options()

@type options() :: [
  changeset_function_atom: atom(),
  chunk_size: pos_integer(),
  insert_all_function_module: module(),
  insert_all_function_atom: atom(),
  insert_all_opts: %{optional(module() | Ecto.Schema.source()) => Keyword.t()},
  max_concurrency: pos_integer(),
  placeholders: %{
    optional(module() | Ecto.Schema.source()) => %{optional(atom()) => term()}
  },
  recover_changeset_errors: %{
    optional(module()) => %{optional(atom()) => term()}
  },
  replace_all_except: [atom()],
  timeout: timeout()
]

Options accepted by insert/4 and upsert/4. See upsert/4's documentation for details. :replace_all_except is upsert-only: insert/4 raises an ArgumentError when given it.

Map keys typed module() | Ecto.Schema.source() accept a schema module or, for many_to_many join tables, the source as a string (e.g. "persons_topics").

Functions

__using__(use_opts)

(macro)

Injects bulk_insert/3 and bulk_upsert/3 into the calling repo module — insert/4 and upsert/4 scoped to that repo, with shared defaults declared once at use time:

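defmodule YourProject.Repo do
  use Ecto.Repo, otp_app: :your_project, adapter: Ecto.Adapters.Postgres

  use Bulkinup,
    timeout: fetch_timeout!(),
    insert: [chunk_size: 500],
    upsert: [replace_all_except: [:inserted_at]]

  # Hypothetical helper, read on each call because option values are
  # evaluated at runtime (assumes a :bulk_timeout key in your app config).
  defp fetch_timeout!, do: Application.fetch_env!(:your_project, :bulk_timeout)
end

iex> YourProject.Repo.bulk_upsert(YourProject.Persons.Person, attrs_list)
{:ok, %{upserted: 2, skipped: 0}}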

Defaults and precedence

  • A flat key (e.g. timeout: above) is a shared default, applied to each verb it is valid for. The upsert-only :replace_all_except is therefore applied as a flat key to bulk_upsert/3 only, although passing it per call to bulk_insert/3 still raises, as it does for insert/4.
  • The insert: and upsert: keys hold per-verb defaults.
  • Precedence: per-call opts override the verb namespace, which overrides flat keys.
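The precedence rules above amount to successive keyword merges where later lists win. The module below is a hypothetical model of that behavior (OptsPrecedenceSketch and effective_opts/3 are illustrative names, not Bulkinup's internals); it does not model the compile-time validation or the error raised for a per-call :replace_all_except on inserts.

```elixir
defmodule OptsPrecedenceSketch do
  # Hypothetical model of the documented precedence, not Bulkinup's code.
  @verbs [:insert, :upsert]
  @upsert_only [:replace_all_except]

  # use_opts: the keyword list given to `use Bulkinup`
  # verb: :insert or :upsert; call_opts: the opts passed on each call
  def effective_opts(use_opts, verb, call_opts \\ []) when verb in @verbs do
    verb_defaults = Keyword.get(use_opts, verb, [])

    # Flat keys are shared defaults; upsert-only keys never reach inserts.
    flat = Keyword.drop(use_opts, @verbs)
    flat = if verb == :insert, do: Keyword.drop(flat, @upsert_only), else: flat

    # Keyword.merge/2 lets the right-hand list win:
    # flat keys < verb namespace < per-call opts.
    flat
    |> Keyword.merge(verb_defaults)
    |> Keyword.merge(call_opts)
  end
end

use_opts = [timeout: 30_000, insert: [chunk_size: 500], upsert: [replace_all_except: [:inserted_at]]]
OptsPrecedenceSketch.effective_opts(use_opts, :insert, chunk_size: 100)
#=> [timeout: 30000, chunk_size: 100]
```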

Option names are validated at compile time: a typo, or a key that is not valid for any verb it would apply to, is a compile error. Option values are injected unevaluated and evaluated at runtime on each call, so dynamic defaults (like fetch_timeout!() above, resolved in the repo module) work.

insert(repo_module, schema_module, attrs_list, opts \\ [])

@spec insert(module(), module(), Enumerable.t(map()), options()) ::
  {:ok, %{inserted: non_neg_integer(), skipped: non_neg_integer()}}

Like upsert/4, but a pure bulk insert: rows are only ever created, never updated.

No on_conflict or conflict_target defaults are applied at any level — the parent schema, nested associations, and many_to_many join tables all use Ecto's default conflict behavior, so inserting a row (or join table link) that already exists raises (a Postgrex.Error unique violation on Postgres). By default the entire insert runs in a single transaction, so every change is rolled back when a duplicate raises.

To tolerate child or join table rows that may already exist in the database (e.g. many_to_many records that several parents reference), override the conflict behavior for just those sources via :insert_all_opts:

iex> Bulkinup.insert(
...>   YourProject.Repo,
...>   YourProject.Blog.Post,
...>   attrs_list,
...>   insert_all_opts: %{
...>     YourProject.Blog.Tag => [on_conflict: :nothing],
...>     "posts_tags" => [on_conflict: :nothing]
...>   }
...> )
{:ok, %{inserted: 2, skipped: 0}}

Returns {:ok, %{inserted: inserted_count, skipped: skipped_count}}, with the same meaning as upsert/4's counts.

Everything else — changeset validation, :recover_changeset_errors, :placeholders, chunking, streaming input, :max_concurrency, :timeout, and the skipped-items summary logging — behaves exactly as documented for upsert/4. The upsert-only option :replace_all_except raises an ArgumentError.

upsert(repo_module, schema_module, attrs_list, opts \\ [])

@spec upsert(module(), module(), Enumerable.t(map()), options()) ::
  {:ok, %{upserted: non_neg_integer(), skipped: non_neg_integer()}}

Validate attrs maps (attrs_list) by passing them through an Ecto changeset, then upsert the valid items to the database that corresponds to a given Ecto repo_module (e.g. YourProject.Repo).

attrs_list may be any Enumerable — a plain list, or a lazy Stream for large inputs (see the Streaming section below).

Using a changeset serves two purposes:

  1. The changeset can be used to validate and transform the data.
  2. Using a changeset allows this function to perform bulk upserts with nested associations.

For validation, each item in the attrs_list is converted to a changeset for a given schema_module. The changeset function is called with a single argument (the attrs map), so the schema module must expose a 1-arity changeset function — e.g. a changeset/2 whose first argument defaults to an empty struct. By default, this function is called :changeset. (See the Options section below for more info.)
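A default on the first argument is what makes a changeset/2 callable with one argument. The dependency-free sketch below shows this with a plain struct standing in for an Ecto schema (ArityDemo.Person is hypothetical; a real schema would build an Ecto.Changeset with cast/3 inside the function) and calls it through apply/3 with a single attrs map.

```elixir
defmodule ArityDemo.Person do
  # Hypothetical stand-in for an Ecto schema. A real schema would
  # `use Ecto.Schema` and return an Ecto.Changeset built with cast/3 and
  # validate_required/2; a plain struct keeps this sketch dependency-free.
  defstruct [:id, :name]

  # A default on the FIRST argument makes Elixir define both
  # changeset/1 (attrs only) and changeset/2 (struct, attrs).
  def changeset(person \\ %__MODULE__{}, attrs) do
    struct(person, attrs)
  end
end

# A single attrs map binds to the second parameter; the struct is defaulted.
apply(ArityDemo.Person, :changeset, [%{id: 1, name: "Alice"}])
#=> %ArityDemo.Person{id: 1, name: "Alice"}
```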

Basic example

iex> Bulkinup.upsert(
...>   YourProject.Repo,
...>   YourProject.Persons.Person,
...>   _attrs_list = [
...>     %{id: 1, name: "Alice", age: 25, phone_number: "555-1234"},
...>     %{id: 2, name: "Bob", age: 35, phone_number: "555-2345"},
...>   ]
...> )
{:ok, %{upserted: 2, skipped: 0}}

Return value

Returns {:ok, %{upserted: upserted_count, skipped: skipped_count}}, where the counts refer to the top-level attrs: :upserted is the number of items sent to the database, and :skipped is the number of items dropped because their changesets were invalid. (Skipped items are summarized in one :warning log per call, with per-item detail at the :debug level.) A database error raises; by default the entire upsert runs in a single transaction, so every change is rolled back (with :max_concurrency, only the failing chunk is — see below).

Streaming

attrs_list is consumed lazily, in chunks of :chunk_size items: a Stream is never fully materialized, so memory stays bounded for arbitrarily large inputs. Plain lists behave identically (same counts, same single skipped-items summary log). Note that without :max_concurrency, the single transaction — and any locks it takes — stays open for the stream's full duration.
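The chunked consumption and the counts described above can be sketched with Stream.chunk_every/2. This is a hypothetical model, not Bulkinup's code: valid?/1 stands in for changeset validation, and the write_chunk callback stands in for the per-chunk database write.

```elixir
defmodule StreamingSketch do
  # Hypothetical stand-in for changeset validation: an attrs map counts
  # as valid when it has a binary :name.
  defp valid?(%{name: name}) when is_binary(name), do: true
  defp valid?(_attrs), do: false

  # Consumes `attrs` lazily, `chunk_size` items at a time, so only one
  # chunk is held in memory. Invalid items are counted and dropped.
  def run(attrs, chunk_size, write_chunk) do
    attrs
    |> Stream.chunk_every(chunk_size)
    |> Enum.reduce(%{upserted: 0, skipped: 0}, fn chunk, acc ->
      {valid, invalid} = Enum.split_with(chunk, &valid?/1)
      write_chunk.(valid)
      %{acc | upserted: acc.upserted + length(valid), skipped: acc.skipped + length(invalid)}
    end)
  end
end

# A lazy input where every tenth item lacks a :name.
attrs =
  Stream.map(1..2_500, fn i ->
    if rem(i, 10) == 0, do: %{id: i}, else: %{id: i, name: "p#{i}"}
  end)

StreamingSketch.run(attrs, 1_000, fn _rows -> :ok end)
#=> %{upserted: 2250, skipped: 250}
```

A plain list passed to run/3 produces the same counts, since Stream.chunk_every/2 accepts any Enumerable.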

Options

Unknown option names raise an ArgumentError, as does passing a Bulkinup option (other than :timeout or :placeholders, which insert_all/3 also accepts) inside an :insert_all_opts value.

Warning

The :changeset_function_atom, :insert_all_function_module, and :insert_all_function_atom options are invoked via apply/3. Never build these option values from untrusted (e.g. user-supplied) input.

  • :changeset_function_atom - The name of the changeset function to apply for the given schema_module. It is called with one argument: the attrs map. (Default: :changeset)

  • :chunk_size - The number of parent attrs items to insert into the database in a single query. Lower it if a chunk would exceed the Postgres limit of 65,535 bind parameters per query (roughly chunk_size times the number of fields per row); raise it to reduce the number of queries. (Default: 1000)

  • :insert_all_function_module - The module whose insert_all function is called, in place of the given repo_module. (Default: Inherited from the value specified in the repo_module function argument, e.g. YourProject.Repo)

    • Example: YourProject.OtherRepo
  • :insert_all_function_atom - Instead of using your repo module's :insert_all function, you may pass a compatible equivalent that accepts the same arguments. (Default: :insert_all)

    • Example: :insert_all_with_autogenerated_timestamps
  • :insert_all_opts - Pass custom opts to insert_all/3. This option is a map from a schema (or join table source) to the YourProject.Repo.insert_all/3 opts applied whenever rows for that schema or source are written. By default, a conflicting row has all of its values replaced except the primary key(s) (see the :replace_all_except option). (Default: %{})

    • Example: %{YourProject.Persons.Person => [on_conflict: :nothing]}
    • A many_to_many join table is keyed by its source, e.g. %{"persons_topics" => [...]}.
    • :conflict_target defaults to the schema's primary key, so a schema without a primary key must supply its own :conflict_target here (otherwise the upsert fails at the database).
  • :max_concurrency - Upsert up to this many chunks of :chunk_size parents concurrently (via Task.async_stream/3), each chunk in its own transaction. By default, all chunks are upserted sequentially inside a single transaction. Setting this option trades the single-transaction guarantee for insert throughput, with the consequences listed below. (Default: nil)

    • A failing chunk still raises, but chunks that already committed stay committed, so a failure partway through leaves the database with partial results.
    • :timeout applies to each chunk's transaction instead of the whole call.
    • Concurrent chunks that share many_to_many child records may upsert the same related (or join table) rows in different orders, which can deadlock in Postgres. Ensure concurrent input does not share child records across chunks, or be prepared to retry on deadlock.
    • In the test environment, the Ecto SQL sandbox requires shared mode (or explicit allowances) so the spawned tasks may use the test's database connection, e.g. use YourProject.DataCase, async: false with a Phoenix-style setup_sandbox/1.
  • :placeholders - Set fields from shared values that are sent to the database once instead of once per row, using the :placeholders feature of Ecto's insert_all/3. This option is a map whose key is the schema or source being upserted, and the value is a map of field => value. The fields do not need to appear in the attrs. (Default: %{})

    • Example: %{YourProject.Persons.Person => %{inserted_at: DateTime.utc_now()}}
    • Each placeholder value is injected into the attrs before the changeset is built, so a placeholder field is cast and validated like any other field and may be included in the changeset's validate_required/2.
    • The shared value replaces any per-row value supplied for the field in the attrs.
    • Embedded schemas are stored inline on their parent row and are never upserted as their own source, so placeholder values keyed by an embedded schema module are ignored.
  • :recover_changeset_errors - If the given fields in a changeset have errors, then replace them with a custom fallback value. (Default: %{})

    • Example: %{YourProject.Persons.Person => %{phone_number: "INVALID"}}
    • Applies recursively to nested association and embedded changesets, with fallbacks looked up by each changeset's schema (for embeds, the embedded schema module). A parent's association error is cleared once all of that association's child changesets have been recovered.
    • A changeset is only recovered if every one of its error fields has a fallback and every nested changeset is recoverable by the same rule; otherwise the row is skipped.
    • A fallback value is applied without re-running the changeset function, so it must be valid for the schema.
    • Errors on the association and embed fields themselves (e.g. an association whose attrs could not be cast at all) are never recoverable.
  • :replace_all_except - If a row already exists, all of its fields are replaced except the primary key and any fields specified here. (Default: [])

    • Example: [:field, :other_field]
  • :timeout - The timeout, in milliseconds, for the transaction that wraps the entire bulk upsert (all chunks); it is also applied to each insert_all/3 query. With :max_concurrency, the timeout applies to each chunk's transaction instead. (Default: 15000)

    • Example: 60_000
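To pick a safe :chunk_size, the arithmetic behind the Postgres parameter limit can be written out. This is a rough sketch under stated assumptions: each row is assumed to send one bind parameter per field, and each :placeholders field is assumed to cost a single parameter per query; ChunkSizeSketch is a hypothetical helper, not part of Bulkinup, and nested associations (written in their own queries) are not counted.

```elixir
defmodule ChunkSizeSketch do
  # PostgreSQL's wire protocol allows at most 65_535 bind parameters
  # in a single query.
  @max_params 65_535

  # Assumes rows * (fields - placeholders) + placeholders <= @max_params,
  # i.e. placeholder fields are sent once per query instead of once per row.
  def max_chunk_size(fields_per_row, placeholder_fields \\ 0)
      when placeholder_fields >= 0 and fields_per_row > placeholder_fields do
    per_row = fields_per_row - placeholder_fields
    div(@max_params - placeholder_fields, per_row)
  end
end

# An 80-field row overflows at the default chunk_size of 1000:
ChunkSizeSketch.max_chunk_size(80)
#=> 819

# Moving two shared fields into :placeholders frees room for more rows:
ChunkSizeSketch.max_chunk_size(80, 2)
#=> 840
```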
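The all-or-nothing recovery rule for :recover_changeset_errors can be modeled with plain maps. The sketch below is hypothetical: changesets are represented as %{schema:, changes:, errors:, nested:} maps rather than Ecto.Changeset structs, errors are simplified to {field, message} pairs, and the clearing of a parent's own association error is not modeled. It shows the rule as documented: a changeset is recovered only if every error field has a fallback for its schema and every nested changeset is recoverable, and fallbacks are applied without re-validation.

```elixir
defmodule RecoverSketch do
  # Hypothetical changeset model (not Ecto.Changeset):
  # %{schema: atom, changes: map, errors: [{field, message}], nested: [cs]}

  # Returns {:ok, recovered} or :skip for the whole row.
  def recover(cs, fallbacks) do
    schema_fallbacks = Map.get(fallbacks, cs.schema, %{})
    error_fields = cs.errors |> Enum.map(&elem(&1, 0)) |> Enum.uniq()

    with true <- Enum.all?(error_fields, &Map.has_key?(schema_fallbacks, &1)),
         {:ok, nested} <- recover_all(cs.nested, fallbacks) do
      # Fallback values are applied as-is; nothing is re-validated.
      changes = Map.merge(cs.changes, Map.take(schema_fallbacks, error_fields))
      {:ok, %{cs | changes: changes, errors: [], nested: nested}}
    else
      _ -> :skip
    end
  end

  # Every nested changeset must recover, or the parent is skipped.
  defp recover_all(children, fallbacks) do
    result =
      Enum.reduce_while(children, {:ok, []}, fn child, {:ok, acc} ->
        case recover(child, fallbacks) do
          {:ok, recovered} -> {:cont, {:ok, [recovered | acc]}}
          :skip -> {:halt, :skip}
        end
      end)

    with {:ok, acc} <- result, do: {:ok, Enum.reverse(acc)}
  end
end

person = %{schema: Person, changes: %{phone_number: "bad"}, errors: [phone_number: "is invalid"], nested: []}
RecoverSketch.recover(person, %{Person => %{phone_number: "INVALID"}})
#=> {:ok, %{schema: Person, changes: %{phone_number: "INVALID"}, errors: [], nested: []}}
```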

Examples

Upsert a list of Person attrs using the changeset function YourProject.Persons.Person.upsert_changeset/2 to validate the attrs:

iex> attrs_list = [%{id: 1, name: "Alice", ...}]

iex> Bulkinup.upsert(
...>   YourProject.Repo,
...>   YourProject.Persons.Person,
...>   attrs_list,
...>   changeset_function_atom: :upsert_changeset
...> )
{:ok, %{upserted: 1, skipped: 0}}

Upsert a list of attrs, overwriting only the :name field if there is a conflict. Schemas that are not given custom :insert_all_opts keep the default conflict behavior (replace all fields except the primary key):

iex> insert_all_opts = %{
...>   YourProject.Persons.Person => [on_conflict: {:replace, [:name]}]
...> }

iex> Bulkinup.upsert(
...>   YourProject.Repo,
...>   YourProject.Persons.Person,
...>   _attrs_list = [%{id: 1, name: "Alicia"}],
...>   insert_all_opts: insert_all_opts
...> )
{:ok, %{upserted: 1, skipped: 0}}

Associations

Nested associations are upserted in the same call as the parent, recursively: a child's own nested associations (at any depth) are upserted the same way as the parent's.

This is an upsert-only operation: rows absent from the attrs are left untouched at every level. Unlike Ecto.Changeset.cast_assoc/3's :on_replace behavior, absent children are never deleted or nilified.

  • has_many and has_one: the associated records are upserted into their own table. Each child must include its foreign key in its attrs, since it is upserted directly via insert_all/3.

  • many_to_many: the associated records are upserted into their own table, and the join table rows linking each parent to its associations are upserted as well. Duplicate records and links are removed automatically.

  • embeds_one and embeds_many: embedded data has no table of its own, so it is stored inline on the parent row as part of the parent upsert.
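For many_to_many, the deduplication of related records and join links can be pictured as follows. This is a hypothetical sketch reusing the posts/tags naming from the insert/4 example: JoinRowsSketch, the :tags key, and the post_id/tag_id columns are illustrative assumptions, and this sketch keeps the first record seen for each id (the source does not say which duplicate Bulkinup keeps).

```elixir
defmodule JoinRowsSketch do
  # Hypothetical input: each parent is %{id: ..., tags: [%{id: ...}, ...]}.
  # Returns the unique related records and the unique join table rows.
  def collect(parents) do
    tags =
      parents
      |> Enum.flat_map(& &1.tags)
      # Keeps the first tag map seen for each id.
      |> Enum.uniq_by(& &1.id)

    links =
      parents
      |> Enum.flat_map(fn post ->
        Enum.map(post.tags, fn tag -> %{post_id: post.id, tag_id: tag.id} end)
      end)
      |> Enum.uniq()

    {tags, links}
  end
end

JoinRowsSketch.collect([
  %{id: 1, tags: [%{id: 10}, %{id: 11}, %{id: 10}]},
  %{id: 2, tags: [%{id: 10}]}
])
#=> {[%{id: 10}, %{id: 11}],
#=>  [%{post_id: 1, tag_id: 10}, %{post_id: 1, tag_id: 11}, %{post_id: 2, tag_id: 10}]}
```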

Known limitations

  • Nested belongs_to associations are not upserted. To associate with a belongs_to parent, include its foreign key field in the attrs (e.g. category_id). This applies at every level of nesting.