Skuld.Effects.Brook (skuld v0.28.0)
High-level streaming API built on channels.
Brook provides combinators for building streaming pipelines with:
- Backpressure via bounded channels
- Automatic error propagation
- Optional concurrency for transformations
- Integration with Skuld effects (batching works!)
Basic Usage
comp do
  source <- Brook.from_enum(1..100)
  mapped <- Brook.map(source, fn x -> x * 2 end)
  filtered <- Brook.filter(mapped, fn x -> rem(x, 4) == 0 end)
  Brook.to_list(filtered)
end
|> Channel.with_handler()
|> FiberPool.with_handler()
|> Comp.run()
Error Propagation
Errors automatically flow downstream through channels. If an error occurs while processing any value, the stream is immediately errored:
comp do
  source <- Brook.from_function(fn ->
    case fetch_data() do
      {:ok, items} -> {:items, items}
      {:error, reason} -> {:error, reason}
    end
  end)
  mapped <- Brook.map(source, &process/1)
  Brook.run(mapped, &sink/1)
end
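The short-circuit behavior described above can be modeled in plain Elixir without Skuld. This is only a sketch of the semantics, assuming the first error stops processing and replaces the result; it is not how Brook is implemented, and `ErrorFlowSketch` and `process/2` are hypothetical names.

```elixir
defmodule ErrorFlowSketch do
  # Hypothetical stand-in for a pipeline stage (not part of Skuld): applies
  # `fun` to each item and stops at the first {:error, reason}.
  def process(items, fun) do
    items
    |> Enum.reduce_while({:ok, []}, fn item, {:ok, acc} ->
      case fun.(item) do
        {:ok, value} -> {:cont, {:ok, [value | acc]}}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
    |> case do
      {:ok, acc} -> {:ok, Enum.reverse(acc)}
      {:error, reason} -> {:error, reason}
    end
  end
end

# Illustrative transform: succeeds on even numbers, fails on odd ones.
halve = fn
  x when rem(x, 2) == 0 -> {:ok, div(x, 2)}
  x -> {:error, {:odd, x}}
end

IO.inspect(ErrorFlowSketch.process([2, 4, 6], halve))
# => {:ok, [1, 2, 3]}
IO.inspect(ErrorFlowSketch.process([2, 3, 4], halve))
# => {:error, {:odd, 3}}
```

In the second call the item 4 is never processed, which mirrors the idea that an errored stream stops delivering values downstream.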
Summary
Functions
- each/2 - Execute a function for each item in the stream.
- filter/3 - Filter items in the stream.
- flat_map/3 - Map and flatten: transform_fn returns a list for each item, and the lists are flattened into a single stream.
- from_enum/2 - Create a stream from an enumerable.
- from_function/2 - Create a stream from a producer function.
- map/3 - Transform each item in the stream.
- reduce/3 - Reduce the stream, threading an accumulator through each item.
- run/2 - Run a stream to completion, applying a consumer function to each item.
- to_list/1 - Collect all items from a stream into a list.
Functions
@spec each( Skuld.Effects.Channel.Handle.t() | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()), (term() -> any()) ) :: Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t())
Execute a function for each item in the stream.
Returns :ok when the stream completes successfully, or {:error, reason}
if an error occurred.
Example
comp do
  source <- Brook.from_enum(1..10)
  Brook.each(source, fn x -> IO.puts("Got: #{x}") end)
end
@spec filter( Skuld.Effects.Channel.Handle.t() | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()), (term() -> boolean()), keyword() ) :: Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t())
Filter items in the stream.
Only items for which the predicate returns true pass through.
Options
- :buffer - Output channel capacity (default: 10)
Example
comp do
  source <- Brook.from_enum(1..20)
  evens <- Brook.filter(source, fn x -> rem(x, 2) == 0 end)
  Brook.to_list(evens)
end
@spec flat_map( Skuld.Effects.Channel.Handle.t() | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()), (term() -> [term()] | Skuld.Comp.Types.computation([term()])), keyword() ) :: Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t())
Map and flatten: transform_fn returns a list for each item, and
the lists are flattened into a single stream.
Uses the same concurrent semaphore pattern as map/2. Supports the
same :concurrency and :buffer options.
Example
[1, 2, 3]
|> Brook.from_enum()
|> Brook.flat_map(fn x -> [x, x * 10] end, concurrency: 2)
|> Brook.to_list()
# => [1, 10, 2, 20, 3, 30]
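For comparison, the flattening step has the same shape as the standard library's `Enum.flat_map/2`. The sketch below uses only plain Elixir and says nothing about how Brook schedules the work; that an empty list drops an item in Brook as well is an assumption based on the description above.

```elixir
# Each item yields a list, and the lists are concatenated in input order.
pairs = Enum.flat_map([1, 2, 3], fn x -> [x, x * 10] end)
IO.inspect(pairs)
# => [1, 10, 2, 20, 3, 30]

# Returning [] for an item contributes nothing to the output, so a
# flat_map can also act as a filter.
odds = Enum.flat_map([1, 2, 3], fn x -> if rem(x, 2) == 1, do: [x], else: [] end)
IO.inspect(odds)
# => [1, 3]
```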
@spec from_enum( Enumerable.t(), keyword() ) :: Skuld.Comp.Types.computation()
Create a stream from an enumerable.
Spawns a producer fiber that puts items into the output channel, then closes the channel when exhausted.
Options
- :buffer - Output channel capacity (default: 10)
Example
comp do
  source <- Brook.from_enum(1..100)
  Brook.to_list(source)
end
@spec from_function( (-> {:item, term()} | {:items, [term()]} | :done | {:error, term()}), keyword() ) :: Skuld.Comp.Types.computation()
Create a stream from a producer function.
The producer function is called repeatedly until it signals completion. It should return:
- {:item, value} - emit a single item
- {:items, [values]} - emit multiple items
- :done - close the channel normally
- {:error, reason} - signal error to consumers
Options
- :buffer - Output channel capacity (default: 10)
Example
comp do
  # Agent.start_link/1 returns {:ok, pid}, so match out the pid.
  {:ok, counter} = Agent.start_link(fn -> 0 end)
  source <- Brook.from_function(fn ->
    n = Agent.get_and_update(counter, fn n -> {n, n + 1} end)
    if n < 10, do: {:item, n}, else: :done
  end)
  Brook.to_list(source)
end
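To make the producer protocol concrete, here is a minimal plain-Elixir driver that calls a producer until it signals completion. It is a sketch only: `ProducerSketch.drain/1` is a hypothetical helper, not part of Skuld, and it collects into a list rather than writing to a channel.

```elixir
defmodule ProducerSketch do
  # Hypothetical driver: calls `producer` repeatedly and collects what it
  # emits, handling the four return shapes of the producer protocol.
  def drain(producer), do: loop(producer, [])

  defp loop(producer, acc) do
    case producer.() do
      {:item, value} -> loop(producer, [value | acc])
      # Enum.reverse/2 reverses `values` and prepends them to `acc`.
      {:items, values} -> loop(producer, Enum.reverse(values, acc))
      :done -> {:ok, Enum.reverse(acc)}
      {:error, reason} -> {:error, reason}
    end
  end
end

# A stateful producer backed by an Agent: emits one page per call, then :done.
{:ok, pages} = Agent.start_link(fn -> [[1, 2], [3]] end)

next_page = fn ->
  Agent.get_and_update(pages, fn
    [] -> {:done, []}
    [page | rest] -> {{:items, page}, rest}
  end)
end

IO.inspect(ProducerSketch.drain(next_page))
# => {:ok, [1, 2, 3]}
```

Keeping the state in an Agent, as in the example above, is what lets a zero-argument function return something different on each call.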
@spec map( Skuld.Effects.Channel.Handle.t() | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()), (term() -> term() | Skuld.Comp.Types.computation(term())), keyword() ) :: Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t())
Transform each item in the stream.
Spawns a worker fiber that reads items from input, applies the transform function to each, and writes results to output.
The transform function can be a pure function or return a computation. If the transform errors, the stream is errored.
Options
- :concurrency - Maximum concurrent transforms (default: 1). Controls how many items can be transformed simultaneously. Uses a bounded channel as a semaphore: items acquire a slot before spawning and release it when done.
- :buffer - Output channel capacity (default: 10)
Example
comp do
  source <- Brook.from_enum(1..10)
  doubled <- Brook.map(source, fn x -> x * 2 end)
  Brook.to_list(doubled)
end
With Effects
Transform functions can use effects, enabling batching:
comp do
  source <- Brook.from_enum(user_ids)
  users <- Brook.map(source, fn id -> DB.fetch(User, id) end, concurrency: 10)
  Brook.to_list(users)
end
|> DB.with_executors()
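As a rough point of reference for the :concurrency option, the standard library's `Task.async_stream/3` also caps the number of in-flight transforms. This is an analogy only: it runs on BEAM processes rather than Skuld fibers, it does not use a channel as a semaphore, and it does not give you effect batching.

```elixir
# At most 3 transforms run at the same time; results come back in input
# order because Task.async_stream/3 is ordered by default.
results =
  1..10
  |> Task.async_stream(fn x -> x * 2 end, max_concurrency: 3)
  |> Enum.map(fn {:ok, value} -> value end)

IO.inspect(results)
# => [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
```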
@spec reduce( Skuld.Effects.Channel.Handle.t() | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()), term(), (term(), term() -> term() | Skuld.Comp.Types.computation(term())) ) :: Skuld.Comp.Types.computation(term())
Reduce the stream, threading an accumulator through each item.
The reducer function receives each item and the current accumulator, and returns the new accumulator (or a computation producing it). Returns the final accumulator value.
Sequential by nature — each step depends on the previous result. Batching is limited to within-step query blocks.
Example
comp do
  source <- Brook.from_enum(1..10)
  Brook.reduce(source, 0, fn item, acc -> acc + item end)
end
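The reducer takes its arguments in the same order as `Enum.reduce/3`: item first, accumulator second. The plain-Elixir sketch below illustrates that ordering and why each step depends on the one before it; it does not involve Skuld at all.

```elixir
# Sum: the accumulator starts at 0 and each item is added in turn.
total = Enum.reduce(1..10, 0, fn item, acc -> acc + item end)
IO.inspect(total)
# => 55

# Prepending shows the sequential threading: later items see the
# accumulator built from earlier ones, so the list comes out reversed.
reversed = Enum.reduce([1, 2, 3], [], fn item, acc -> [item | acc] end)
IO.inspect(reversed)
# => [3, 2, 1]
```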
@spec run( Skuld.Effects.Channel.Handle.t() | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()), (term() -> term() | Skuld.Comp.Types.computation(term())) ) :: Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t())
Run a stream to completion, applying a consumer function to each item.
Similar to each/2 but the consumer function can return a computation.
Returns :ok on success or {:error, reason} on failure.
Example
comp do
  source <- Brook.from_enum(records)
  Brook.run(source, fn record -> process_record(record) end)
end
@spec to_list( Skuld.Effects.Channel.Handle.t() | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()) ) :: Skuld.Comp.Types.computation([term()])
Collect all items from a stream into a list.
Returns the list on success, or {:error, reason} on failure.
Example
comp do
  source <- Brook.from_enum(1..10)
  mapped <- Brook.map(source, fn x -> x * 2 end)
  Brook.to_list(mapped)
end
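Because the result is either a list or an {:error, reason} tuple, callers will usually pattern-match on it after running the computation. A minimal sketch of that match, with `ResultSketch.describe/1` as a hypothetical helper that is not part of Skuld:

```elixir
defmodule ResultSketch do
  # Hypothetical helper: distinguishes the two result shapes that
  # to_list is documented to produce.
  def describe({:error, reason}), do: "stream failed: #{inspect(reason)}"
  def describe(items) when is_list(items), do: "collected #{length(items)} items"
end

IO.puts(ResultSketch.describe([2, 4, 6]))
# => collected 3 items
IO.puts(ResultSketch.describe({:error, :timeout}))
# => stream failed: :timeout
```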