Building Storage Systems with ExCodecs

Copy Markdown View Source
# Notebook setup. When a mix.exs exists one directory above this notebook, the
# notebook is running from inside a source checkout: depend on that local tree
# (plus :rustler) and set the `rustler_precompiled` `force_build` flag for
# :ex_codecs. Otherwise install the published Hex package with no extra config.
local_path = Path.join(__DIR__, "../mix.exs")

{ex_codecs_dep, config} =
  case File.exists?(local_path) do
    true ->
      {
        [{:ex_codecs, path: Path.join(__DIR__, "..")}, {:rustler, "~> 0.36"}],
        [rustler_precompiled: [force_build: [ex_codecs: true]]]
      }

    false ->
      {[{:ex_codecs, "~> 0.1"}], []}
  end

notebook_deps = [{:jason, "~> 1.4"}, {:kino, "~> 0.14"}, {:kino_vega_lite, "~> 0.1.13"}]

Mix.install(ex_codecs_dep ++ notebook_deps, config: config)

Practical Usage Patterns

The Basic Pattern

# Smallest possible round trip: encode a binary with :zstd, decode the result
# with the same codec, and check that the bytes come back unchanged.
data = "Important data to store"

{:ok, compressed} = ExCodecs.encode(:zstd, data)
{:ok, recovered} = ExCodecs.decode(:zstd, compressed)

round_trip_ok? = recovered == data
IO.puts("Round-trip OK: #{round_trip_ok?}")

Choosing a Codec at Runtime

defmodule StorageConfig do
  @moduledoc """
  Chooses a compression codec from a content type.
  """

  # Codec used for any content type that is not listed below.
  @default_codec :zstd

  @codec_by_content_type %{
    "application/json" => :zstd,
    "application/octet-stream" => :lz4,
    "application/x-array" => :blosc2
  }

  @doc """
  Returns the codec atom for `content_type`.

  Unlisted content types (and any non-string argument) map to `:zstd`.
  """
  def codec_for_content_type(content_type) do
    Map.get(@codec_by_content_type, content_type, @default_codec)
  end
end

# Resolve the codec for a JSON payload, then ask ExCodecs whether that codec is
# available before relying on it.
content_type = "application/json"
codec = StorageConfig.codec_for_content_type(content_type)

IO.puts("Selected codec: " <> inspect(codec))
IO.puts("Supported: " <> to_string(ExCodecs.supports?(codec)))

Building a Compressed Storage Layer

defmodule CompressedStore do
  @moduledoc """
  A simple key-value store with transparent compression.

  The state is held in an `Agent` and consists of the codec, the codec options
  and a map of `key => entry`, where each entry records the compressed bytes,
  the original size and the codec that produced it.

  Compression and decompression run in the *calling* process; the Agent is only
  used to read the configuration and to read or write entries. This keeps slow
  codec work from blocking other clients and means a value that makes the codec
  raise cannot take the store down.
  """
  use Agent

  defstruct [:codec, :opts, :table]

  @doc """
  Starts the store.

  ## Options

    * `:codec` - codec passed to `ExCodecs.encode/3` (default `:zstd`)
    * `:opts`  - keyword list of codec options, e.g. `[level: 3]` (default `[]`)
    * `:name`  - registered name of the Agent (default `#{inspect(__MODULE__)}`)

  Any other key is ignored, so codec options must be nested under `:opts`.
  """
  def start_link(opts \\ []) do
    codec = Keyword.get(opts, :codec, :zstd)
    codec_opts = Keyword.get(opts, :opts, [])
    name = Keyword.get(opts, :name, __MODULE__)

    Agent.start_link(
      fn -> %__MODULE__{codec: codec, opts: codec_opts, table: %{}} end,
      name: name
    )
  end

  @doc """
  Compresses `value` and stores it under `key`, replacing any previous entry.

  Returns `{:ok, compressed_byte_size}` or the `{:error, reason}` returned by
  the codec, in which case the store is left unchanged.
  """
  def put(store \\ __MODULE__, key, value) do
    # The codec configuration is fixed at start-up, so reading it first and
    # writing the entry afterwards cannot observe a different codec.
    {codec, codec_opts} = Agent.get(store, &{&1.codec, &1.opts})

    case ExCodecs.encode(codec, value, codec_opts) do
      {:ok, compressed} ->
        entry = %{compressed: compressed, original_size: byte_size(value), codec: codec}

        Agent.update(store, fn state ->
          %{state | table: Map.put(state.table, key, entry)}
        end)

        {:ok, byte_size(compressed)}

      {:error, err} ->
        {:error, err}
    end
  end

  @doc """
  Fetches and decompresses the value stored under `key`.

  Returns the result of `ExCodecs.decode/2`, or `{:error, :not_found}` when the
  key is absent.
  """
  def get(store \\ __MODULE__, key) do
    case Agent.get(store, &Map.get(&1.table, key)) do
      nil -> {:error, :not_found}
      entry -> ExCodecs.decode(entry.codec, entry.compressed)
    end
  end

  @doc """
  Returns a summary map with `:codec`, `:entry_count`, `:total_original_bytes`,
  `:total_compressed_bytes` and `:savings_pct`.

  `:savings_pct` is `0.0` when no original bytes are stored (for example an
  empty store), rather than a misleading 100%.
  """
  def info(store \\ __MODULE__) do
    Agent.get(store, fn state ->
      {total_original, total_compressed} =
        Enum.reduce(state.table, {0, 0}, fn {_key, entry}, {original, compressed} ->
          {original + entry.original_size, compressed + byte_size(entry.compressed)}
        end)

      %{
        codec: state.codec,
        entry_count: map_size(state.table),
        total_original_bytes: total_original,
        total_compressed_bytes: total_compressed,
        savings_pct: savings_pct(total_original, total_compressed)
      }
    end)
  end

  @doc """
  Removes `key` from the store.

  Returns the stored entry map (still compressed) or `nil` when the key was
  absent.
  """
  def delete(store \\ __MODULE__, key) do
    Agent.get_and_update(store, fn state ->
      {Map.get(state.table, key), %{state | table: Map.delete(state.table, key)}}
    end)
  end

  # Percentage of bytes saved by compression, rounded to one decimal place.
  defp savings_pct(0, _total_compressed), do: 0.0

  defp savings_pct(total_original, total_compressed) do
    Float.round(100 * (1 - total_compressed / total_original), 1)
  end
end

Using the Compressed Store

 # Codec options must be nested under `:opts`; `start_link/1` only reads the
# `:codec`, `:opts` and `:name` keys, so a top-level `level: 3` would be ignored.
{:ok, _pid} = CompressedStore.start_link(codec: :zstd, opts: [level: 3])

# Three payloads with different compressibility: highly repetitive text,
# Base64-encoded random bytes, and a JSON list of small maps.
CompressedStore.put(:doc1, String.duplicate("Hello, World! ", 1000))
CompressedStore.put(:doc2, :crypto.strong_rand_bytes(4096) |> Base.encode64())
CompressedStore.put(:doc3, Jason.encode!(for i <- 1..100, do: %{id: i, value: :rand.uniform(1000)}))

{:ok, doc1} = CompressedStore.get(:doc1)
IO.puts("Retrieved doc1 (#{byte_size(doc1)} bytes)")

info = CompressedStore.info()
IO.puts("\nStorage Summary:")
IO.puts("  Codec: #{inspect(info.codec)}")
IO.puts("  Entries: #{info.entry_count}")
IO.puts("  Original: #{info.total_original_bytes} bytes")
IO.puts("  Compressed: #{info.total_compressed_bytes} bytes")
IO.puts("  Savings: #{info.savings_pct}%")

Multi-Codec Store

defmodule MultiCodecStore do
  @moduledoc """
  A store that selects the best codec per value.

  Every value is compressed with each candidate codec and the smallest result
  is kept. The state is a plain map of `key => entry` held in an `Agent`.

  All compression and decompression happens in the calling process, so trying
  several codecs for one value does not block other clients of the store.
  """
  use Agent

  # Candidate codecs, in preference order: on a size tie the earlier one wins.
  @codecs [:lz4, :snappy, :zstd, :bzip2]

  @doc """
  Starts the store. The only option is `:name` (default `#{inspect(__MODULE__)}`).
  """
  def start_link(opts \\ []) do
    name = Keyword.get(opts, :name, __MODULE__)
    Agent.start_link(fn -> %{} end, name: name)
  end

  @doc """
  Compresses `value` with every candidate codec and stores the smallest result.

  Returns `{:ok, entry}` where `entry` has the keys `:codec`, `:compressed`,
  `:original_size` and `:compressed_size`. Codecs that return an error are
  skipped; if every codec fails, `{:error, :no_codec_available}` is returned
  and the store is left unchanged.
  """
  def put(store \\ __MODULE__, key, value) do
    case best_encoding(value) do
      nil ->
        {:error, :no_codec_available}

      {codec, compressed} ->
        entry = %{
          codec: codec,
          compressed: compressed,
          original_size: byte_size(value),
          compressed_size: byte_size(compressed)
        }

        Agent.update(store, &Map.put(&1, key, entry))
        {:ok, entry}
    end
  end

  @doc """
  Fetches and decompresses the value stored under `key`.

  Returns the result of `ExCodecs.decode/2`, or `{:error, :not_found}`.
  """
  def get(store \\ __MODULE__, key) do
    case Agent.get(store, &Map.get(&1, key)) do
      nil -> {:error, :not_found}
      entry -> ExCodecs.decode(entry.codec, entry.compressed)
    end
  end

  @doc """
  Returns per-codec totals as a list of
  `{codec, entry_count, total_original_bytes, total_compressed_bytes}` tuples,
  sorted by ascending entry count.
  """
  def stats(store \\ __MODULE__) do
    Agent.get(store, fn state ->
      state
      |> Map.values()
      |> Enum.group_by(& &1.codec)
      |> Enum.map(fn {codec, entries} ->
        {codec, length(entries),
         Enum.sum(Enum.map(entries, & &1.original_size)),
         Enum.sum(Enum.map(entries, & &1.compressed_size))}
      end)
      |> Enum.sort_by(&elem(&1, 1))
    end)
  end

  # Returns `{codec, compressed}` for the smallest successful encoding, or `nil`
  # when no codec succeeded.
  defp best_encoding(value) do
    encodings =
      for codec <- @codecs, {:ok, compressed} <- [ExCodecs.encode(codec, value)] do
        {codec, compressed}
      end

    case encodings do
      [] -> nil
      _ -> Enum.min_by(encodings, fn {_codec, compressed} -> byte_size(compressed) end)
    end
  end
end

Using the Multi-Codec Store

{:ok, _pid} = MultiCodecStore.start_link()

# Three inputs with very different structure, so that different codecs can win:
# a short repeating pattern, bytes drawn from :rand, and little-endian doubles.
datasets = %{
  repetitive: String.duplicate("abcabcabc", 5000),
  semi_random: for(_ <- 1..10000, into: <<>>, do: <<:rand.uniform(255)>>),
  floats: for(i <- 1..4096, into: <<>>, do: <<i * 0.1::float-size(64)-little>>)
}

Enum.each(datasets, fn {name, data} ->
  {:ok, %{codec: best, original_size: original, compressed_size: packed}} =
    MultiCodecStore.put(name, data)

  ratio_pct = Float.round(100 * packed / original, 1)

  IO.puts("#{name}: best codec = #{inspect(best)}, #{original} -> #{packed} bytes (#{ratio_pct}%)")
end)

IO.puts("\nCodec distribution:")

Enum.each(MultiCodecStore.stats(), fn {codec, count, orig, comp} ->
  IO.puts("  #{inspect(codec)}: #{count} entries, #{orig} -> #{comp} bytes")
end)

Chunked Compression for Large Files

defmodule ChunkedCompressor do
  @moduledoc """
  Compresses large data in chunks to limit memory usage.

  ## Blob layout

  All integers are unsigned big-endian:

      <<chunk_count::32, original_size::64,
        offset_0::32, ..., offset_n::32,
        compressed_chunk_0, ..., compressed_chunk_n>>

  Each offset is the start of the matching compressed chunk, counted from the
  first byte after the offset table. Because offsets are 32 bits wide, the
  total compressed payload must stay below 4 GiB.
  """
  @default_chunk_size 64 * 1024

  @doc """
  Splits `data` into chunks of at most `chunk_size` bytes, compresses each
  chunk with `codec`/`opts`, and returns `{:ok, blob}`.

  The final chunk may be shorter than `chunk_size`; empty input produces a blob
  with zero chunks. Raises `MatchError` if the codec fails on a chunk.
  """
  def compress(data, codec \\ :zstd, opts \\ [], chunk_size \\ @default_chunk_size)
      when is_binary(data) and is_integer(chunk_size) and chunk_size > 0 do
    compressed_chunks =
      data
      |> split_chunks(chunk_size)
      |> Enum.map(fn chunk ->
        {:ok, enc} = ExCodecs.encode(codec, chunk, opts)
        enc
      end)

    # One start offset per chunk: the running total of the sizes before it.
    {offsets, _payload_size} =
      Enum.map_reduce(compressed_chunks, 0, fn chunk, offset ->
        {offset, offset + byte_size(chunk)}
      end)

    header = <<
      length(compressed_chunks)::unsigned-big-32,
      byte_size(data)::unsigned-big-64
    >>

    offset_table = for offset <- offsets, into: <<>>, do: <<offset::unsigned-big-32>>

    {:ok, IO.iodata_to_binary([header, offset_table, compressed_chunks])}
  end

  @doc """
  Reverses `compress/4`.

  Returns `{:ok, data}`, or `{:error, :size_mismatch}` when the decompressed
  length differs from the original size recorded in the header. Raises
  `MatchError` on a malformed blob or when the codec fails on a chunk.
  """
  def decompress(blob, codec \\ :zstd) do
    <<num_chunks::unsigned-big-32, original_size::unsigned-big-64, rest::binary>> = blob

    offsets_size = num_chunks * 4
    <<offset_table::binary-size(offsets_size), chunks_binary::binary>> = rest

    starts = for <<offset::unsigned-big-32 <- offset_table>>, do: offset
    # A chunk ends where the next one starts; the last ends at the payload end.
    ends = Enum.drop(starts, 1) ++ [byte_size(chunks_binary)]

    decompressed =
      starts
      |> Enum.zip(ends)
      |> Enum.map(fn {start_off, end_off} ->
        size = end_off - start_off
        <<_::binary-size(start_off), chunk::binary-size(size), _::binary>> = chunks_binary
        {:ok, dec} = ExCodecs.decode(codec, chunk)
        dec
      end)
      |> IO.iodata_to_binary()

    if byte_size(decompressed) == original_size do
      {:ok, decompressed}
    else
      {:error, :size_mismatch}
    end
  end

  # Splits a binary into `size`-byte pieces, keeping the shorter final piece.
  # A bitstring comprehension cannot be used here: its generator stops at the
  # first non-matching segment and would silently drop that final piece.
  defp split_chunks(<<>>, _size), do: []
  defp split_chunks(data, size) when byte_size(data) <= size, do: [data]

  defp split_chunks(data, size) do
    <<chunk::binary-size(size), rest::binary>> = data
    [chunk | split_chunks(rest, size)]
  end
end
# Round-trip a large repetitive binary through the chunked format and report
# the sizes, the compressed/original ratio, and whether the bytes survived.
large_data = String.duplicate("This is a chunk of data that repeats. ", 10000)

{:ok, compressed} = ChunkedCompressor.compress(large_data, :zstd)
{:ok, recovered} = ChunkedCompressor.decompress(compressed, :zstd)

original_bytes = byte_size(large_data)
compressed_bytes = byte_size(compressed)

Enum.each(
  [
    "Original:   #{original_bytes} bytes",
    "Compressed: #{compressed_bytes} bytes",
    "Ratio:      #{Float.round(100 * compressed_bytes / original_bytes, 1)}%",
    "Round-trip: #{recovered == large_data}"
  ],
  &IO.puts/1
)

Error Handling Best Practices

# Translate codec errors into application-level reasons by matching on the
# `reason` field of `%ExCodecs.Error{}`.
case ExCodecs.encode(:zstd, "some data") do
  {:ok, compressed} ->
    {:ok, compressed}

  {:error, %ExCodecs.Error{reason: :unsupported_codec}} ->
    {:error, :codec_missing}

  {:error, %ExCodecs.Error{reason: :invalid_data} = err} ->
    IO.puts("Invalid data: #{err.message}")
    {:error, :bad_input}

  {:error, %ExCodecs.Error{reason: :compression_failed} = err} ->
    IO.puts("Compression failed: #{err.message}")
    {:error, :compress_failed}

  # Any reason not listed above is passed through unchanged instead of raising
  # CaseClauseError.
  {:error, other} ->
    {:error, other}
end
defmodule SafeCompress do
  @moduledoc """
  Compression with up-front validation and a verifying round trip.
  """

  @doc """
  Compresses `data` with `codec` and confirms it decodes back to the same bytes.

  Returns `{:ok, compressed}` on success, otherwise one of:

    * `{:error, :unsupported_codec}` - `ExCodecs.supports?/1` returned `false`
    * `{:error, :not_binary}` - `data` is not a binary
    * `{:error, :round_trip_mismatch}` - decoding succeeded but produced
      different bytes
    * `{:error, err}` - the error returned by the codec while encoding or
      decoding
  """
  def call(codec, data, opts \\ []) do
    with :ok <- check_codec(codec),
         :ok <- check_binary(data),
         {:ok, compressed} <- ExCodecs.encode(codec, data, opts),
         :ok <- verify_round_trip(codec, compressed, data) do
      {:ok, compressed}
    end
  end

  defp check_codec(codec) do
    if ExCodecs.supports?(codec), do: :ok, else: {:error, :unsupported_codec}
  end

  defp check_binary(data) when is_binary(data), do: :ok
  defp check_binary(_data), do: {:error, :not_binary}

  # Decodes `compressed` and compares against the input. A successful decode
  # that yields different bytes is reported as an error rather than falling
  # through the `with` unmatched.
  defp verify_round_trip(codec, compressed, data) do
    case ExCodecs.decode(codec, compressed) do
      {:ok, ^data} -> :ok
      {:ok, _different} -> {:error, :round_trip_mismatch}
      {:error, err} -> {:error, err}
    end
  end
end

# Compress a short string through the validating wrapper and confirm that a
# binary came back.
{:ok, result} = SafeCompress.call(:zstd, "verify round-trip")

result
|> is_binary()
|> then(&IO.puts("Safe compress OK: #{&1}"))
defmodule CompressWithFallback do
  @moduledoc """
  Tries a list of codecs in order and keeps the first one that succeeds.
  """

  @doc """
  Encodes `data` with the first codec in `codecs` that does not return an error.

  Returns `{:ok, {codec, compressed}}` on success,
  `{:error, :codec_failed}` when every listed codec failed, and
  `{:error, :no_codec_available}` when `codecs` is empty.
  """
  def call(data, codecs \\ [:zstd, :lz4, :snappy]) do
    attempt(codecs, data, {:error, :no_codec_available})
  end

  # `outcome` is what to return once the list runs out: the initial value for
  # an empty list, `{:error, :codec_failed}` after at least one failure.
  defp attempt([], _data, outcome), do: outcome

  defp attempt([codec | remaining], data, _outcome) do
    case ExCodecs.encode(codec, data) do
      {:ok, compressed} -> {:ok, {codec, compressed}}
      {:error, _} -> attempt(remaining, data, {:error, :codec_failed})
    end
  end
end

# Use the default codec list and report which codec ended up being used.
{:ok, {used_codec, compressed}} = CompressWithFallback.call("fallback test data")

IO.puts("Used codec: " <> inspect(used_codec))
IO.puts("Compressed: " <> Integer.to_string(byte_size(compressed)) <> " bytes")

ETS Integration

defmodule CompressedETS do
  @moduledoc """
  ETS-backed storage with transparent compression.

  Rows have the shape `{key, compressed, codec, original_size}`.
  """

  @doc """
  Creates a public, named `:set` table called `name` and returns the name.

  `:ets.new/2` raises `ArgumentError` if a table with that name already exists.
  """
  def new(name \\ :compressed_store) do
    :ets.new(name, [:set, :public, :named_table])
    name
  end

  @doc """
  Compresses `value` with `codec`/`opts` and stores it under `key`.

  Returns `{:ok, compressed_byte_size}`. Raises `MatchError` if the codec
  returns an error.
  """
  def insert(table, key, value, codec \\ :zstd, opts \\ []) do
    {:ok, compressed} = ExCodecs.encode(codec, value, opts)
    :ets.insert(table, {key, compressed, codec, byte_size(value)})
    {:ok, byte_size(compressed)}
  end

  @doc """
  Looks up `key` and decompresses it with the codec recorded in the row.

  Returns the result of `ExCodecs.decode/2`, or `{:error, :not_found}`.
  """
  def lookup(table, key) do
    case :ets.lookup(table, key) do
      [{^key, compressed, codec, _orig_size}] ->
        ExCodecs.decode(codec, compressed)

      [] ->
        {:error, :not_found}
    end
  end

  @doc """
  Returns a map with `:entries`, `:original_bytes`, `:compressed_bytes` and
  `:savings_pct`.

  `:savings_pct` is `0.0` when no original bytes are stored (for example an
  empty table), rather than a misleading 100%.
  """
  def stats(table) do
    # Fold over the table instead of materializing every row in one list.
    {entries, total_orig, total_comp} =
      :ets.foldl(
        fn {_key, comp, _codec, orig}, {count, orig_acc, comp_acc} ->
          {count + 1, orig_acc + orig, comp_acc + byte_size(comp)}
        end,
        {0, 0, 0},
        table
      )

    %{
      entries: entries,
      original_bytes: total_orig,
      compressed_bytes: total_comp,
      savings_pct: savings_pct(total_orig, total_comp)
    }
  end

  # Percentage of bytes saved by compression, rounded to one decimal place.
  defp savings_pct(0, _total_comp), do: 0.0
  defp savings_pct(total_orig, total_comp), do: Float.round(100 * (1 - total_comp / total_orig), 1)
end
# Fill a fresh table with 100 repetitive values, print the aggregate savings,
# read one value back, then drop the table.
table = CompressedETS.new()

Enum.each(1..100, fn i ->
  data = String.duplicate("item-#{rem(i, 10)} ", 500)
  CompressedETS.insert(table, "key_#{i}", data)
end)

stats = CompressedETS.stats(table)

Enum.each(
  [
    "ETS Compressed Store:",
    "  Entries: #{stats.entries}",
    "  Original: #{stats.original_bytes} bytes",
    "  Compressed: #{stats.compressed_bytes} bytes",
    "  Savings: #{stats.savings_pct}%"
  ],
  &IO.puts/1
)

{:ok, val} = CompressedETS.lookup(table, "key_1")
IO.puts("\nRetrieved #{byte_size(val)} bytes successfully")

:ets.delete(table)

File Storage Integration

defmodule CompressedFileIO do
  @moduledoc """
  File I/O with transparent compression.

  ## File layout

      <<"EXCD", codec_name::binary-size(8),
        original_size::unsigned-big-64, compressed_size::unsigned-big-64,
        compressed::binary>>

  The codec name is the codec atom as text, right-padded with spaces to
  8 bytes.
  """
  @magic "EXCD"

  # Width in bytes of the space-padded codec name field in the header.
  @codec_field_size 8

  # Codec names that are always recognized when reading. Listing them here
  # guarantees the atoms exist once this module is loaded.
  @known_codecs [:zstd, :lz4, :snappy, :bzip2, :blosc2]

  @doc """
  Compresses `data` with `codec`/`opts` and writes it to `path`.

  Returns `:ok`, `{:error, :codec_name_too_long}` when the codec name does not
  fit the 8-byte header field, or the `{:error, posix}` from `File.write/2`.
  Raises `MatchError` if the codec returns an error.
  """
  def write(path, data, codec \\ :zstd, opts \\ []) do
    with {:ok, codec_bin} <- codec_atom_to_bin(codec) do
      {:ok, compressed} = ExCodecs.encode(codec, data, opts)

      header = <<
        @magic::binary,
        codec_bin::binary,
        byte_size(data)::unsigned-big-64,
        byte_size(compressed)::unsigned-big-64
      >>

      File.write(path, [header, compressed])
    end
  end

  @doc """
  Reads `path` and decompresses its payload with the codec named in the header.

  Returns the result of `ExCodecs.decode/2`, or:

    * `{:error, :invalid_format}` - bad magic, short header, or a payload
      whose length differs from the size recorded in the header
    * `{:error, :unknown_codec}` - the header names a codec that is not a
      known atom
    * `{:error, posix}` - the error from `File.read/1`
  """
  def read(path) do
    case File.read(path) do
      {:ok,
       <<@magic::binary, codec_bin::binary-size(@codec_field_size),
         _orig_size::unsigned-big-64, comp_size::unsigned-big-64, compressed::binary>>}
      when byte_size(compressed) == comp_size ->
        with {:ok, codec} <- bin_to_codec_atom(codec_bin) do
          ExCodecs.decode(codec, compressed)
        end

      {:ok, _} ->
        {:error, :invalid_format}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Pads by bytes (not graphemes) so the field is always exactly 8 bytes wide.
  defp codec_atom_to_bin(atom) do
    name = Atom.to_string(atom)
    padding = @codec_field_size - byte_size(name)

    if padding >= 0 do
      {:ok, name <> String.duplicate(" ", padding)}
    else
      {:error, :codec_name_too_long}
    end
  end

  # File contents are untrusted input: never create new atoms from them.
  defp bin_to_codec_atom(bin) do
    name = String.trim_trailing(bin)

    case Enum.find(@known_codecs, &(Atom.to_string(&1) == name)) do
      nil -> existing_atom(name)
      codec -> {:ok, codec}
    end
  end

  defp existing_atom(name) do
    {:ok, String.to_existing_atom(name)}
  rescue
    ArgumentError -> {:error, :unknown_codec}
  end
end
# Write a repetitive payload to a temp file at zstd level 9, read it back,
# report the on-disk size, then remove the file.
test_data = String.duplicate("Compressed file storage example. ", 2000)
path = Path.join(System.tmp_dir!(), "ex_codecs_demo.dat")

:ok = CompressedFileIO.write(path, test_data, :zstd, level: 9)
{:ok, recovered} = CompressedFileIO.read(path)

%File.Stat{size: file_size} = File.stat!(path)

Enum.each(
  [
    "Wrote and read #{byte_size(test_data)} bytes",
    "Round-trip OK: #{recovered == test_data}",
    "File size: #{file_size} bytes"
  ],
  &IO.puts/1
)

File.rm(path)

Key Takeaways

  1. Wrap encode/decode in your own modules to centralize codec choice and error handling
  2. Always handle errors — codecs can fail on invalid input, memory pressure, etc.
  3. Store the codec name alongside compressed data so you know how to decompress
  4. Use ETS for compressed in-memory caches — significant memory savings
  5. Chunk large files — enables random access, parallel processing, and bounded memory
  6. Try multiple codecs for mixed workloads — different data compresses best with different algorithms

Next: Zarr-Style Workloads