ExZarr.Nx.DataLoader (ExZarr v1.1.0)

View Source

Efficient data loading for machine learning training with ExZarr arrays.

This module provides streaming batch loaders optimized for ML training workflows. It handles batching, shuffling, and multi-epoch iteration while maintaining memory efficiency.

Features

  • Batch streaming: Load data in fixed-size batches
  • Shuffling: Randomize sample order for better training
  • Multi-epoch: Iterate over dataset multiple times
  • Memory efficient: Only loads required chunks, not entire dataset
  • Paired loading: Load features and labels together
  • Incomplete batches: Configurable handling of final partial batch

Basic Usage

# Load batches from array
{:ok, array} = ExZarr.open(path: "/data/features")

array
|> ExZarr.Nx.DataLoader.batch_stream(32)
|> Enum.each(fn {:ok, batch} ->
  # Train on batch
  train_step(model, batch)
end)

Shuffled Training

# Shuffle samples for better convergence
array
|> ExZarr.Nx.DataLoader.shuffled_batch_stream(32)
|> Enum.each(fn {:ok, batch} ->
  train_step(model, batch)
end)

Multi-Epoch Training

# Train for multiple epochs
for epoch <- 1..10 do
  array
  |> ExZarr.Nx.DataLoader.shuffled_batch_stream(32, seed: epoch)
  |> Enum.each(fn {:ok, batch} ->
    train_step(model, batch)
  end)
end

Loading Features and Labels

{:ok, features} = ExZarr.open(path: "/data/X")
{:ok, labels} = ExZarr.open(path: "/data/y")

ExZarr.Nx.DataLoader.paired_batch_stream(features, labels, 32)
|> Enum.each(fn {:ok, {x_batch, y_batch}} ->
  train_step(model, x_batch, y_batch)
end)

Summary

Types

Options for batch streaming.

Functions

Streams batches from an ExZarr array for ML training.

Counts the number of batches that will be produced.

Streams paired batches from feature and label arrays.

Streams paired shuffled batches from feature and label arrays.

Streams shuffled batches from an ExZarr array for ML training.

Types

batch_option()

@type batch_option() ::
  {:drop_remainder, boolean()}
  | {:seed, integer()}
  | {:shuffle_buffer_size, pos_integer()}
  | {:backend, module()}
  | {:names, [atom()]}

Options for batch streaming.

  • :drop_remainder - Drop incomplete final batch (default: false)
  • :seed - Random seed for shuffling (default: nil, which picks a random seed)
  • :shuffle_buffer_size - Size of shuffle buffer (default: batch_size * 10)
  • :backend - Nx backend to transfer tensors to (default: nil)
  • :names - Axis names for tensors (default: nil)

Functions

batch_stream(array, batch_size, opts \\ [])

@spec batch_stream(ExZarr.Array.t(), pos_integer(), [batch_option()]) ::
  Enumerable.t({:ok, Nx.Tensor.t()} | {:error, term()})

Streams batches from an ExZarr array for ML training.

Loads data in fixed-size batches, efficiently reading only required chunks. The stream yields {:ok, batch_tensor} for each batch, or {:error, reason} if a batch cannot be produced.

Options

  • :drop_remainder - If true, drops the final batch when it is smaller than batch_size (default: false)
  • :backend - Nx backend to transfer tensors to (default: nil)
  • :names - Axis names for tensors (default: nil)

Returns

Stream yielding {:ok, Nx.Tensor.t()} or {:error, term()} for each batch.

Examples

# Basic batch streaming
{:ok, array} = ExZarr.open(path: "/data/training")

array
|> ExZarr.Nx.DataLoader.batch_stream(32)
|> Enum.each(fn {:ok, batch} ->
  IO.inspect(Nx.shape(batch))  # {32, ...} (the final batch may be smaller unless drop_remainder: true)
end)

# Drop incomplete final batch
array
|> ExZarr.Nx.DataLoader.batch_stream(32, drop_remainder: true)
|> Enum.to_list()

# Transfer tensors to the EXLA backend (e.g. for GPU execution)
array
|> ExZarr.Nx.DataLoader.batch_stream(32, backend: EXLA.Backend)
|> Enum.to_list()

count_batches(array, batch_size, opts \\ [])

@spec count_batches(ExZarr.Array.t(), pos_integer(), [batch_option()]) ::
  non_neg_integer()

Counts the number of batches that will be produced.

Useful for progress bars and validation.

Examples

{:ok, array} = ExZarr.open(path: "/data/training")
num_batches = ExZarr.Nx.DataLoader.count_batches(array, 32)
# => 32 (for 1000 samples with batch_size 32)

# With drop_remainder
num_batches = ExZarr.Nx.DataLoader.count_batches(array, 32, drop_remainder: true)
# => 31 (drops final incomplete batch)

paired_batch_stream(features, labels, batch_size, opts \\ [])

@spec paired_batch_stream(ExZarr.Array.t(), ExZarr.Array.t(), pos_integer(), [
  batch_option()
]) ::
  Enumerable.t({:ok, {Nx.Tensor.t(), Nx.Tensor.t()}} | {:error, term()})

Streams paired batches from feature and label arrays.

Loads corresponding batches from features (X) and labels (y) arrays together, ensuring they stay aligned. Both arrays must have the same number of samples (first dimension).

Options

Same as batch_stream/3, applied to both arrays.

Returns

Stream yielding {:ok, {x_batch, y_batch}} or {:error, term()} for each batch pair.

Examples

# Load features and labels together
{:ok, x} = ExZarr.open(path: "/data/features")
{:ok, y} = ExZarr.open(path: "/data/labels")

ExZarr.Nx.DataLoader.paired_batch_stream(x, y, 32)
|> Enum.each(fn {:ok, {x_batch, y_batch}} ->
  train_step(model, x_batch, y_batch)
end)

# With shuffling
ExZarr.Nx.DataLoader.paired_shuffled_batch_stream(x, y, 32)
|> Enum.each(fn {:ok, {x_batch, y_batch}} ->
  train_step(model, x_batch, y_batch)
end)

paired_shuffled_batch_stream(features, labels, batch_size, opts \\ [])

@spec paired_shuffled_batch_stream(
  ExZarr.Array.t(),
  ExZarr.Array.t(),
  pos_integer(),
  [batch_option()]
) ::
  Enumerable.t({:ok, {Nx.Tensor.t(), Nx.Tensor.t()}} | {:error, term()})

Streams paired shuffled batches from feature and label arrays.

Like paired_batch_stream/4, but shuffles samples before batching. Both arrays are shuffled using the same index order to maintain alignment.

Options

Same as shuffled_batch_stream/3, applied to both arrays.

Returns

Stream yielding {:ok, {x_batch, y_batch}} or {:error, term()} for each shuffled batch pair.

Examples

{:ok, x} = ExZarr.open(path: "/data/features")
{:ok, y} = ExZarr.open(path: "/data/labels")

# Shuffled training
for epoch <- 1..10 do
  ExZarr.Nx.DataLoader.paired_shuffled_batch_stream(x, y, 32, seed: epoch)
  |> Enum.each(fn {:ok, {x_batch, y_batch}} ->
    train_step(model, x_batch, y_batch)
  end)
end

shuffled_batch_stream(array, batch_size, opts \\ [])

@spec shuffled_batch_stream(ExZarr.Array.t(), pos_integer(), [batch_option()]) ::
  Enumerable.t({:ok, Nx.Tensor.t()} | {:error, term()})

Streams shuffled batches from an ExZarr array for ML training.

Randomizes sample order before batching, which typically improves training convergence. Uses a shuffle buffer to balance memory usage and randomization quality.

Shuffling Strategy

Uses reservoir sampling with a configurable buffer size:

  • Larger buffer: Better randomization, more memory
  • Smaller buffer: Less memory, local randomization
  • Default: 10x batch size

Options

  • :seed - Random seed for reproducibility (default: random)
  • :shuffle_buffer_size - Buffer size for shuffling (default: batch_size * 10)
  • :drop_remainder - Drop incomplete final batch (default: false)
  • :backend - Nx backend to transfer tensors to (default: nil)
  • :names - Axis names for tensors (default: nil)

Returns

Stream yielding {:ok, Nx.Tensor.t()} or {:error, term()} for each shuffled batch.

Examples

# Shuffled training
{:ok, array} = ExZarr.open(path: "/data/training")

array
|> ExZarr.Nx.DataLoader.shuffled_batch_stream(32)
|> Enum.each(fn {:ok, batch} ->
  train_step(model, batch)
end)

# Reproducible shuffling
for epoch <- 1..10 do
  array
  |> ExZarr.Nx.DataLoader.shuffled_batch_stream(32, seed: epoch)
  |> Enum.each(fn {:ok, batch} ->
    train_step(model, batch)
  end)
end

# Large shuffle buffer for better randomization
array
|> ExZarr.Nx.DataLoader.shuffled_batch_stream(32, shuffle_buffer_size: 1000)
|> Enum.to_list()