ExZarr.Flow (ExZarr v1.1.0)
Flow integration for parallel, backpressure-aware Zarr chunk processing.
Flow partitions chunk streams across schedulers and applies backpressure
automatically. Use this when processing very large arrays where you need
controlled parallelism beyond Task.async_stream/3.
Flow is an optional dependency. Add it to your mix.exs:
```elixir
{:flow, "~> 1.2"}
```
Examples
```elixir
array
|> ExZarr.Flow.chunk_flow()
|> Flow.map(fn {index, data} -> {index, byte_size(data)} end)
|> Enum.to_list()
```
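To sanity-check the first example on a small array, you can compare it against an eager, single-process version of the same mapping. The sketch below is hedged: the `ChunkSizeReference` module and the sample chunks are hypothetical. Flow does not guarantee event order across stages, so sort the Flow result by index (for example with Enum.sort_by/2) before comparing.

```elixir
# Hypothetical reference implementation for testing; not part of ExZarr.
defmodule ChunkSizeReference do
  # Same per-chunk mapping as the Flow.map/2 step in the example.
  def sizes(chunks) do
    chunks
    |> Enum.map(fn {index, data} -> {index, byte_size(data)} end)
    # Sort by chunk index so the result is comparable with Flow output,
    # whose order across stages is not guaranteed.
    |> Enum.sort_by(fn {index, _size} -> index end)
  end
end
```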
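```elixir
array
|> ExZarr.Flow.chunk_flow(stages: 8, ordered: false)
|> Flow.filter(fn {_index, data} -> byte_size(data) > 0 end)
# Map to sizes first: Enum.sum/1 cannot add {index, data} tuples.
|> Flow.map(fn {_index, data} -> byte_size(data) end)
|> Enum.sum()
```
Partitioning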
ExZarr.Flow builds its flows with Flow.from_enumerable/2, using stages: System.schedulers_online() by default.
Chunk reads use concurrency: 1 because the Flow stages already provide the parallelism.
Tune throughput with :stages, not :concurrency.
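The split between stage-level parallelism and sequential reads can be modelled with plain tasks. This is a simplified sketch under loud assumptions. `StagePartitionModel` and `read_fun` are hypothetical. The static round-robin split also differs from Flow, which hands events to stages on demand. The point is only that each stage reads its chunks one at a time, so the number of stages is what controls parallelism.

```elixir
# Simplified model, not ExZarr or Flow internals: a static round-robin split
# stands in for Flow's demand-driven dispatch.
defmodule StagePartitionModel do
  def run(indices, read_fun, stages \\ System.schedulers_online()) do
    indices
    |> Enum.with_index()
    # Round-robin assignment: the i-th index goes to stage rem(i, stages).
    |> Enum.group_by(fn {_idx, i} -> rem(i, stages) end, fn {idx, _i} -> idx end)
    |> Map.values()
    # One task per stage; inside a stage, reads run strictly one after another.
    |> Enum.map(fn stage_indices ->
      Task.async(fn -> Enum.map(stage_indices, fn idx -> {idx, read_fun.(idx)} end) end)
    end)
    |> Task.await_many(:infinity)
    |> List.flatten()
  end
end
```

Results come back grouped by stage, not in index order, which mirrors why Flow output should not be assumed to be ordered.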
Summary
Functions
chunk_flow/2 - Creates a Flow from an array's chunks.
slice_flow/3 - Creates a Flow from array slices along a dimension.
Functions
@spec chunk_flow(ExZarr.Array.t(), keyword()) :: Flow.t()
Creates a Flow from an array's chunks.
Options
- :stages - Number of Flow stages (default: System.schedulers_online/0).
- Other stream_chunks/2 options (:ordered, :metadata, :filter, etc.), except :concurrency, which is fixed at 1 for Flow pipelines.
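One way to picture how these options could be separated is a small helper that pops :stages and pins :concurrency to 1. This is a hypothetical sketch, not ExZarr's implementation. `FlowOptsSketch.split/1` is an invented name. Discarding a caller-supplied :concurrency (rather than raising) is an assumption.

```elixir
# Hypothetical helper; ExZarr's real option handling may differ.
defmodule FlowOptsSketch do
  # Returns {stage_count, options_for_the_chunk_stream}.
  def split(opts) do
    {stages, stream_opts} = Keyword.pop(opts, :stages, System.schedulers_online())
    # Assumption: any caller-supplied :concurrency is dropped and replaced by 1.
    stream_opts = Keyword.put(stream_opts, :concurrency, 1)
    {stages, stream_opts}
  end
end
```

For example, `FlowOptsSketch.split(stages: 8, ordered: false, concurrency: 16)` keeps `ordered: false` for the stream and forces `concurrency: 1`.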
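Examples
```elixir
array
|> ExZarr.Flow.chunk_flow(stages: 4)
# process_chunk/1 is a user-defined function (not part of ExZarr); it must
# return {index, data} for the reducer below to match.
|> Flow.map(&process_chunk/1)
|> Flow.reduce(fn -> 0 end, fn {_i, data}, acc -> acc + byte_size(data) end)
|> Flow.emit(:state)
# Yields one accumulated byte count per stage; pipe into Enum.sum/1 for a total.
|> Enum.to_list()
```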
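Because Flow.reduce/3 keeps one accumulator per stage, the example above returns a list of partial sums rather than a single number. The sketch below models that with a static split. `PerStageSum` is a hypothetical module, and the round-robin assignment is a simplification of Flow's on-demand dispatch. Adding the partial sums gives the grand total.

```elixir
# Model only: each "stage" folds its share of chunks into one accumulator.
defmodule PerStageSum do
  def partial_sums(chunks, stages) when is_integer(stages) and stages > 0 do
    indexed = Enum.with_index(chunks)

    for s <- 0..(stages - 1) do
      indexed
      # Static round-robin share for stage s.
      |> Enum.filter(fn {_chunk, i} -> rem(i, stages) == s end)
      # Same reducer shape as the Flow.reduce/3 call in the example.
      |> Enum.reduce(0, fn {{_index, data}, _i}, acc -> acc + byte_size(data) end)
    end
  end
end

chunks = for n <- 1..5, do: {n, :binary.copy(<<0>>, n)}
chunks |> PerStageSum.partial_sums(2) |> Enum.sum()
```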
@spec slice_flow(ExZarr.Array.t(), non_neg_integer(), keyword()) :: Flow.t()
Creates a Flow from array slices along a dimension.
Chunk reads use concurrency: 1; tune parallelism with :stages.
Examples
```elixir
array
|> ExZarr.Flow.slice_flow(0, stages: 4)
|> Flow.map(fn {_start, data} -> byte_size(data) end)
|> Enum.to_list()
```
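The slice example receives {start, data} pairs. This documentation does not say how thick each slice is. The sketch below assumes one chunk per slice along the chosen dimension and computes the {start, length} bounds that assumption implies. `SliceBounds`, and the tuple representation of shape and chunk shape, are hypothetical.

```elixir
# Hypothetical helper; assumes slices are one chunk thick along `dim`.
# Chunk sizes must be positive, as they are in Zarr.
defmodule SliceBounds do
  def slices(shape, chunk_shape, dim) do
    dim_size = elem(shape, dim)
    step = elem(chunk_shape, dim)

    # Start offsets 0, step, 2 * step, ... below dim_size.
    Stream.iterate(0, &(&1 + step))
    |> Enum.take_while(&(&1 < dim_size))
    # The last slice may be shorter when dim_size is not a multiple of step.
    |> Enum.map(fn start -> {start, min(step, dim_size - start)} end)
  end
end
```

For a {10, 4} array with {4, 4} chunks, slicing along dimension 0 gives starts 0, 4 and 8, with the last slice only 2 rows thick.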