ExZarr.Flow (ExZarr v1.1.0)


Flow integration for parallel, backpressure-aware Zarr chunk processing.

Flow partitions chunk streams across stages (one per scheduler by default) and applies backpressure automatically. Use this module for very large arrays that need more control over parallelism than Task.async_stream/3 offers.

Flow is an optional dependency of ExZarr. Add it to the deps list in your mix.exs:

{:flow, "~> 1.2"}

Examples

array
|> ExZarr.Flow.chunk_flow()
|> Flow.map(fn {index, data} -> {index, byte_size(data)} end)
|> Enum.to_list()

array
|> ExZarr.Flow.chunk_flow(stages: 8, ordered: false)
|> Flow.filter(fn {_index, data} -> byte_size(data) > 0 end)
|> Flow.map(fn {_index, data} -> byte_size(data) end)
|> Enum.sum()
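The examples above need a real ExZarr.Array. As a rough, self-contained sketch of the same filter-then-sum pattern, the block below feeds a hypothetical in-memory list of {index, binary} tuples (a stand-in for chunk_flow/2's output, not ExZarr's internals) straight into Flow.from_enumerable/2. Because Enum.sum/1 only adds numbers, each tuple is mapped to its byte size before summing. It assumes Flow can be fetched from Hex with Mix.install/1 (Elixir 1.12+).

```elixir
# Assumption: Flow is fetchable from Hex; Mix.install/1 needs Elixir 1.12+.
Mix.install([{:flow, "~> 1.2"}])

# Hypothetical stand-in for an array's chunk stream: {index, binary} tuples,
# including one empty chunk so the filter has something to drop.
chunks = [
  {{0, 0}, :binary.copy(<<1>>, 256)},
  {{0, 1}, <<>>},
  {{1, 0}, :binary.copy(<<2>>, 128)}
]

total_bytes =
  chunks
  |> Flow.from_enumerable(stages: 2)
  # Drop empty chunks, as in the filter example.
  |> Flow.filter(fn {_index, data} -> byte_size(data) > 0 end)
  # Turn each tuple into a number; Enum.sum/1 cannot add tuples.
  |> Flow.map(fn {_index, data} -> byte_size(data) end)
  |> Enum.sum()

IO.puts(total_bytes)
```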

Partitioning

ExZarr.Flow builds its flows with Flow.from_enumerable/2, using stages: System.schedulers_online() by default. The underlying chunk stream is read with concurrency: 1 because the Flow stages already provide the parallelism, so tune throughput with :stages rather than :concurrency.
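To see what :stages controls, here is a sketch that uses plain Flow rather than ExZarr: hypothetical integer chunk indices are spread over four stages, and each event records the pid of the process that handled it. The number of distinct pids is at most the :stages value; exactly how many stages receive work depends on scheduling and demand (max_demand: 5 is an illustrative choice, not an ExZarr default).

```elixir
# Assumption: Flow is fetchable from Hex; Mix.install/1 needs Elixir 1.12+.
Mix.install([{:flow, "~> 1.2"}])

# Hypothetical chunk indices standing in for an array's chunk stream.
indices = Enum.to_list(0..99)

# Record which stage process handled each event, then keep the unique pids.
stage_pids =
  indices
  |> Flow.from_enumerable(stages: 4, max_demand: 5)
  |> Flow.map(fn _index -> self() end)
  |> Enum.uniq()

IO.puts("events were spread across #{length(stage_pids)} stage(s)")
```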

Summary

Functions

chunk_flow(array, opts \\ [])
Creates a Flow from an array's chunks.

slice_flow(array, along, opts \\ [])
Creates a Flow from array slices along a dimension.

Functions

chunk_flow(array, opts \\ [])

@spec chunk_flow(ExZarr.Array.t(), keyword()) :: Flow.t()

Creates a Flow from an array's chunks.

Options

  • :stages - Number of Flow stages (default: System.schedulers_online/0)
  • Any other stream_chunks/2 option (:ordered, :metadata, :filter, etc.), except :concurrency, which is fixed at 1 for Flow pipelines

Examples

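array
|> ExZarr.Flow.chunk_flow(stages: 4)
# process_chunk/1 is your own function; it must return {index, binary}
|> Flow.map(&process_chunk/1)
|> Flow.reduce(fn -> 0 end, fn {_i, data}, acc -> acc + byte_size(data) end)
# Emit each stage's byte total as a single event
|> Flow.on_trigger(fn acc -> {[acc], acc} end)
|> Enum.to_list()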
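Flow.reduce/3 keeps one accumulator per stage, so this kind of pipeline yields one byte total per stage rather than a single number. The self-contained sketch below uses hypothetical in-memory chunks instead of an ExZarr array, emits each stage's integer accumulator with Flow.on_trigger/2, and then adds the per-stage totals to get an array-wide figure.

```elixir
# Assumption: Flow is fetchable from Hex; Mix.install/1 needs Elixir 1.12+.
Mix.install([{:flow, "~> 1.2"}])

# Hypothetical {index, binary} chunks: 10 chunks of 100 bytes each.
chunks = for i <- 0..9, do: {{i}, :binary.copy(<<i>>, 100)}

per_stage_totals =
  chunks
  |> Flow.from_enumerable(stages: 4)
  # Each stage folds its own events into a running byte count.
  |> Flow.reduce(fn -> 0 end, fn {_index, data}, acc -> acc + byte_size(data) end)
  # When the flow finishes, emit the integer accumulator as one event per stage.
  |> Flow.on_trigger(fn acc -> {[acc], acc} end)
  |> Enum.to_list()

# One entry per stage; sum them for the total across all chunks.
total = Enum.sum(per_stage_totals)
IO.puts(total)
```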

slice_flow(array, along, opts \\ [])

@spec slice_flow(ExZarr.Array.t(), non_neg_integer(), keyword()) :: Flow.t()

Creates a Flow from array slices along a dimension.

As with chunk_flow/2, chunk reads use concurrency: 1, so tune parallelism with :stages.

Examples

array
|> ExZarr.Flow.slice_flow(0, stages: 4)
|> Flow.map(fn {_start, data} -> byte_size(data) end)
|> Enum.to_list()
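Flow does not preserve event order across stages, so the per-slice sizes in the example above come back in no particular order. One way to restore slice order, sketched with a hypothetical in-memory list of {start, binary} slices standing in for slice_flow/3's output, is to keep start in each result and sort afterwards.

```elixir
# Assumption: Flow is fetchable from Hex; Mix.install/1 needs Elixir 1.12+.
Mix.install([{:flow, "~> 1.2"}])

# Hypothetical slices along dimension 0: {start, binary}, where slice n holds n + 1 bytes.
slices = for start <- 0..7, do: {start, :binary.copy(<<0>>, start + 1)}

sizes_in_order =
  slices
  |> Flow.from_enumerable(stages: 4)
  # Keep the start offset so results can be re-ordered after the parallel step.
  |> Flow.map(fn {start, data} -> {start, byte_size(data)} end)
  |> Enum.sort_by(fn {start, _size} -> start end)

IO.inspect(sizes_in_order)
```

Sorting happens after Flow has finished, so it costs one pass over the (small) list of results rather than adding coordination between stages.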