Bulkinup (Bulkinup v0.6.0)
Bulk inserts and upserts for nested Ecto schemas.
Ecto's Ecto.Repo.insert_all/3 writes one flat table at a time and skips changeset
validation. Bulkinup fills that gap: it validates attrs maps through your schemas' changeset
functions, then writes a parent and its nested associations across multiple tables in one
call, wrapped in a single transaction by default.
Two sibling verbs share the same engine, options, and semantics:
- upsert/4: bulk upsert. Existing rows are updated on conflict.
- insert/4: pure bulk insert. Writing a row that already exists raises.
use Bulkinup (see __using__/1) injects repo-scoped bulk_insert/3 and bulk_upsert/3
into your repo module, with app-wide defaults declared once at use time.
For a walkthrough, see the guides: Getting Started, Nested Associations, Recipes, and Migrating from bulk_upsert.
Summary
Types
@type options() :: [
  changeset_function_atom: atom(),
  chunk_size: pos_integer(),
  insert_all_function_module: module(),
  insert_all_function_atom: atom(),
  insert_all_opts: %{optional(module() | Ecto.Schema.source()) => Keyword.t()},
  max_concurrency: pos_integer(),
  placeholders: %{
    optional(module() | Ecto.Schema.source()) => %{optional(atom()) => term()}
  },
  recover_changeset_errors: %{
    optional(module()) => %{optional(atom()) => term()}
  },
  replace_all_except: [atom()],
  timeout: timeout()
]
Options accepted by insert/4 and upsert/4. See upsert/4's documentation for details.
:replace_all_except is upsert-only: insert/4 raises an ArgumentError when given it.
Map keys typed module() | Ecto.Schema.source() accept a schema module or, for
many_to_many join tables, the source as a string (e.g. "persons_topics").
Functions
Injects bulk_insert/3 and bulk_upsert/3 into the calling repo module — insert/4 and
upsert/4 scoped to that repo, with shared defaults declared once at use time:
defmodule YourProject.Repo do
  use Ecto.Repo, otp_app: :your_project, adapter: Ecto.Adapters.Postgres
  use Bulkinup,
    timeout: fetch_timeout!(),
    insert: [chunk_size: 500],
    upsert: [replace_all_except: [:inserted_at]]
end
YourProject.Repo.bulk_upsert(Person, attrs_list)
{:ok, %{upserted: 2, skipped: 0}}
Defaults and precedence
- A flat key (e.g. timeout: above) is a shared default, applied to each verb it is valid for. The upsert-only :replace_all_except is applied to bulk_upsert/3 only, though passing it per call to bulk_insert/3 still raises, as it does for insert/4.
- The insert: and upsert: keys hold per-verb defaults.
- Precedence: per-call opts override the verb namespace, which overrides flat keys.
Option names are validated at compile time: a typo, or a key that is valid for no verb it
would apply to, is a compile error. Option values are injected unevaluated and evaluated
per call at runtime, so dynamic defaults (like fetch_timeout!() above, resolved in the
repo module) work.
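The precedence order above can be pictured as successive keyword merges. The following is a minimal sketch, not Bulkinup's actual implementation: the PrecedenceSketch module, its resolve/3 function, and the option values are hypothetical and exist only to illustrate the ordering.

```elixir
# Hypothetical illustration of the precedence order; not Bulkinup's internals.
defmodule PrecedenceSketch do
  # Later keyword lists win on duplicate keys:
  # flat keys < verb namespace < per-call opts.
  def resolve(flat, verb_defaults, per_call) do
    flat
    |> Keyword.merge(verb_defaults)
    |> Keyword.merge(per_call)
  end
end

flat = [timeout: 15_000, chunk_size: 1_000]   # shared defaults (assumed values)
insert_defaults = [chunk_size: 500]           # the insert: namespace
per_call = [timeout: 60_000]                  # opts passed to one call

resolved = PrecedenceSketch.resolve(flat, insert_defaults, per_call)

# chunk_size comes from the verb namespace, timeout from the per-call opts.
IO.inspect(Keyword.get(resolved, :chunk_size))
IO.inspect(Keyword.get(resolved, :timeout))
```

Under these assumptions, the verb namespace overrides the flat :chunk_size and the per-call :timeout overrides the flat one.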
@spec insert(module(), module(), Enumerable.t(map()), options()) :: {:ok, %{inserted: non_neg_integer(), skipped: non_neg_integer()}}
Like upsert/4, but a pure bulk insert: rows are only ever created, never updated.
No on_conflict or conflict_target defaults are applied at any level — the parent schema,
nested associations, and many_to_many join tables all use Ecto's default conflict behavior,
so inserting a row (or join table link) that already exists raises (a Postgrex.Error unique
violation on Postgres). By default the entire insert runs in a single transaction, so every
change is rolled back when a duplicate raises.
To tolerate children or join rows shared with data that is already in the database (e.g.
many_to_many records that several parents reference), override the conflict behavior for
just those sources via :insert_all_opts:
iex> Bulkinup.insert(
...>   YourProject.Repo,
...>   YourProject.Blog.Post,
...>   attrs_list,
...>   insert_all_opts: %{
...>     YourProject.Blog.Tag => [on_conflict: :nothing],
...>     "posts_tags" => [on_conflict: :nothing]
...>   }
...> )
{:ok, %{inserted: 2, skipped: 0}}
Returns {:ok, %{inserted: inserted_count, skipped: skipped_count}}, with the same meaning
as upsert/4's counts.
Everything else — changeset validation, :recover_changeset_errors, :placeholders,
chunking, streaming input, :max_concurrency, :timeout, and the skipped-items summary
logging — behaves exactly as documented for upsert/4. The upsert-only option
:replace_all_except raises an ArgumentError.
@spec upsert(module(), module(), Enumerable.t(map()), options()) :: {:ok, %{upserted: non_neg_integer(), skipped: non_neg_integer()}}
Validate attrs maps (attrs_list) by passing them through an Ecto changeset, then upsert the
valid items to the database that corresponds to a given Ecto repo_module (e.g.
YourProject.Repo).
attrs_list may be any Enumerable — a plain list, or a lazy Stream for large inputs (see
the Streaming section below).
Using a changeset serves two purposes:
- The changeset can be used to validate and transform the data.
- Using a changeset allows this function to perform bulk upserts with nested associations.
For validation, each item in the attrs_list is converted to a changeset for a given
schema_module. The changeset function is called with a single argument (the attrs map), so
the schema module must expose a 1-arity changeset function — e.g. a changeset/2 whose first
argument defaults to an empty struct. By default, this function is called :changeset. (See
the Options section below for more info.)
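To see why a changeset/2 with a defaulted first argument satisfies the 1-arity requirement, here is a small sketch in plain Elixir. ArityDemo.Person is a hypothetical stand-in for a schema module: it uses a bare struct and returns a tuple, whereas a real schema would use Ecto.Schema and return an Ecto.Changeset.

```elixir
# Hypothetical stand-in for an Ecto schema module; a real schema would
# `use Ecto.Schema` and build an Ecto.Changeset instead of returning a tuple.
defmodule ArityDemo.Person do
  defstruct [:name]

  # The default on the first argument makes the compiler generate
  # changeset/1 in addition to changeset/2.
  def changeset(person \\ %__MODULE__{}, attrs) do
    {person, attrs}
  end
end

# Both arities are exported, so a one-argument call resolves.
one_arity? = function_exported?(ArityDemo.Person, :changeset, 1)
two_arity? = function_exported?(ArityDemo.Person, :changeset, 2)

# Calling with only the attrs map fills the required `attrs` parameter
# and uses the default empty struct for `person`.
{person, attrs} = apply(ArityDemo.Person, :changeset, [%{name: "Alice"}])
```

The same mechanism applies to a differently named function selected via :changeset_function_atom, as long as it can be called with the attrs map alone.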
Basic example
iex> Bulkinup.upsert(
...>   YourProject.Repo,
...>   YourProject.Persons.Person,
...>   _attrs_list = [
...>     %{id: 1, name: "Alice", age: 25, phone_number: "555-1234"},
...>     %{id: 2, name: "Bob", age: 35, phone_number: "555-2345"}
...>   ]
...> )
{:ok, %{upserted: 2, skipped: 0}}
Return value
Returns {:ok, %{upserted: upserted_count, skipped: skipped_count}}, where the counts refer to
the top-level attrs: :upserted is the number of items sent to the database, and :skipped is
the number of items dropped because their changesets were invalid. (Skipped items are
summarized in one :warning log per call, with per-item detail at the :debug level.) A
database error raises; by default the entire upsert runs in a single transaction, so every
change is rolled back (with :max_concurrency, only the failing chunk is — see below).
Streaming
attrs_list is consumed lazily, in chunks of :chunk_size items: a Stream is never fully
materialized, so memory stays bounded for arbitrarily large inputs. Plain lists behave
identically (same counts, same single skipped-items summary log). Note that without
:max_concurrency, the single transaction — and any locks it takes — stays open for the
stream's full duration.
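The chunked, lazy consumption described above can be illustrated with the standard library alone. This is a sketch of the general pattern, not Bulkinup's internals. The attrs fields and the chunk size of 1,000 are assumptions chosen to mirror the default :chunk_size.

```elixir
# A lazy stream of attrs maps; nothing is generated until it is consumed.
# The field names are illustrative only.
attrs_stream =
  Stream.map(1..2_500, fn id -> %{id: id, name: "Person #{id}"} end)

# Consume the stream one chunk at a time. Only the current chunk is built
# while it is processed (here we just record its size).
chunk_sizes =
  attrs_stream
  |> Stream.chunk_every(1_000)
  |> Enum.map(fn chunk -> length(chunk) end)

IO.inspect(chunk_sizes)
# prints [1000, 1000, 500]
```

A stream like attrs_stream could be passed as the third argument to upsert/4 or insert/4 in place of a list.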
Options
Unknown option names raise an ArgumentError, as does passing a Bulkinup option (other than
:timeout or :placeholders, which insert_all/3 also accepts) inside an :insert_all_opts
value.
Warning
The :changeset_function_atom, :insert_all_function_module, and :insert_all_function_atom
options are invoked via apply/3. Never build these option values from untrusted (e.g.
user-supplied) input.
- :changeset_function_atom - The name of the changeset function to apply for the given schema_module. It is called with one argument: the attrs map. (Default: :changeset)
- :chunk_size - The number of parent attrs items to insert into the database in a single query. Can be increased or decreased as needed to avoid exceeding the Postgres parameter limit for a single query. (Default: 1000)
- :insert_all_function_module - Instead of using the :insert_all function in the given repo_module, you may specify the name of a custom module to use instead. (Default: Inherited from the value specified in the repo_module function argument, e.g. YourProject.Repo)
  - Example: YourProject.OtherRepo
- :insert_all_function_atom - Instead of using your repo module's :insert_all function, you may pass a compatible equivalent that accepts the same arguments. (Default: :insert_all)
  - Example: :insert_all_with_autogenerated_timestamps
- :insert_all_opts - Pass custom opts to the insert_all/3 function. This option is a map whose keys are the schemas or sources that may have items being upserted, and whose values are the YourProject.Repo.insert_all/3 opts applied when items for that schema or source are upserted. By default, a conflicting row has all of its values replaced except the primary key(s) (see the :replace_all_except option). (Default: %{})
  - Example: %{YourProject.Persons.Person => [on_conflict: :nothing]}
  - A many_to_many join table is keyed by its source, e.g. %{"persons_topics" => [...]}.
  - :conflict_target defaults to the schema's primary key, so a schema without a primary key must supply its own :conflict_target here (otherwise the upsert fails at the database).
- :max_concurrency - Upsert up to this many chunks of :chunk_size parents concurrently (via Task.async_stream/3), each chunk in its own transaction. By default, all chunks are upserted sequentially inside a single transaction. (Default: nil) Setting this option trades the single-transaction guarantee for insert throughput:
  - A failing chunk still raises, but chunks that already committed stay committed, so a failure partway through leaves the database with partial results.
  - :timeout applies to each chunk's transaction instead of the whole call.
  - Concurrent chunks that share many_to_many child records may upsert the same related (or join table) rows in different orders, which can deadlock in Postgres. Ensure concurrent input does not share child records across chunks, or be prepared to retry on deadlock.
  - In the test environment, the Ecto SQL sandbox requires shared mode (or explicit allowances) so the spawned tasks may use the test's database connection, e.g. use MyApp.DataCase, async: false with a Phoenix-style setup_sandbox/1.
- :placeholders - Set fields from shared values that are sent to the database once instead of once per row, using the :placeholders feature of Ecto's insert_all/3. This option is a map whose key is the schema or source being upserted, and the value is a map of field => value. The fields do not need to appear in the attrs. (Default: %{})
  - Example: %{YourProject.Persons.Person => %{inserted_at: DateTime.utc_now()}}
  - Each placeholder value is injected into the attrs before the changeset is built, so a placeholder field is cast and validated like any other field and may be included in the changeset's validate_required/2.
  - The shared value replaces any per-row value supplied for the field in the attrs.
  - Embedded schemas are stored inline on their parent row and are never upserted as their own source, so placeholder values keyed by an embedded schema module are ignored.
- :recover_changeset_errors - If the given fields in a changeset have errors, then replace them with a custom fallback value. (Default: %{})
  - Example: %{YourProject.Persons.Person => %{phone_number: "INVALID"}}
  - Applies recursively to nested association and embedded changesets, with fallbacks looked up by each changeset's schema (for embeds, the embedded schema module). A parent's association error is cleared once all of that association's child changesets have been recovered.
  - A changeset is only recovered if every one of its error fields has a fallback and every nested changeset is recoverable by the same rule; otherwise the row is skipped.
  - A fallback value is applied without re-running the changeset function, so it must be valid for the schema.
  - Errors on the association and embed fields themselves (e.g. an association whose attrs could not be cast at all) are never recoverable.
- :replace_all_except - If a row already exists, then all fields will be replaced except the primary key, and any fields specified here. (Default: [])
  - Example: [:field, :other_field]
- :timeout - The maximum timeout for the transaction that wraps the entire bulk upsert (all chunks), also applied to each insert_all/3 query. With :max_concurrency, the timeout applies to each chunk's transaction instead. (Default: 15000)
  - Example: 60_000
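The :max_concurrency behavior described above is built on Task.async_stream/3. The sketch below shows only that general pattern with the standard library. The process_chunk function is a hypothetical stand-in that sums integers, where a real chunk would run its insert_all/3 calls inside its own transaction.

```elixir
# Stand-in for one chunk's transaction: it only sums the chunk.
# A real implementation would write the chunk to the database instead.
process_chunk = fn chunk -> Enum.sum(chunk) end

results =
  1..10
  # Split the input into chunks of 3 items (the last chunk may be smaller).
  |> Stream.chunk_every(3)
  # Process up to 2 chunks at a time; the timeout applies per task.
  |> Task.async_stream(process_chunk, max_concurrency: 2, timeout: 15_000)
  # Each successful task yields {:ok, value}; results keep input order.
  |> Enum.map(fn {:ok, value} -> value end)

IO.inspect(results)
# prints [6, 15, 24, 10]
```

Because each chunk is handled by its own task, work finished by earlier tasks is not undone when a later one fails, which mirrors the partial-results caveat listed under :max_concurrency.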
Examples
Upsert a list of Person attrs using the changeset function
YourProject.Persons.Person.upsert_changeset/2 to validate the attrs:
iex> attrs_list = [%{id: 1, name: "Alice", ...}]
iex> Bulkinup.upsert(
...>   YourProject.Repo,
...>   YourProject.Persons.Person,
...>   attrs_list,
...>   changeset_function_atom: :upsert_changeset
...> )
{:ok, %{upserted: 1, skipped: 0}}
Upsert a list of attrs, overwriting only the :name field if there is a conflict. Schemas that
are not given custom :insert_all_opts keep the default conflict behavior (replace all fields
except the primary key):
iex> insert_all_opts = %{
...>   YourProject.Persons.Person => [on_conflict: {:replace, [:name]}]
...> }
iex> Bulkinup.upsert(
...>   YourProject.Repo,
...>   YourProject.Persons.Person,
...>   _attrs_list = [%{id: 1, name: "Alicia"}],
...>   insert_all_opts: insert_all_opts
...> )
{:ok, %{upserted: 1, skipped: 0}}
Associations
Nested associations are upserted in the same call as the parent, recursively: a child's own nested associations (at any depth) are upserted the same way as the parent's.
This is an upsert-only operation: rows absent from the attrs are left untouched at every level.
Unlike Ecto.Changeset.cast_assoc/3's :on_replace behavior, absent children are never
deleted or nilified.
- has_many and has_one: the associated records are upserted into their own table. Each child must include its foreign key in its attrs, since it is upserted directly via insert_all/3.
- many_to_many: the associated records are upserted into their own table, and the join table rows linking each parent to its associations are upserted as well. Duplicate records and links are removed automatically.
- embeds_one and embeds_many: embedded data has no table of its own, so it is stored inline on the parent row as part of the parent upsert.
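As an illustration of the foreign-key requirement for has_many children, the attrs below nest children under a parent. This is a sketch under stated assumptions: the :posts association, the person_id foreign key, and all field values are hypothetical, and the parent's changeset is assumed to cast the nested association.

```elixir
# Hypothetical attrs for a Person with a has_many :posts association.
# The association name and all field names are assumptions for illustration.
attrs_list = [
  %{
    id: 1,
    name: "Alice",
    posts: [
      # Each child carries its own foreign key (person_id), because
      # children are written directly with insert_all/3.
      %{id: 10, person_id: 1, title: "First post"},
      %{id: 11, person_id: 1, title: "Second post"}
    ]
  }
]

# Sanity check: every child's foreign key matches its parent's id.
consistent? =
  Enum.all?(attrs_list, fn parent ->
    Enum.all?(parent.posts, fn post -> post.person_id == parent.id end)
  end)

IO.inspect(consistent?)
```

A list shaped like this would be passed as the third argument to upsert/4 or insert/4.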
Known limitations
- Nested belongs_to associations are not upserted. To associate with a belongs_to parent, include its foreign key field in the attrs (e.g. category_id). This applies at every level of nesting.