Machine Learning Pipelines with ExZarr

View Source

Streaming Tensors from Chunks

# Open the Zarr array that backs the training features; anything other than
# {:ok, array} raises a MatchError here.
{:ok, array} = ExZarr.open(path: "/data/training_features")

# Turns one {index, binary} chunk into a 32-bit float tensor (index unused).
chunk_to_tensor = fn {_index, data} -> Nx.from_binary(data, {:f, 32}) end

chunk_stream = ExZarr.Array.stream_chunks(array, concurrency: 4)

# Stream.map/2 and Stream.chunk_every/2 are lazy; Enum.to_list/1 forces the
# whole pipeline, so every group of up to 32 tensors is held in memory at once.
batches =
  chunk_stream
  |> Stream.map(chunk_to_tensor)
  |> Stream.chunk_every(32)
  |> Enum.to_list()

Using DataLoader

For sample-level batching with shuffling:

# Open the feature array; anything other than {:ok, array} raises a MatchError.
{:ok, array} = ExZarr.open(path: "/data/features")

# NOTE(review): 64 is presumably the number of samples per batch and the fixed
# seed presumably makes the shuffle order reproducible — confirm against the
# DataLoader documentation.
shuffled_batches = ExZarr.Nx.DataLoader.shuffled_batch_stream(array, 64, seed: 42)

# Only {:ok, batch} elements are matched, so any other element raises a
# FunctionClauseError. `model` and `train_step/2` must be defined by the caller.
Enum.each(shuffled_batches, fn {:ok, batch} -> train_step(model, batch) end)

Broadway Pipeline

For preparing training data in production:

defmodule MyApp.TrainingPipeline do
  @moduledoc """
  Broadway pipeline that turns ExZarr chunk binaries into normalized tensors.
  """

  use Broadway

  @doc """
  Starts the pipeline for `array` through `ExZarr.Broadway.start_chunk_pipeline/3`.

  `concurrency` is set to the number of online schedulers, and
  `stream_opts: [ordered: false]` is passed along with it.
  """
  def start_link(array) do
    pipeline_opts = [
      concurrency: System.schedulers_online(),
      stream_opts: [ordered: false]
    ]

    ExZarr.Broadway.start_chunk_pipeline(__MODULE__, array, pipeline_opts)
  end

  # Replaces the message's `{index, binary}` payload with a tensor: the binary
  # is decoded as 32-bit floats and passed through `normalize/1`, which is not
  # defined in this snippet and must be supplied elsewhere.
  @impl Broadway
  def handle_message(_processor, %{data: {_index, chunk_binary}} = msg, _ctx) do
    tensor =
      chunk_binary
      |> Nx.from_binary({:f, 32})
      |> normalize()

    Broadway.Message.put_data(msg, tensor)
  end
end