BulkUpsert

Warning

This is a very early release. It works but has some rough edges, and shouldn't be considered production-ready for most use cases.

Upsert multiple Ecto schema structs, along with their nested associations, to the database with a single function call.

Unlike a plain insert_all/3, this package passes every entry in the attrs list through Ecto changesets. This lets it validate your data and upsert a parent and its children across multiple tables in one call.

Supported features:

  • Nested associations: upsert a parent and its has_many, has_one, and many_to_many associations across multiple tables from a single list of attrs (embedded schemas are stored inline on the parent)
  • Validation and data processing (via Ecto changesets)
  • Custom values for autogenerated fields (e.g. insert/update timestamps)

For more information, see this project's documentation.


Getting started

Installation

Add this package to your list of dependencies in mix.exs, then run mix deps.get:

def deps do
  [
    {:bulk_upsert, "0.2.0"}
  ]
end

Usage

Once the package is installed, you can call the BulkUpsert.bulk_upsert/4 function directly, or create a wrapper function to use in your context modules:

lib/your_project/repo.ex

defmodule YourProject.Repo do
  use Ecto.Repo,
    otp_app: :your_project,
    adapter: Ecto.Adapters.Postgres

  @doc "Wraps `BulkUpsert.bulk_upsert/4`."
  def bulk_upsert(schema_module, attrs_list, opts \\ []),
    do: BulkUpsert.bulk_upsert(__MODULE__, schema_module, attrs_list, opts)
end

Basic working example

Here is a contrived migration and schema that we can work with:

priv/repo/migrations/0001_create_persons.exs

defmodule YourProject.Repo.Migrations.CreatePersons do
  use Ecto.Migration

  def change do
    create table(:persons) do
      add :name, :string
    end
  end
end

lib/your_project/persons/person.ex

defmodule YourProject.Persons.Person do
  use Ecto.Schema
  import Ecto.Changeset

  schema "persons" do
    field :name, :string
  end

  def changeset(person \\ %__MODULE__{}, attrs) do
    person
    |> cast(attrs, [:id, :name])
    |> validate_required([:id, :name])
  end
end

Now, after running the migrations with mix ecto.reset, we can enter an IEx shell with iex -S mix and make sure everything works:

Interactive Elixir (1.18.3) - press Ctrl+C to exit (type h() ENTER for help)

iex> YourProject.Repo.bulk_upsert(
...>   YourProject.Persons.Person,
...>   [%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}]
...> )
:ok

iex> YourProject.Repo.all(YourProject.Persons.Person)
[
  %YourProject.Persons.Person{
    __meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
    id: 1,
    name: "Alice"
  },
  %YourProject.Persons.Person{
    __meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
    id: 2,
    name: "Bob"
  }
]

iex> YourProject.Repo.bulk_upsert(
...>   YourProject.Persons.Person,
...>   [%{id: 1, name: "Alicia"}, %{id: 2, name: "Bobby"}]
...> )
:ok

iex> YourProject.Repo.all(YourProject.Persons.Person)
[
  %YourProject.Persons.Person{
    __meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
    id: 1,
    name: "Alicia"
  },
  %YourProject.Persons.Person{
    __meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
    id: 2,
    name: "Bobby"
  }
]
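In an application, the same call would usually live in a context module rather than in IEx. The following is a minimal sketch, not part of the package: the YourProject.Persons context, the import_persons/2 function, the string-keyed input rows, and the injectable repo argument are all assumptions made for illustration.

```elixir
defmodule YourProject.Persons do
  @moduledoc "Hypothetical context module; not part of BulkUpsert."

  # `repo` defaults to the repo that defines the `bulk_upsert/3` wrapper from
  # the Usage section. It is injectable (an assumption of this sketch) so the
  # function can be exercised without a database.
  def import_persons(rows, repo \\ YourProject.Repo) do
    attrs_list = Enum.map(rows, &to_attrs/1)
    repo.bulk_upsert(YourProject.Persons.Person, attrs_list)
  end

  # Convert one raw row (string keys, untrimmed name) into the attrs shape
  # that `Person.changeset/2` casts. Rows with any other shape raise a
  # `FunctionClauseError`.
  defp to_attrs(%{"id" => id, "name" => name}) do
    %{id: id, name: String.trim(name)}
  end
end
```

Keeping the row normalization next to the upsert call means callers only hand over raw rows, and the changeset still performs the actual validation.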

Working with nested associations

The main reason to reach for this package over a plain insert_all/3 is that it can upsert a parent and its children at the same time, from a single list of attrs. The parent and each association are upserted into their own tables, all within one transaction.

Here we extend the Person example with a has_many :pets association:

priv/repo/migrations/0002_create_pets.exs

defmodule YourProject.Repo.Migrations.CreatePets do
  use Ecto.Migration

  def change do
    create table(:pets) do
      add :person_id, references(:persons)
      add :name, :string
    end
  end
end

lib/your_project/persons/pet.ex

defmodule YourProject.Persons.Pet do
  use Ecto.Schema
  import Ecto.Changeset

  schema "pets" do
    field :person_id, :integer
    field :name, :string
  end

  def changeset(pet \\ %__MODULE__{}, attrs) do
    pet
    |> cast(attrs, [:id, :person_id, :name])
    |> validate_required([:id, :person_id, :name])
  end
end

lib/your_project/persons/person.ex

defmodule YourProject.Persons.Person do
  use Ecto.Schema
  import Ecto.Changeset

  schema "persons" do
    field :name, :string

    has_many :pets, YourProject.Persons.Pet
  end

  def changeset(person \\ %__MODULE__{}, attrs) do
    person
    |> cast(attrs, [:id, :name])
    |> validate_required([:id, :name])
    |> cast_assoc(:pets)
  end
end

Note

Each child's foreign key (here, person_id) must be present in its own attrs. Associations are upserted via insert_all/3, so the foreign key is not inferred from the parent.

Now a single call upserts both the persons and their pets across both tables:

iex> YourProject.Repo.bulk_upsert(
...>   YourProject.Persons.Person,
...>   [
...>     %{id: 1, name: "Alice", pets: [
...>       %{id: 10, person_id: 1, name: "Rex"},
...>       %{id: 11, person_id: 1, name: "Whiskers"}
...>     ]},
...>     %{id: 2, name: "Bob", pets: [
...>       %{id: 20, person_id: 2, name: "Buddy"}
...>     ]}
...>   ]
...> )
:ok

iex> YourProject.Repo.all(YourProject.Persons.Pet)
[
  %YourProject.Persons.Pet{id: 10, person_id: 1, name: "Rex"},
  %YourProject.Persons.Pet{id: 11, person_id: 1, name: "Whiskers"},
  %YourProject.Persons.Pet{id: 20, person_id: 2, name: "Buddy"}
]

Running the same call again with changed pet names upserts the existing rows in place, exactly like the top-level structs.

has_one and many_to_many associations work the same way: cast them in the changeset and include them in the attrs. For has_many and has_one, each child must carry its own foreign key (as shown above with person_id). For many_to_many, the associated records and the join table rows are both upserted for you, and duplicate records and links are removed automatically. Embedded schemas (embeds_one, embeds_many) have no table of their own, so they are stored inline on the parent row.
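Because each has_many or has_one child must carry its own foreign key, it can be convenient to fill the key in before calling bulk_upsert. The following is a minimal sketch in plain Elixir; ForeignKeyHelper and put_foreign_keys/3 are hypothetical names and are not provided by this package.

```elixir
defmodule ForeignKeyHelper do
  @moduledoc "Hypothetical helper; not part of BulkUpsert."

  @doc """
  Copies each parent's `:id` into its children under `fk_field`.

  Children that already carry `fk_field` keep their own value, and parents
  without the `assoc_field` key are returned unchanged.
  """
  def put_foreign_keys(parents, assoc_field, fk_field) do
    Enum.map(parents, fn %{id: parent_id} = parent ->
      case Map.fetch(parent, assoc_field) do
        {:ok, children} ->
          children = Enum.map(children, &Map.put_new(&1, fk_field, parent_id))
          Map.put(parent, assoc_field, children)

        :error ->
          parent
      end
    end)
  end
end

attrs_list =
  ForeignKeyHelper.put_foreign_keys(
    [%{id: 1, name: "Alice", pets: [%{id: 10, name: "Rex"}, %{id: 11, name: "Whiskers"}]}],
    :pets,
    :person_id
  )
```

The resulting attrs_list has person_id: 1 on both pets, matching the shape used in the example call above, and can be passed to bulk_upsert as-is.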

Working with autogenerated timestamps

Ecto's built-in insert_all/3 function does not populate autogenerated fields such as timestamps. If your schemas use autogenerated timestamp fields, you must supply those values yourself during the bulk upsert.

The simplest way is the :placeholders option, which sets fields from shared values (sent to the database once) after changeset validation:

YourProject.Repo.bulk_upsert(
  YourProject.Persons.Person,
  [%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}],
  placeholders: %{
    YourProject.Persons.Person => %{inserted_at: DateTime.utc_now(), updated_at: DateTime.utc_now()}
  }
)

Placeholder fields bypass the changeset, so they are not cast or validated. Do not mark a placeholder field as required in your changeset (its value is absent during validation, which would mark the row invalid and skip it).
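If inserted_at and updated_at should be identical on insert, one option is to take a single timestamp and reuse it. The following is a minimal sketch in plain Elixir; TimestampPlaceholders and build/2 are hypothetical names, and passing more than one schema module in the :placeholders map is an assumption based on the map being keyed by schema module.

```elixir
defmodule TimestampPlaceholders do
  @moduledoc "Hypothetical helper; not part of BulkUpsert."

  @doc """
  Builds a `:placeholders` map keyed by schema module, reusing one timestamp
  for every module and for both timestamp fields.
  """
  def build(schema_modules, now \\ DateTime.utc_now()) do
    Map.new(schema_modules, fn schema_module ->
      {schema_module, %{inserted_at: now, updated_at: now}}
    end)
  end
end

# A fixed timestamp is used here only to make the result easy to inspect.
now = ~U[2025-04-29 07:01:10.180490Z]
placeholders = TimestampPlaceholders.build([YourProject.Persons.Person], now)
```

The resulting map has the same shape as the :placeholders value shown above and would be passed as placeholders: placeholders in the bulk_upsert options.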

If you need more control, you can instead supply your own insert_all function. The following example generates the timestamps inside such a function:

lib/your_project/repo.ex

defmodule YourProject.Repo do
  use Ecto.Repo,
    otp_app: :your_project,
    adapter: Ecto.Adapters.Postgres

  require Logger

  @doc "Wraps `BulkUpsert.bulk_upsert/4`."
  def bulk_upsert(schema_module, attrs_list, opts \\ []) do
    base_opts = [
      # Use a custom function that accepts the same arguments as `insert_all/3`
      insert_all_function_atom: :insert_all_with_autogenerated_timestamps,
      # Do not overwrite the initial insert timestamp
      replace_all_except: [:inserted_at]
    ]

    opts = Keyword.merge(base_opts, opts)

    BulkUpsert.bulk_upsert(__MODULE__, schema_module, attrs_list, opts)
  end

  @doc """
  Extend `YourProject.Repo.insert_all/3` to automatically generate current insert and update
  timestamps when performing bulk insert operations.

  > #### Info {: .info}
  >
  > Unlike `YourProject.Repo.insert_all/3`, this function will only accept a schema as the first
  > argument (not a source), and a list of entries as the second argument (not a query).

  ## Examples

      iex> YourProject.Repo.insert_all_with_autogenerated_timestamps(
      ...>   YourProject.Persons.Person,
      ...>   _attrs_list = [%{...}, %{...}]
      ...> )
      {2, nil}
  """
  def insert_all_with_autogenerated_timestamps(schema_module, entries, opts \\ []) do
    placeholders = Keyword.get(opts, :placeholders, %{})

    # Build timestamp placeholders and attrs
    inserted_at_field = :inserted_at
    updated_at_field = :updated_at

    current_timestamp = DateTime.utc_now()

    timestamp_placeholders =
      Map.new([{inserted_at_field, current_timestamp}, {updated_at_field, current_timestamp}])
      # Reject timestamp fields that are not defined in the given `schema_module`
      |> Map.reject(fn {field, _value} -> schema_module.__schema__(:type, field) |> is_nil() end)

    if Enum.empty?(timestamp_placeholders) do
      Logger.debug("""
      The #{inspect(schema_module)} schema does not use any configured insert or update \
      timestamp fields. Falling back to `#{inspect(__MODULE__)}.insert_all/3`...\
      """)

      __MODULE__.insert_all(schema_module, entries, opts)
    else
      Logger.debug("Performing bulk insert with autogenerated timestamps...")

      timestamp_placeholder_attrs =
        timestamp_placeholders
        |> Map.new(fn {field, _value} -> {field, {:placeholder, field}} end)

      # Merge the timestamp attrs and placeholders into an `insert_all/3` function call
      entries = entries |> Enum.map(fn attrs -> Map.merge(attrs, timestamp_placeholder_attrs) end)

      placeholders = placeholders |> Map.merge(timestamp_placeholders)
      opts = opts |> Keyword.put(:placeholders, placeholders)

      __MODULE__.insert_all(schema_module, entries, opts)
    end
  end
end

priv/repo/migrations/0001_create_persons.exs

defmodule YourProject.Repo.Migrations.CreatePersons do
  use Ecto.Migration

  def change do
    create table(:persons) do
      add :name, :string

      timestamps(type: :utc_datetime_usec)
    end
  end
end

lib/your_project/persons/person.ex

defmodule YourProject.Persons.Person do
  use Ecto.Schema
  import Ecto.Changeset

  schema "persons" do
    field :name, :string

    timestamps(type: :utc_datetime_usec)
  end

  def changeset(person \\ %__MODULE__{}, attrs) do
    person
    |> cast(attrs, [:id, :name])
    |> validate_required([:id, :name])
  end
end

Now, after running the migrations with mix ecto.reset, we can enter an IEx shell with iex -S mix and make sure everything still works:

Interactive Elixir (1.18.3) - press Ctrl+C to exit (type h() ENTER for help)

iex> YourProject.Repo.bulk_upsert(
...>   YourProject.Persons.Person,
...>   [%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}]
...> )
:ok

iex> YourProject.Repo.all(YourProject.Persons.Person)
[
  %YourProject.Persons.Person{
    __meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
    id: 1,
    name: "Alice",
    inserted_at: ~U[2025-04-29 07:01:10.180490Z],
    updated_at: ~U[2025-04-29 07:01:10.180490Z]
  },
  %YourProject.Persons.Person{
    __meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
    id: 2,
    name: "Bob",
    inserted_at: ~U[2025-04-29 07:01:10.180490Z],
    updated_at: ~U[2025-04-29 07:01:10.180490Z]
  }
]

iex> YourProject.Repo.bulk_upsert(
...>   YourProject.Persons.Person,
...>   [%{id: 1, name: "Alicia"}, %{id: 2, name: "Bobby"}]
...> )
:ok

iex> YourProject.Repo.all(YourProject.Persons.Person)
[
  %YourProject.Persons.Person{
    __meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
    id: 1,
    name: "Alicia",
    inserted_at: ~U[2025-04-29 07:01:10.180490Z],
    updated_at: ~U[2025-04-29 07:01:19.549929Z]
  },
  %YourProject.Persons.Person{
    __meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
    id: 2,
    name: "Bobby",
    inserted_at: ~U[2025-04-29 07:01:10.180490Z],
    updated_at: ~U[2025-04-29 07:01:19.549929Z]
  }
]

As you can see, the custom logic generates the timestamps on insert, and the second upsert advances updated_at while leaving inserted_at untouched.
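To make the placeholder step inside insert_all_with_autogenerated_timestamps/3 more concrete, the following sketch replays it in plain Elixir without a database. The input values are stand-ins chosen for illustration. The code only shows the data that the function would hand to insert_all/3 and does not perform an insert.

```elixir
# Stand-ins for the values available inside the custom function.
entries = [%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}]
now = ~U[2025-04-29 07:01:10.180490Z]
timestamp_placeholders = %{inserted_at: now, updated_at: now}

# Each timestamp field becomes a `{:placeholder, key}` reference...
timestamp_placeholder_attrs =
  Map.new(timestamp_placeholders, fn {field, _value} -> {field, {:placeholder, field}} end)

# ...which is merged into every entry, while the actual values travel once
# in the `:placeholders` option.
entries = Enum.map(entries, &Map.merge(&1, timestamp_placeholder_attrs))
opts = Keyword.put([], :placeholders, timestamp_placeholders)

IO.inspect(entries, label: "entries")
IO.inspect(opts, label: "opts")
```

Every entry ends up referencing the same two placeholder keys, so the timestamp values are sent to the database once rather than once per row.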


For more information, see this project's documentation on HexDocs.


This project is made possible by Interline Travel and Tour Inc.

https://www.perx.com/

https://www.touchdown.co.uk/

https://www.touchdownfrance.com/