Building Storage Systems with ExCodecs

Copy Markdown View Source
Mix.install([
  {:ex_codecs, path: Path.join(__DIR__, "..")},
  {:kino, "~> 0.14"}
])

Practical Usage Patterns

The Basic Pattern

# The same two-step shape is used for every compression operation in ExCodecs:
# encode a binary with a codec, then decode the result with that same codec.
data = "Important data to store"

# Encode (compress) with the :zstd codec
{:ok, compressed} = ExCodecs.encode(:zstd, data)

# Decode (decompress) — the codec given here is the one used to encode above
{:ok, recovered} = ExCodecs.decode(:zstd, compressed)

round_trip_ok? = recovered == data
IO.puts("Round-trip OK: #{round_trip_ok?}")

Choosing a Codec at Runtime

# Let the workload decide: each content type is mapped to the codec used for it.
defmodule StorageConfig do
  # Content types that have a dedicated codec.
  @codec_by_content_type %{
    "application/json" => :zstd,
    "application/octet-stream" => :lz4,
    "application/x-array" => :blosc2
  }

  # Codec returned for any value not listed above.
  @fallback_codec :zstd

  # Returns the codec atom for the given content type, or the fallback codec
  # when the content type is not one of the listed strings.
  def codec_for_content_type(content_type) do
    Map.get(@codec_by_content_type, content_type, @fallback_codec)
  end
end

# Pick the codec for one content type, then report it and whether ExCodecs supports it.
content_type = "application/json"
codec = StorageConfig.codec_for_content_type(content_type)

IO.puts("Selected codec: #{inspect(codec)}")

supported? = ExCodecs.supports?(codec)
IO.puts("Supported: #{supported?}")

Building a Compressed Storage Layer

defmodule CompressedStore do
  @moduledoc """
  A simple key-value store with transparent compression.

  Values are compressed with `ExCodecs.encode/3` when written and decompressed
  with `ExCodecs.decode/2` when read. The codec and its options are chosen per
  store instance when the store is started.

  The Agent state is always a `%CompressedStore{}` struct; entries live in its
  `:table` map.
  """

  use Agent

  defstruct [:codec, :opts, :table]

  @doc """
  Starts a store process.

  ## Options

    * `:codec` - codec atom passed to `ExCodecs.encode/3` (default `:zstd`)
    * `:opts` - codec options passed to `ExCodecs.encode/3` (default `[]`)
    * `:name` - name the Agent is registered under (default `CompressedStore`)

  Any other key is ignored.
  """
  def start_link(opts \\ []) do
    initial_state = %__MODULE__{
      codec: Keyword.get(opts, :codec, :zstd),
      opts: Keyword.get(opts, :opts, []),
      table: %{}
    }

    Agent.start_link(fn -> initial_state end, name: Keyword.get(opts, :name, __MODULE__))
  end

  @doc """
  Compresses `value` and stores it under `key`.

  Returns `{:ok, compressed_byte_size}` on success, or `{:error, err}` with the
  error returned by `ExCodecs.encode/3`, in which case the store is unchanged.
  """
  def put(store \\ __MODULE__, key, value) do
    Agent.get_and_update(store, fn state ->
      case ExCodecs.encode(state.codec, value, state.opts) do
        {:ok, compressed} ->
          entry = %{
            compressed: compressed,
            original_size: byte_size(value),
            codec: state.codec
          }

          # Only the :table field changes. Returning the bare table map here would
          # replace the struct as the Agent state and break every later call.
          {{:ok, byte_size(compressed)}, %{state | table: Map.put(state.table, key, entry)}}

        {:error, err} ->
          {{:error, err}, state}
      end
    end)
  end

  @doc """
  Fetches and decompresses the value stored under `key`.

  Returns the result of `ExCodecs.decode/2`, or `{:error, :not_found}` when the
  key is absent.
  """
  def get(store \\ __MODULE__, key) do
    # Decoding happens in the caller so the Agent is only busy for the lookup.
    case Agent.get(store, &Map.fetch(&1.table, key)) do
      {:ok, entry} -> ExCodecs.decode(entry.codec, entry.compressed)
      :error -> {:error, :not_found}
    end
  end

  @doc """
  Returns a map summarising the store: `:codec`, `:entry_count`,
  `:total_original_bytes`, `:total_compressed_bytes` and `:savings_pct`
  (percentage saved, rounded to one decimal place).
  """
  def info(store \\ __MODULE__) do
    Agent.get(store, fn state ->
      entries = Map.values(state.table)
      total_original = entries |> Enum.map(& &1.original_size) |> Enum.sum()
      total_compressed = entries |> Enum.map(&byte_size(&1.compressed)) |> Enum.sum()

      %{
        codec: state.codec,
        entry_count: map_size(state.table),
        total_original_bytes: total_original,
        total_compressed_bytes: total_compressed,
        # max/2 guards the division when nothing has been stored yet.
        savings_pct: Float.round(100 * (1 - total_compressed / max(total_original, 1)), 1)
      }
    end)
  end

  @doc """
  Removes `key` from the store.

  Returns the removed entry map (`:compressed`, `:original_size`, `:codec`), or
  `nil` when the key was not present.
  """
  def delete(store \\ __MODULE__, key) do
    Agent.get_and_update(store, fn state ->
      {entry, table} = Map.pop(state.table, key)
      # Keep the struct as the state; only its table is replaced.
      {entry, %{state | table: table}}
    end)
  end
end

Using the Compressed Store

# Codec options are nested under :opts — start_link/1 reads only :codec, :opts
# and :name, so a top-level `level: 3` would be silently ignored.
{:ok, _pid} = CompressedStore.start_link(codec: :zstd, opts: [level: 3])

# Store various data types; each put is matched so a failed encode is not missed
{:ok, _size} = CompressedStore.put(:doc1, String.duplicate("Hello, World! ", 1000))
{:ok, _size} = CompressedStore.put(:doc2, :crypto.strong_rand_bytes(4096) |> Base.encode64())

# The JSON document is built by hand because Jason is not one of the
# dependencies installed by Mix.install at the top of this notebook
json_data = Enum.map_join(1..100, ",", fn i ->
  "{\"id\":#{i},\"value\":#{:rand.uniform(1000)}}"
end)
{:ok, _size} = CompressedStore.put(:doc3, "[#{json_data}]")

# Retrieve and verify
{:ok, doc1} = CompressedStore.get(:doc1)
IO.puts("Retrieved doc1 (#{byte_size(doc1)} bytes)")

# Check storage efficiency
info = CompressedStore.info()
IO.puts("\nStorage Summary:")
IO.puts("  Codec: #{inspect(info.codec)}")
IO.puts("  Entries: #{info.entry_count}")
IO.puts("  Original: #{info.total_original_bytes} bytes")
IO.puts("  Compressed: #{info.total_compressed_bytes} bytes")
IO.puts("  Savings: #{info.savings_pct}%")

Multi-Codec Store

defmodule MultiCodecStore do
  @moduledoc """
  A store that selects the best codec per value.

  Every codec in the module's candidate list is tried and the one producing the
  smallest output is kept. The chosen codec is stored alongside the compressed
  data so `get/2` knows how to decode it.

  The Agent state is a plain map of `key => entry`, where an entry has the keys
  `:codec`, `:compressed`, `:original_size` and `:compressed_size`.
  """

  use Agent

  @codecs [:lz4, :snappy, :zstd, :bzip2]

  @doc """
  Starts an empty store. `:name` sets the registered name (default `MultiCodecStore`).
  """
  def start_link(opts \\ []) do
    name = Keyword.get(opts, :name, __MODULE__)
    Agent.start_link(fn -> %{} end, name: name)
  end

  @doc """
  Compresses `value` with every candidate codec and stores the smallest result.

  Returns `{:ok, entry}` with the stored entry map, or
  `{:error, :no_codec_available}` when no candidate codec could encode the value
  (the store is then left unchanged).
  """
  def put(store \\ __MODULE__, key, value) do
    # Encoding does not need the Agent state, so it runs in the caller. A codec
    # failure therefore cannot crash the Agent and take the stored entries with it.
    with {:ok, {codec, compressed}} <- best_encoding(value) do
      entry = %{
        codec: codec,
        compressed: compressed,
        original_size: byte_size(value),
        compressed_size: byte_size(compressed)
      }

      :ok = Agent.update(store, &Map.put(&1, key, entry))
      {:ok, entry}
    end
  end

  @doc """
  Fetches and decompresses the value stored under `key`.

  Returns the result of `ExCodecs.decode/2`, or `{:error, :not_found}` when the
  key is absent.
  """
  def get(store \\ __MODULE__, key) do
    case Agent.get(store, &Map.fetch(&1, key)) do
      {:ok, entry} -> ExCodecs.decode(entry.codec, entry.compressed)
      :error -> {:error, :not_found}
    end
  end

  @doc """
  Returns one `{codec, entry_count, original_bytes, compressed_bytes}` tuple per
  codec in use, sorted by ascending entry count.
  """
  def stats(store \\ __MODULE__) do
    Agent.get(store, fn state ->
      state
      |> Map.values()
      |> Enum.group_by(& &1.codec)
      |> Enum.map(fn {codec, entries} ->
        {codec, length(entries),
         Enum.sum(Enum.map(entries, & &1.original_size)),
         Enum.sum(Enum.map(entries, & &1.compressed_size))}
      end)
      |> Enum.sort_by(fn {_codec, count, _original, _compressed} -> count end)
    end)
  end

  # Encodes `value` with each candidate codec, skipping codecs that return an
  # error, and picks the smallest output (the earliest codec wins a tie).
  defp best_encoding(value) do
    candidates =
      for codec <- @codecs, {:ok, encoded} <- [ExCodecs.encode(codec, value)] do
        {codec, encoded}
      end

    case candidates do
      [] -> {:error, :no_codec_available}
      _ -> {:ok, Enum.min_by(candidates, fn {_codec, encoded} -> byte_size(encoded) end)}
    end
  end
end

Using the Multi-Codec Store

{:ok, _pid} = MultiCodecStore.start_link()

# Three sample inputs: a repeated 9-byte pattern, 10,000 random bytes in 1..255,
# and 4,096 little-endian 64-bit floats.
datasets = %{
  repetitive: :binary.copy("abcabcabc", 5000),
  semi_random: 1..10000 |> Enum.map(fn _ -> :rand.uniform(255) end) |> :erlang.list_to_binary(),
  floats: Enum.into(1..4096, <<>>, fn i -> <<i * 0.1::little-float-size(64)>> end)
}

# Store each dataset and print which codec was chosen and the resulting size.
for {name, data} <- datasets do
  {:ok, entry} = MultiCodecStore.put(name, data)
  pct = Float.round(100 * entry.compressed_size / entry.original_size, 1)
  sizes = "#{entry.original_size} -> #{entry.compressed_size} bytes (#{pct}%)"

  IO.puts("#{name}: best codec = #{inspect(entry.codec)}, #{sizes}")
end

IO.puts("\nCodec distribution:")

# One line per codec with its entry count and byte totals.
for {codec, count, orig, comp} <- MultiCodecStore.stats() do
  IO.puts("  #{inspect(codec)}: #{count} entries, #{orig} -> #{comp} bytes")
end

Chunked Compression for Large Files

defmodule ChunkedCompressor do
  @moduledoc """
  Compresses large data in chunks to limit memory usage.

  Each chunk is compressed independently, enabling:
  - Parallel compression
  - Random access on decompression
  - Memory-bounded processing

  ## Blob layout

      <<chunk_count::32, original_size::64, offset_1::32, ..., offset_n::32, chunks...>>

  All integers are unsigned big-endian. Each offset is the start of the
  corresponding compressed chunk, relative to the beginning of the chunk data.
  """

  @default_chunk_size 64 * 1024  # 64 KB

  # Offsets are stored in 32 bits, so the chunk data may not exceed this size.
  @max_offset 0xFFFFFFFF

  @doc """
  Splits `data` into `chunk_size`-byte pieces, compresses each with `codec`, and
  returns `{:ok, blob}`.

  The final piece may be shorter than `chunk_size`; it is kept. Returns the first
  `{:error, err}` from `ExCodecs.encode/3`, or `{:error, :too_large}` when the
  compressed data does not fit the 32-bit offsets.
  """
  def compress(data, codec \\ :zstd, opts \\ [], chunk_size \\ @default_chunk_size)
      when is_binary(data) and is_integer(chunk_size) and chunk_size > 0 do
    chunks = chunk_binary(data, chunk_size)

    with {:ok, compressed_chunks} <- map_ok(chunks, &ExCodecs.encode(codec, &1, opts)) do
      # Start offset of every chunk, plus the total size of the chunk data.
      {offsets, total} =
        Enum.map_reduce(compressed_chunks, 0, fn chunk, pos -> {pos, pos + byte_size(chunk)} end)

      if total > @max_offset do
        {:error, :too_large}
      else
        header = <<
          length(compressed_chunks)::unsigned-big-32,
          byte_size(data)::unsigned-big-64
        >>

        offset_data = for offset <- offsets, into: <<>>, do: <<offset::unsigned-big-32>>

        {:ok, IO.iodata_to_binary([header, offset_data, compressed_chunks])}
      end
    end
  end

  @doc """
  Reverses `compress/4`: decodes every chunk in `blob` with `codec` and returns
  `{:ok, data}`.

  Returns `{:error, :invalid_format}` when the blob is truncated, its offsets are
  inconsistent, or the decoded size differs from the size recorded in the header.
  Errors from `ExCodecs.decode/2` are returned unchanged.
  """
  def decompress(blob, codec \\ :zstd)

  def decompress(
        <<num_chunks::unsigned-big-32, original_size::unsigned-big-64, rest::binary>>,
        codec
      ) do
    offsets_size = num_chunks * 4

    with <<offsets::binary-size(offsets_size), payload::binary>> <- rest,
         starts = for(<<offset::unsigned-big-32 <- offsets>>, do: offset),
         {:ok, chunks} <- slice_chunks(starts, payload),
         {:ok, decoded} <- map_ok(chunks, &ExCodecs.decode(codec, &1)),
         data = IO.iodata_to_binary(decoded),
         true <- byte_size(data) == original_size do
      {:ok, data}
    else
      {:error, _} = error -> error
      _ -> {:error, :invalid_format}
    end
  end

  def decompress(_blob, _codec), do: {:error, :invalid_format}

  # Cuts `payload` into the chunks delimited by `starts`; the last chunk runs to
  # the end of the payload. Rejects offsets that go backwards or out of range.
  defp slice_chunks(starts, payload) do
    payload_size = byte_size(payload)
    ends = Enum.drop(starts, 1) ++ [payload_size]

    starts
    |> Enum.zip(ends)
    |> map_ok(fn {from, to} ->
      if from <= to and to <= payload_size do
        {:ok, binary_part(payload, from, to - from)}
      else
        {:error, :invalid_format}
      end
    end)
  end

  # Splits a binary into chunk_size-byte pieces, keeping the shorter final piece.
  defp chunk_binary(<<>>, _chunk_size), do: []
  defp chunk_binary(data, chunk_size) when byte_size(data) <= chunk_size, do: [data]

  defp chunk_binary(data, chunk_size) do
    <<chunk::binary-size(chunk_size), rest::binary>> = data
    [chunk | chunk_binary(rest, chunk_size)]
  end

  # Maps `fun` over `items`, collecting `{:ok, value}` results in order and
  # stopping at the first `{:error, _}`.
  defp map_ok(items, fun) do
    result =
      Enum.reduce_while(items, {:ok, []}, fn item, {:ok, acc} ->
        case fun.(item) do
          {:ok, value} -> {:cont, {:ok, [value | acc]}}
          {:error, _} = error -> {:halt, error}
        end
      end)

    with {:ok, reversed} <- result, do: {:ok, Enum.reverse(reversed)}
  end
end
# Demo: chunked compression
# Compress a repetitive binary in chunks, restore it, and report the sizes.
large_data = String.duplicate("This is a chunk of data that repeats. ", 10000)

{:ok, compressed} = ChunkedCompressor.compress(large_data, :zstd)
{:ok, recovered} = ChunkedCompressor.decompress(compressed, :zstd)

original_bytes = byte_size(large_data)
compressed_bytes = byte_size(compressed)
ratio_pct = Float.round(100 * compressed_bytes / original_bytes, 1)

IO.puts("Original:   #{original_bytes} bytes")
IO.puts("Compressed: #{compressed_bytes} bytes")
IO.puts("Ratio:      #{ratio_pct}%")
IO.puts("Round-trip: #{recovered == large_data}")

Error Handling Best Practices

# Pattern 1: Always handle both success and error
#
# NOTE(review): `my_data` and `write_to_disk/1` are not defined in this section;
# they look like placeholders for the reader's own data and persistence
# function — confirm before running this cell as-is.
case ExCodecs.encode(:zstd, my_data) do
  {:ok, compressed} ->
    :ok = write_to_disk(compressed)
    {:ok, compressed}

  {:error, %ExCodecs.Error{reason: :unsupported_codec}} ->
    # Codec not available — fall back or fail gracefully
    {:error, :codec_missing}

  {:error, %ExCodecs.Error{reason: :invalid_data} = err} ->
    # Data is not binary — log and reject
    IO.puts("Invalid data: #{err.message}")
    {:error, :bad_input}

  {:error, %ExCodecs.Error{reason: :compression_failed} = err} ->
    # Compression itself failed — possibly corrupted or too large
    IO.puts("Compression failed: #{err.message}")
    {:error, :compress_failed}

  {:error, err} ->
    # Any other failure is passed through unchanged; without this clause an
    # unlisted reason would raise CaseClauseError instead of being handled.
    {:error, err}
end
# Pattern 2: Validate before compress
#
# `def` is only valid inside a module, so the helper lives in SafeCompress.
defmodule SafeCompress do
  @moduledoc """
  Compression that validates its input first and verifies the round trip after.
  """

  @doc """
  Compresses `data` with `codec` and confirms it decodes back to `data`.

  Returns `{:ok, compressed}`, or one of:

    * `{:error, :unsupported_codec}` - `ExCodecs.supports?/1` returned `false`
    * `{:error, :not_binary}` - `data` is not a binary
    * `{:error, :round_trip_mismatch}` - decoding produced different data
    * `{:error, err}` - the error returned by `ExCodecs.encode/3` or `ExCodecs.decode/2`
  """
  def safe_compress(codec, data, opts \\ []) do
    # Every step returns :ok / {:ok, _} or a self-describing {:error, _}, so a
    # failing step falls straight through `with` and no `else` block is needed.
    with :ok <- check_codec(codec),
         :ok <- check_binary(data),
         {:ok, compressed} <- ExCodecs.encode(codec, data, opts),
         :ok <- verify_round_trip(codec, data, compressed) do
      {:ok, compressed}
    end
  end

  # Rejects codecs that ExCodecs reports as unsupported.
  defp check_codec(codec) do
    if ExCodecs.supports?(codec), do: :ok, else: {:error, :unsupported_codec}
  end

  # Rejects anything that is not a binary before it reaches the codec.
  defp check_binary(data) when is_binary(data), do: :ok
  defp check_binary(_data), do: {:error, :not_binary}

  # Decodes the compressed output and checks it equals the original input.
  defp verify_round_trip(codec, data, compressed) do
    case ExCodecs.decode(codec, compressed) do
      {:ok, ^data} -> :ok
      {:ok, _other} -> {:error, :round_trip_mismatch}
      {:error, err} -> {:error, err}
    end
  end
end

{:ok, result} = SafeCompress.safe_compress(:zstd, "verify round-trip")
IO.puts("Safe compress OK: #{is_binary(result)}")
# Pattern 3: Fallback codec chain
#
# `def` is only valid inside a module, so the helper lives in FallbackCompress.
defmodule FallbackCompress do
  @moduledoc """
  Compression that walks a list of codecs until one succeeds.
  """

  @doc """
  Tries each codec in `codecs` in order and stops at the first that encodes `data`.

  Returns `{:ok, {codec, compressed}}` for the first success,
  `{:error, :codec_failed}` when every codec returned an error, or
  `{:error, :no_codec_available}` when `codecs` is empty.
  """
  def compress_with_fallback(data, codecs \\ [:zstd, :lz4, :snappy]) do
    # The accumulator is the answer for "nothing worked"; it only stays at
    # :no_codec_available when the list is empty.
    Enum.reduce_while(codecs, {:error, :no_codec_available}, fn codec, _acc ->
      case ExCodecs.encode(codec, data) do
        {:ok, compressed} -> {:halt, {:ok, {codec, compressed}}}
        {:error, _} -> {:cont, {:error, :codec_failed}}
      end
    end)
  end
end

{:ok, {used_codec, compressed}} = FallbackCompress.compress_with_fallback("fallback test data")
IO.puts("Used codec: #{inspect(used_codec)}")
IO.puts("Compressed: #{byte_size(compressed)} bytes")

ETS Integration

defmodule CompressedETS do
  @moduledoc """
  ETS-backed storage with transparent compression.

  Each row is `{key, compressed_binary, codec, original_byte_size}`. Values are
  encoded with ExCodecs on insert and decoded on lookup.
  """

  @table_options [:named_table, :public, :set]

  # Creates a public, named ETS set table and returns its name.
  def new(name \\ :compressed_store) do
    _table = :ets.new(name, @table_options)
    name
  end

  # Encodes `value` with `codec`, writes the row, and returns
  # `{:ok, compressed_byte_size}`. An encode error raises a MatchError.
  def insert(table, key, value, codec \\ :zstd, opts \\ []) do
    {:ok, packed} = ExCodecs.encode(codec, value, opts)
    row = {key, packed, codec, byte_size(value)}
    :ets.insert(table, row)
    {:ok, byte_size(packed)}
  end

  # Decodes the value stored under `key` with the codec recorded in its row,
  # or returns `{:error, :not_found}` when there is no such row.
  def lookup(table, key) do
    case :ets.lookup(table, key) do
      [] ->
        {:error, :not_found}

      [{^key, packed, codec, _original_size}] ->
        ExCodecs.decode(codec, packed)
    end
  end

  # Summarises the table: row count, byte totals before and after compression,
  # and the percentage saved rounded to one decimal place.
  def stats(table) do
    rows = :ets.tab2list(table)

    {original_total, compressed_total} =
      Enum.reduce(rows, {0, 0}, fn {_key, packed, _codec, original}, {orig_acc, comp_acc} ->
        {orig_acc + original, comp_acc + byte_size(packed)}
      end)

    # max/2 keeps the division defined for an empty table.
    saved_fraction = 1 - compressed_total / max(original_total, 1)

    %{
      entries: length(rows),
      original_bytes: original_total,
      compressed_bytes: compressed_total,
      savings_pct: Float.round(100 * saved_fraction, 1)
    }
  end
end
table = CompressedETS.new()

# Insert 100 rows; the payload text cycles through ten variants.
Enum.each(1..100, fn i ->
  payload = String.duplicate("item-#{rem(i, 10)} ", 500)
  CompressedETS.insert(table, "key_#{i}", payload)
end)

stats = CompressedETS.stats(table)
%{entries: entry_count, original_bytes: original_bytes, compressed_bytes: compressed_bytes} = stats

IO.puts("ETS Compressed Store:")
IO.puts("  Entries: #{entry_count}")
IO.puts("  Original: #{original_bytes} bytes")
IO.puts("  Compressed: #{compressed_bytes} bytes")
IO.puts("  Savings: #{stats.savings_pct}%")

# Verify retrieval
{:ok, val} = CompressedETS.lookup(table, "key_1")
IO.puts("\nRetrieved #{byte_size(val)} bytes successfully")

# Drop the table now that the demo is finished.
:ets.delete(table)

File Storage Integration

defmodule CompressedFileIO do
  @moduledoc """
  File I/O with transparent compression.

  Writes compressed data to disk with a codec header for
  self-describing files.

  ## File layout

      <<"EXCD", codec_name::8 bytes, original_size::64, compressed_size::64, payload>>

  The codec name is the codec atom as text, right-padded with spaces. Sizes are
  unsigned big-endian integers.
  """

  @magic "EXCD"

  # Width of the fixed, space-padded codec-name field in the header.
  @codec_field_bytes 8

  @doc """
  Compresses `data` with `codec` and writes it to `path` with a header.

  Returns the result of `File.write/2` on success. Returns
  `{:error, :codec_name_too_long}` when the codec name does not fit the 8-byte
  header field, or the `{:error, err}` returned by `ExCodecs.encode/3`.
  """
  def write(path, data, codec \\ :zstd, opts \\ []) do
    with {:ok, codec_field} <- codec_atom_to_bin(codec),
         {:ok, compressed} <- ExCodecs.encode(codec, data, opts) do
      header = <<
        @magic::binary,
        codec_field::binary,
        byte_size(data)::unsigned-big-64,
        byte_size(compressed)::unsigned-big-64
      >>

      File.write(path, [header, compressed])
    end
  end

  @doc """
  Reads a file produced by `write/4` and returns the result of `ExCodecs.decode/2`.

  Returns `{:error, :invalid_format}` when the header is missing or the payload
  length differs from the size recorded in the header,
  `{:error, :unknown_codec}` when the codec name is not an existing atom, or the
  `{:error, reason}` returned by `File.read/1`.
  """
  def read(path) do
    with {:ok, contents} <- File.read(path),
         {:ok, codec, compressed} <- parse(contents) do
      ExCodecs.decode(codec, compressed)
    end
  end

  # Splits file contents into codec and payload; a payload whose length does not
  # match the recorded compressed size is treated as a truncated or corrupt file.
  defp parse(
         <<@magic::binary, codec_bin::binary-size(@codec_field_bytes),
           _orig_size::unsigned-big-64, comp_size::unsigned-big-64, compressed::binary>>
       )
       when byte_size(compressed) == comp_size do
    with {:ok, codec} <- bin_to_codec_atom(codec_bin), do: {:ok, codec, compressed}
  end

  defp parse(_contents), do: {:error, :invalid_format}

  # Pads the codec name to the header field width. Longer names are rejected:
  # they would shift every later field and make the file unreadable.
  defp codec_atom_to_bin(atom) do
    name = Atom.to_string(atom)

    if byte_size(name) <= @codec_field_bytes do
      {:ok, String.pad_trailing(name, @codec_field_bytes)}
    else
      {:error, :codec_name_too_long}
    end
  end

  # The name comes from file contents, so it is never turned into a new atom
  # (atoms are not garbage collected). Only already-existing atoms are accepted.
  # NOTE(review): this assumes the codec atoms are loaded before a file is read,
  # e.g. because the caller or ExCodecs already references them — confirm.
  defp bin_to_codec_atom(bin) do
    {:ok, bin |> String.trim_trailing() |> String.to_existing_atom()}
  rescue
    ArgumentError -> {:error, :unknown_codec}
  end
end
# Demo: write and read compressed file
# Writes a repetitive binary to a temp file, reads it back, and reports the sizes.
test_data = String.duplicate("Compressed file storage example. ", 2000)
path = System.tmp_dir!() |> Path.join("ex_codecs_demo.dat")

:ok = CompressedFileIO.write(path, test_data, :zstd, level: 9)
{:ok, recovered} = CompressedFileIO.read(path)

written_bytes = byte_size(test_data)
round_trip_ok? = recovered == test_data
%File.Stat{size: file_size} = File.stat!(path)

IO.puts("Wrote and read #{written_bytes} bytes")
IO.puts("Round-trip OK: #{round_trip_ok?}")
IO.puts("File size: #{file_size} bytes")

# Remove the temp file; the result is deliberately not checked.
File.rm(path)

Key Takeaways

  1. Wrap encode/decode in your own modules to centralize codec choice and error handling
  2. Always handle errors — codecs can fail on invalid input, memory pressure, etc.
  3. Store the codec name alongside compressed data so you know how to decompress
  4. Use ETS for compressed in-memory caches — significant memory savings
  5. Chunk large files — enables random access, parallel processing, and bounded memory
  6. Try multiple codecs for mixed workloads — different data compresses best with different algorithms

Next: Zarr-Style Workloads