Skuld.Effects.Brook (skuld v0.27.3)

View Source

High-level streaming API built on channels.

Stream provides combinators for building streaming pipelines with:

  • Backpressure via bounded channels
  • Automatic error propagation
  • Optional concurrency for transformations
  • Integration with Skuld effects (batching works!)

Basic Usage

comp do
  source <- Brook.from_enum(1..100)
  mapped <- Brook.map(source, fn x -> x * 2 end)
  filtered <- Brook.filter(mapped, fn x -> rem(x, 4) == 0 end)
  Brook.to_list(filtered)
end
|> Channel.with_handler()
|> FiberPool.with_handler()
|> Comp.run()

Error Propagation

Errors automatically flow downstream through channels. If an error occurs while processing any value, the stream is immediately errored:

comp do
  source <- Brook.from_function(fn ->
    case fetch_data() do
      {:ok, items} -> {:items, items}
      {:error, reason} -> {:error, reason}
    end
  end)

  mapped <- Brook.map(source, &process/1)
  Brook.run(mapped, &sink/1)
end

Summary

Functions

Execute a function for each item in the stream.

Filter items in the stream.

Map and flatten: transform_fn returns a list for each item, and the lists are flattened into a single stream.

Create a stream from an enumerable.

Create a stream from a producer function.

Transform each item in the stream.

Reduce the stream, threading an accumulator through each item.

Run a stream to completion, applying a consumer function to each item.

Collect all items from a stream into a list.

Functions

each(input, consumer_fn)

@spec each(
  Skuld.Effects.Channel.Handle.t()
  | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()),
  (term() -> any())
) :: Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t())

Execute a function for each item in the stream.

Returns :ok when the stream completes successfully, or {:error, reason} if an error occurred.

Example

comp do
  source <- Brook.from_enum(1..10)
  Brook.each(source, fn x -> IO.puts("Got: #{x}") end)
end

filter(input, pred_fn, opts \\ [])

@spec filter(
  Skuld.Effects.Channel.Handle.t()
  | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()),
  (term() -> boolean()),
  keyword()
) :: Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t())

Filter items in the stream.

Only items for which the predicate returns true pass through.

Options

  • :buffer - Output channel capacity (default: 10)

Example

comp do
  source <- Brook.from_enum(1..20)
  evens <- Brook.filter(source, fn x -> rem(x, 2) == 0 end)
  Brook.to_list(evens)
end

flat_map(input, transform_fn, opts \\ [])

@spec flat_map(
  Skuld.Effects.Channel.Handle.t()
  | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()),
  (term() -> [term()] | Skuld.Comp.Types.computation([term()])),
  keyword()
) :: Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t())

Map and flatten: transform_fn returns a list for each item, and the lists are flattened into a single stream.

Uses the same concurrent semaphore pattern as map/2. Supports the same :concurrency and :buffer options.

Example

[1, 2, 3]
|> Brook.from_enum()
|> Brook.flat_map(fn x -> [x, x * 10] end, concurrency: 2)
|> Brook.to_list()
# => [1, 10, 2, 20, 3, 30]

from_enum(enumerable, opts \\ [])

@spec from_enum(
  Enumerable.t(),
  keyword()
) :: Skuld.Comp.Types.computation()

Create a stream from an enumerable.

Spawns a producer fiber that puts items into the output channel, then closes the channel when exhausted.

Options

  • :buffer - Output channel capacity (default: 10)

Example

comp do
  source <- Brook.from_enum(1..100)
  Brook.to_list(source)
end

from_function(producer_fn, opts \\ [])

@spec from_function(
  (-> {:item, term()} | {:items, [term()]} | :done | {:error, term()}),
  keyword()
) :: Skuld.Comp.Types.computation()

Create a stream from a producer function.

The producer function is called repeatedly until it signals completion. It should return:

  • {:item, value} - emit a single item
  • {:items, [values]} - emit multiple items
  • :done - close the channel normally
  • {:error, reason} - signal error to consumers

Options

  • :buffer - Output channel capacity (default: 10)

Example

comp do
  counter = Agent.start_link(fn -> 0 end)

  source <- Brook.from_function(fn ->
    n = Agent.get_and_update(counter, fn n -> {n, n + 1} end)
    if n < 10, do: {:item, n}, else: :done
  end)

  Brook.to_list(source)
end

map(input, transform_fn, opts \\ [])

@spec map(
  Skuld.Effects.Channel.Handle.t()
  | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()),
  (term() -> term() | Skuld.Comp.Types.computation(term())),
  keyword()
) :: Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t())

Transform each item in the stream.

Spawns a worker fiber that reads items from input, applies the transform function to each, and writes results to output.

The transform function can be a pure function or return a computation. If the transform errors, the stream is errored.

Options

  • :concurrency - Maximum concurrent transforms (default: 1). Controls how many items can be transformed simultaneously. Uses a bounded channel as a semaphore — items acquire a slot before spawning, release when done.
  • :buffer - Output channel capacity (default: 10)

Example

comp do
  source <- Brook.from_enum(1..10)
  doubled <- Brook.map(source, fn x -> x * 2 end)
  Brook.to_list(doubled)
end

With Effects

Transform functions can use effects, enabling batching:

comp do
  source <- Brook.from_enum(user_ids)
  users <- Brook.map(source, fn id -> DB.fetch(User, id) end, concurrency: 10)
  Brook.to_list(users)
end
|> DB.with_executors()

reduce(input, initial_acc, reducer_fn)

@spec reduce(
  Skuld.Effects.Channel.Handle.t()
  | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()),
  term(),
  (term(), term() -> term() | Skuld.Comp.Types.computation(term()))
) :: Skuld.Comp.Types.computation(term())

Reduce the stream, threading an accumulator through each item.

The reducer function receives each item and the current accumulator, and returns the new accumulator (or a computation producing it). Returns the final accumulator value.

Sequential by nature — each step depends on the previous result. Batching is limited to within-step query blocks.

Example

comp do
  source <- Brook.from_enum(1..10)
  total <- Brook.reduce(source, 0, fn item, acc -> acc + item end)
end

run(input, consumer_fn)

@spec run(
  Skuld.Effects.Channel.Handle.t()
  | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t()),
  (term() -> term() | Skuld.Comp.Types.computation(term()))
) :: Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t())

Run a stream to completion, applying a consumer function to each item.

Similar to each/2 but the consumer function can return a computation. Returns :ok on success or {:error, reason} on failure.

Example

comp do
  source <- Brook.from_enum(records)
  Brook.run(source, fn record -> process_record(record) end)
end

to_list(input)

@spec to_list(
  Skuld.Effects.Channel.Handle.t()
  | Skuld.Comp.Types.computation(Skuld.Effects.Channel.Handle.t())
) :: Skuld.Comp.Types.computation([term()])

Collect all items from a stream into a list.

Returns the list on success, or {:error, reason} on failure.

Example

comp do
  source <- Brook.from_enum(1..10)
  mapped <- Brook.map(source, fn x -> x * 2 end)
  Brook.to_list(mapped)
end