Skuld.Effects.Brook (skuld v0.27.2)
High-level streaming API built on channels.
Brook provides combinators for building streaming pipelines with:
- Backpressure via bounded channels
- Automatic error propagation
- Optional concurrency for transformations
- Integration with Skuld effects (batching works!)
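To make the backpressure point concrete, here is a minimal plain-Elixir sketch of credit-based flow control between two processes. It is an analogy only, not how Skuld implements channels: the module `BackpressureSketch` and its functions are hypothetical, and it uses ordinary BEAM processes instead of fibers. The producer may only send an item after receiving a credit, so it can never be more than `capacity` items ahead of the consumer.

```elixir
defmodule BackpressureSketch do
  # Hypothetical illustration; not part of Skuld.
  # Sends `items` from a producer process to the caller, allowing at most
  # `capacity` unconsumed items to be in flight at any time.
  def run(items, capacity) when is_list(items) and capacity >= 1 do
    consumer = self()
    producer = spawn_link(fn -> produce(items, consumer) end)

    # Initial credits play the role of the free slots in a bounded buffer.
    for _ <- 1..capacity, do: send(producer, :credit)

    consume(producer, [])
  end

  # All items sent: signal normal completion.
  defp produce([], consumer), do: send(consumer, :done)

  defp produce([item | rest], consumer) do
    # Block until the consumer has freed a slot.
    receive do
      :credit -> send(consumer, {:item, item})
    end

    produce(rest, consumer)
  end

  defp consume(producer, acc) do
    receive do
      {:item, item} ->
        # Consuming an item frees one slot, so hand a credit back.
        send(producer, :credit)
        consume(producer, [item | acc])

      :done ->
        Enum.reverse(acc)
    end
  end
end

BackpressureSketch.run([1, 2, 3, 4, 5], 2)
# => [1, 2, 3, 4, 5]
```

A slow consumer returns credits slowly, which in turn slows the producer. That is the effect a bounded channel gives a pipeline stage.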
Basic Usage
comp do
  source <- Brook.from_enum(1..100)
  mapped <- Brook.map(source, fn x -> x * 2 end)
  filtered <- Brook.filter(mapped, fn x -> rem(x, 4) == 0 end)
  Brook.to_list(filtered)
end
|> Channel.with_handler()
|> FiberPool.with_handler()
|> Comp.run()
Error Propagation
Errors automatically flow downstream through channels. If an error occurs while processing any value, the stream is immediately errored:
comp do
  source <- Brook.from_function(fn ->
    case fetch_data() do
      {:ok, items} -> {:items, items}
      {:error, reason} -> {:error, reason}
    end
  end)
  mapped <- Brook.map(source, &process/1)
  Brook.run(mapped, &sink/1)
end
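The short-circuit behaviour described above can be sketched in plain Elixir with `Enum.reduce_while/3`. This is an analogy under stated assumptions, not Skuld's mechanism: `ErrorPropagationSketch` and `map_until_error/2` are hypothetical names, and the transform is assumed to return `{:ok, value}` or `{:error, reason}`.

```elixir
defmodule ErrorPropagationSketch do
  # Hypothetical illustration; not part of Skuld.
  # Applies `transform` to each item and stops at the first error, so no
  # later item is processed once the "stream" has errored.
  def map_until_error(items, transform) do
    result =
      Enum.reduce_while(items, {:ok, []}, fn item, {:ok, acc} ->
        case transform.(item) do
          {:ok, value} -> {:cont, {:ok, [value | acc]}}
          {:error, reason} -> {:halt, {:error, reason}}
        end
      end)

    case result do
      {:ok, acc} -> {:ok, Enum.reverse(acc)}
      {:error, reason} -> {:error, reason}
    end
  end
end

ErrorPropagationSketch.map_until_error([1, 2, 3], fn x -> {:ok, x * 2} end)
# => {:ok, [2, 4, 6]}
```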
Summary
Functions
- each/2 - Execute a function for each item in the stream.
- filter/3 - Filter items in the stream.
- from_enum/2 - Create a stream from an enumerable.
- from_function/2 - Create a stream from a producer function.
- map/3 - Transform each item in the stream.
- reduce/3 - Reduce the stream, threading an accumulator through each item.
- run/2 - Run a stream to completion, applying a consumer function to each item.
- to_list/1 - Collect all items from a stream into a list.
Functions
@spec each(Skuld.Effects.Channel.Handle.t(), (term() -> any())) :: Skuld.Comp.Types.computation()
Execute a function for each item in the stream.
Returns :ok when the stream completes successfully, or {:error, reason}
if an error occurred.
Example
comp do
  source <- Brook.from_enum(1..10)
  Brook.each(source, fn x -> IO.puts("Got: #{x}") end)
end
@spec filter(Skuld.Effects.Channel.Handle.t(), (term() -> boolean()), keyword()) :: Skuld.Comp.Types.computation()
Filter items in the stream.
Only items for which the predicate returns true pass through.
Options
- :buffer - Output channel capacity (default: 10)
Example
comp do
  source <- Brook.from_enum(1..20)
  evens <- Brook.filter(source, fn x -> rem(x, 2) == 0 end)
  Brook.to_list(evens)
end
@spec from_enum(Enumerable.t(), keyword()) :: Skuld.Comp.Types.computation()
Create a stream from an enumerable.
Spawns a producer fiber that puts items into the output channel, then closes the channel when exhausted.
Options
- :buffer - Output channel capacity (default: 10)
Example
comp do
  source <- Brook.from_enum(1..100)
  Brook.to_list(source)
end
@spec from_function((-> {:item, term()} | {:items, [term()]} | :done | {:error, term()}), keyword()) :: Skuld.Comp.Types.computation()
Create a stream from a producer function.
The producer function is called repeatedly until it signals completion. It should return:
- {:item, value} - emit a single item
- {:items, [values]} - emit multiple items
- :done - close the channel normally
- {:error, reason} - signal error to consumers
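The four return values above form a small protocol. The following plain-Elixir sketch shows one way a consumer of that protocol could drive a producer function to completion. It runs synchronously and involves no channel or fiber, so it is not how Brook works internally. `ProducerProtocolSketch` and `drain/1` are hypothetical names.

```elixir
defmodule ProducerProtocolSketch do
  # Hypothetical illustration; not part of Skuld.
  # Calls `producer` repeatedly and collects what it emits until it
  # returns :done or {:error, reason}.
  def drain(producer), do: drain(producer, [])

  defp drain(producer, acc) do
    case producer.() do
      {:item, value} -> drain(producer, [value | acc])
      # Enum.reverse/2 prepends `values` in reversed order onto `acc`,
      # which keeps the accumulator consistently reversed.
      {:items, values} -> drain(producer, Enum.reverse(values, acc))
      :done -> {:ok, Enum.reverse(acc)}
      {:error, reason} -> {:error, reason}
    end
  end
end

# A stateful producer backed by an Agent counter.
{:ok, counter} = Agent.start_link(fn -> 0 end)

producer = fn ->
  n = Agent.get_and_update(counter, fn n -> {n, n + 1} end)
  if n < 3, do: {:item, n}, else: :done
end

ProducerProtocolSketch.drain(producer)
# => {:ok, [0, 1, 2]}
```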
Options
- :buffer - Output channel capacity (default: 10)
Example
comp do
  # Agent.start_link/1 returns {:ok, pid}, so match on the tuple
  {:ok, counter} = Agent.start_link(fn -> 0 end)
  source <- Brook.from_function(fn ->
    n = Agent.get_and_update(counter, fn n -> {n, n + 1} end)
    if n < 10, do: {:item, n}, else: :done
  end)
  Brook.to_list(source)
end
@spec map(Skuld.Effects.Channel.Handle.t(), (term() -> term() | Skuld.Comp.Types.computation()), keyword()) :: Skuld.Comp.Types.computation()
Transform each item in the stream.
Spawns a worker fiber that reads items from input, applies the transform function to each, and writes results to output.
The transform function can be a pure function or return a computation. If the transform errors, the stream is errored.
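As a rough mental model of the worker described above, the sketch below uses an ordinary BEAM process as a pipeline stage: it reads messages, applies the transform, and forwards results downstream. It is an assumption-laden analogy, not Skuld's implementation: `MapStageSketch` is a hypothetical module, process mailboxes stand in for channels, and a raised exception is treated as the transform erroring.

```elixir
defmodule MapStageSketch do
  # Hypothetical illustration; not part of Skuld.
  # Starts a stage that transforms {:item, value} messages and forwards
  # them, along with :done and {:error, reason}, to `downstream`.
  def start(transform, downstream) do
    spawn_link(fn -> loop(transform, downstream) end)
  end

  defp loop(transform, downstream) do
    receive do
      {:item, value} ->
        case safe_apply(transform, value) do
          {:ok, result} ->
            send(downstream, {:item, result})
            loop(transform, downstream)

          {:error, reason} ->
            # A failing transform errors the stream and stops the stage.
            send(downstream, {:error, reason})
        end

      :done ->
        send(downstream, :done)

      {:error, reason} ->
        # Errors from upstream are passed along unchanged.
        send(downstream, {:error, reason})
    end
  end

  defp safe_apply(transform, value) do
    {:ok, transform.(value)}
  rescue
    exception -> {:error, exception}
  end

  # Collects everything the stage forwards to the calling process.
  def collect(acc \\ []) do
    receive do
      {:item, value} -> collect([value | acc])
      :done -> {:ok, Enum.reverse(acc)}
      {:error, reason} -> {:error, reason}
    end
  end
end

stage = MapStageSketch.start(fn x -> x * 2 end, self())
Enum.each(1..3, fn x -> send(stage, {:item, x}) end)
send(stage, :done)
MapStageSketch.collect()
# => {:ok, [2, 4, 6]}
```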
Options
- :concurrency - Maximum concurrent transforms (default: 1). Controls how many items can be transformed simultaneously. Uses a bounded channel as a semaphore: items acquire a slot before spawning, release when done.
- :buffer - Output channel capacity (default: 10)
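For readers who know the standard library, the effect of `:concurrency` is comparable to the `:max_concurrency` option of `Task.async_stream/3`. The sketch below uses that function as a stand-in: it is a different mechanism (OS-scheduled tasks, not fibers with a channel semaphore), and `BoundedMapSketch` is a hypothetical module. It returns results in input order because `Task.async_stream/3` is ordered by default. Whether `Brook.map/3` preserves order at higher concurrency is not stated here.

```elixir
defmodule BoundedMapSketch do
  # Hypothetical illustration; not part of Skuld.
  # Runs `fun` over `enum` with at most `concurrency` calls in flight.
  def map(enum, fun, concurrency) when concurrency >= 1 do
    enum
    |> Task.async_stream(fun, max_concurrency: concurrency, ordered: true)
    # Each successful task yields {:ok, result}; unwrap it.
    |> Enum.map(fn {:ok, value} -> value end)
  end
end

BoundedMapSketch.map(1..5, fn x -> x * 2 end, 2)
# => [2, 4, 6, 8, 10]
```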
Example
comp do
  source <- Brook.from_enum(1..10)
  doubled <- Brook.map(source, fn x -> x * 2 end)
  Brook.to_list(doubled)
end
With Effects
Transform functions can use effects, enabling batching:
comp do
  source <- Brook.from_enum(user_ids)
  users <- Brook.map(source, fn id -> DB.fetch(User, id) end, concurrency: 10)
  Brook.to_list(users)
end
|> DB.with_executors()
@spec reduce(Skuld.Effects.Channel.Handle.t(), acc, (term(), acc -> acc | Skuld.Comp.Types.computation())) :: Skuld.Comp.Types.computation() when acc: term()
Reduce the stream, threading an accumulator through each item.
The reducer function receives each item and the current accumulator, and returns the new accumulator (or a computation producing it). Returns the final accumulator value.
Sequential by nature: each step depends on the previous result. Batching is limited to query blocks within a single step.
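The reducer takes `(item, acc)`, the same argument order as `Enum.reduce/3`. As a plain standard-library illustration (no Brook involved), `Enum.scan/3` exposes the intermediate accumulators and shows why the steps cannot run in parallel: every value depends on the one before it.

```elixir
# Same reducer shape as above: the item first, the accumulator second.
sum = fn item, acc -> acc + item end

total = Enum.reduce(1..10, 0, sum)
# total == 55

# Enum.scan/3 returns every intermediate accumulator in order.
running = Enum.scan(1..5, 0, sum)
# running == [1, 3, 6, 10, 15]
```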
Example
comp do
  source <- Brook.from_enum(1..10)
  # The reduce call is the final expression, so the block yields the total
  Brook.reduce(source, 0, fn item, acc -> acc + item end)
end
@spec run(Skuld.Effects.Channel.Handle.t(), (term() -> term() | Skuld.Comp.Types.computation())) :: Skuld.Comp.Types.computation()
Run a stream to completion, applying a consumer function to each item.
Similar to each/2 but the consumer function can return a computation.
Returns :ok on success or {:error, reason} on failure.
Example
comp do
  source <- Brook.from_enum(records)
  Brook.run(source, fn record -> process_record(record) end)
end
@spec to_list(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()
Collect all items from a stream into a list.
Returns the list on success, or {:error, reason} on failure.
Example
comp do
  source <- Brook.from_enum(1..10)
  mapped <- Brook.map(source, fn x -> x * 2 end)
  Brook.to_list(mapped)
end
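For orientation, the Basic Usage pipeline at the top of this page has the same shape as the following lazy pipeline built with the standard library's `Stream` module. This is only a comparison of pipeline structure: `Stream` runs in a single process with no channels, fibers, or effects, and the sketch assumes that Brook delivers items in input order at the default concurrency of 1.

```elixir
result =
  1..100
  |> Stream.map(fn x -> x * 2 end)
  |> Stream.filter(fn x -> rem(x, 4) == 0 end)
  |> Enum.to_list()

# The doubled values that are multiples of 4: 4, 8, ..., 200
length(result)
# => 50
```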