ExZarr.Nx.DataLoader (ExZarr v1.1.0)
Efficient data loading for machine learning training with ExZarr arrays.
This module provides streaming batch loaders optimized for ML training workflows. It handles batching, shuffling, and multi-epoch iteration while maintaining memory efficiency.
Features
- Batch streaming: Load data in fixed-size batches
- Shuffling: Randomize sample order for better training
- Multi-epoch: Iterate over dataset multiple times
- Memory efficient: Only loads required chunks, not entire dataset
- Paired loading: Load features and labels together
- Incomplete batches: Configurable handling of final partial batch
Basic Usage
# Load batches from array
{:ok, array} = ExZarr.open(path: "/data/features")
array
|> ExZarr.Nx.DataLoader.batch_stream(32)
|> Enum.each(fn {:ok, batch} ->
# Train on batch
train_step(model, batch)
end)
Shuffled Training
# Shuffle samples for better convergence
array
|> ExZarr.Nx.DataLoader.shuffled_batch_stream(32)
|> Enum.each(fn {:ok, batch} ->
train_step(model, batch)
end)
Multi-Epoch Training
# Train for multiple epochs
for epoch <- 1..10 do
array
|> ExZarr.Nx.DataLoader.shuffled_batch_stream(32, seed: epoch)
|> Enum.each(fn {:ok, batch} ->
train_step(model, batch)
end)
end
Loading Features and Labels
{:ok, features} = ExZarr.open(path: "/data/X")
{:ok, labels} = ExZarr.open(path: "/data/y")
ExZarr.Nx.DataLoader.paired_batch_stream(features, labels, 32)
|> Enum.each(fn {:ok, {X_batch, y_batch}} ->
train_step(model, X_batch, y_batch)
end)
Summary
Types
Options for batch streaming.
Functions
Streams batches from ExZarr array for ML training.
Counts the number of batches that will be produced.
Streams paired batches from feature and label arrays.
Streams paired shuffled batches from feature and label arrays.
Streams shuffled batches from ExZarr array for ML training.
Types
@type batch_option() :: {:drop_remainder, boolean()} | {:seed, integer()} | {:shuffle_buffer_size, pos_integer()} | {:backend, module()} | {:names, [atom()]}
Options for batch streaming.
- :drop_remainder - Drop incomplete final batch (default: false)
- :seed - Random seed for shuffling (default: nil, uses random)
- :shuffle_buffer_size - Size of shuffle buffer (default: batch_size * 10)
- :backend - Nx backend to transfer tensors to (default: nil)
- :names - Axis names for tensors (default: nil)
Functions
@spec batch_stream(ExZarr.Array.t(), pos_integer(), [batch_option()]) :: Enumerable.t({:ok, Nx.Tensor.t()} | {:error, term()})
Streams batches from ExZarr array for ML training.
Loads data in fixed-size batches, efficiently reading only required chunks.
The stream yields {:ok, batch_tensor} for each batch.
Options
- :drop_remainder - If true, drops final batch if smaller than batch_size (default: false)
- :backend - Nx backend to transfer tensors to (default: nil)
- :names - Axis names for tensors (default: nil)
Returns
Stream yielding {:ok, Nx.Tensor.t()} or {:error, term()} for each batch.
Examples
# Basic batch streaming
{:ok, array} = ExZarr.open(path: "/data/training")
array
|> ExZarr.Nx.DataLoader.batch_stream(32)
|> Enum.each(fn {:ok, batch} ->
IO.inspect(Nx.shape(batch)) # {32, ...}
end)
# Drop incomplete final batch
array
|> ExZarr.Nx.DataLoader.batch_stream(32, drop_remainder: true)
|> Enum.to_list()
# Transfer to GPU backend
array
|> ExZarr.Nx.DataLoader.batch_stream(32, backend: EXLA.Backend)
|> Enum.to_list()
@spec count_batches(ExZarr.Array.t(), pos_integer(), [batch_option()]) :: non_neg_integer()
Counts the number of batches that will be produced.
Useful for progress bars and validation.
Examples
{:ok, array} = ExZarr.open(path: "/data/training")
num_batches = ExZarr.Nx.DataLoader.count_batches(array, 32)
# => 32 (for 1000 samples with batch_size 32)
# With drop_remainder
num_batches = ExZarr.Nx.DataLoader.count_batches(array, 32, drop_remainder: true)
# => 31 (drops final incomplete batch)
@spec paired_batch_stream(ExZarr.Array.t(), ExZarr.Array.t(), pos_integer(), [ batch_option() ]) :: Enumerable.t({:ok, {Nx.Tensor.t(), Nx.Tensor.t()}} | {:error, term()})
Streams paired batches from feature and label arrays.
Loads corresponding batches from features (X) and labels (y) arrays together, ensuring they stay aligned. Both arrays must have the same number of samples (first dimension).
Options
Same as batch_stream/3, applied to both arrays.
Returns
Stream yielding {:ok, {X_batch, y_batch}} or {:error, term()} for each batch pair.
Examples
# Load features and labels together
{:ok, X} = ExZarr.open(path: "/data/features")
{:ok, y} = ExZarr.open(path: "/data/labels")
ExZarr.Nx.DataLoader.paired_batch_stream(X, y, 32)
|> Enum.each(fn {:ok, {X_batch, y_batch}} ->
train_step(model, X_batch, y_batch)
end)
# With shuffling
ExZarr.Nx.DataLoader.paired_shuffled_batch_stream(X, y, 32)
|> Enum.each(fn {:ok, {X_batch, y_batch}} ->
train_step(model, X_batch, y_batch)
end)
@spec paired_shuffled_batch_stream( ExZarr.Array.t(), ExZarr.Array.t(), pos_integer(), [batch_option()] ) :: Enumerable.t({:ok, {Nx.Tensor.t(), Nx.Tensor.t()}} | {:error, term()})
Streams paired shuffled batches from feature and label arrays.
Like paired_batch_stream/4, but shuffles samples before batching.
Both arrays are shuffled using the same index order to maintain alignment.
Options
Same as shuffled_batch_stream/3, applied to both arrays.
Returns
Stream yielding {:ok, {X_batch, y_batch}} or {:error, term()} for each shuffled batch pair.
Examples
{:ok, X} = ExZarr.open(path: "/data/features")
{:ok, y} = ExZarr.open(path: "/data/labels")
# Shuffled training
for epoch <- 1..10 do
ExZarr.Nx.DataLoader.paired_shuffled_batch_stream(X, y, 32, seed: epoch)
|> Enum.each(fn {:ok, {X_batch, y_batch}} ->
train_step(model, X_batch, y_batch)
end)
end
@spec shuffled_batch_stream(ExZarr.Array.t(), pos_integer(), [batch_option()]) :: Enumerable.t({:ok, Nx.Tensor.t()} | {:error, term()})
Streams shuffled batches from ExZarr array for ML training.
Randomizes sample order before batching, which typically improves training convergence. Uses a shuffle buffer to balance memory usage and randomization quality.
Shuffling Strategy
Uses reservoir sampling with a configurable buffer size:
- Larger buffer: Better randomization, more memory
- Smaller buffer: Less memory, local randomization
- Default: 10x batch size
Options
- :seed - Random seed for reproducibility (default: random)
- :shuffle_buffer_size - Buffer size for shuffling (default: batch_size * 10)
- :drop_remainder - Drop incomplete final batch (default: false)
- :backend - Nx backend to transfer tensors to (default: nil)
- :names - Axis names for tensors (default: nil)
Returns
Stream yielding {:ok, Nx.Tensor.t()} or {:error, term()} for each shuffled batch.
Examples
# Shuffled training
{:ok, array} = ExZarr.open(path: "/data/training")
array
|> ExZarr.Nx.DataLoader.shuffled_batch_stream(32)
|> Enum.each(fn {:ok, batch} ->
train_step(model, batch)
end)
# Reproducible shuffling
for epoch <- 1..10 do
array
|> ExZarr.Nx.DataLoader.shuffled_batch_stream(32, seed: epoch)
|> Enum.each(fn {:ok, batch} ->
train_step(model, batch)
end)
end
# Large shuffle buffer for better randomization
array
|> ExZarr.Nx.DataLoader.shuffled_batch_stream(32, shuffle_buffer_size: 1000)
|> Enum.to_list()