Skuld.Effects.FiberPool (skuld v0.27.1)
View SourceEffect for cooperative fiber-based concurrency.
FiberPool provides lightweight cooperative concurrency within a single process. Fibers are scheduled cooperatively - they run until they await, complete, or error.
Basic Usage
comp do
# Run work as a fiber (cooperative, same process)
h1 <- FiberPool.fiber(expensive_computation())
h2 <- FiberPool.fiber(another_computation())
# Do other work while fibers run...
# Await results (raises on error)
r1 <- FiberPool.await!(h1)
r2 <- FiberPool.await!(h2)
{r1, r2}
end
|> FiberPool.with_handler()
|> Comp.run()Structured Concurrency
Use scope/1 to ensure all spawned fibers complete before exiting:
comp do
FiberPool.scope(comp do
h1 <- FiberPool.fiber(work1())
h2 <- FiberPool.fiber(work2())
# Both h1 and h2 will complete (or be cancelled) before scope exits
FiberPool.await_all!([h1, h2])
end)
endComparison with Async
FiberPool replaces the Async effect with a simpler, more focused design:
- Fibers only (no Tasks in this module - see Task integration milestone)
- Simpler state management
- Designed for I/O batching integration
Summary
Functions
Applicative ap — run a function-producing computation and a value-producing
computation concurrently as FiberPool fibers, then apply the function to
the value.
Await a single fiber's result.
Await a single fiber's result, raising on error.
Await all fibers' results.
Await all fibers' results, raising on any error.
Await any fiber's result.
Await any fiber's result, raising on error.
Await a fiber's result with single-consumer semantics.
Cancel a fiber.
Run a computation as a fiber (cooperative, same process).
Spawn each computation as a fiber, returning all handles.
Run a list of computations concurrently as fibers, returning all results in order.
Map a function over items, running each result computation as a fiber, and return all results in order.
Create a structured concurrency scope.
Install the FiberPool handler for a computation.
Functions
@spec ap(Skuld.Comp.Types.computation(), Skuld.Comp.Types.computation()) :: Skuld.Comp.Types.computation()
Applicative ap — run a function-producing computation and a value-producing
computation concurrently as FiberPool fibers, then apply the function to
the value.
This is the standard applicative functor <*> operation. Both computations
are spawned as cooperative fibers within the same FiberPool, so their effects
(including data fetches) land in the same batch round, enabling implicit
concurrency.
How it achieves concurrency
ap runs exactly two computations concurrently: one that produces a
function, and one that produces a value. Both are spawned as fibers and
awaited together, so their data fetches land in the same batch round.
When both complete, the function is applied to the value.
To run more than two operations concurrently, ap is applied repeatedly
(like cons building a list). Each application adds one more concurrent
computation by pairing it with a function that accumulates results:
# Three concurrent fetches via repeated ap:
ap(
ap(
Comp.map(fetch(:x), fn x -> fn y -> [y, x] end end),
fetch(:y)
),
fetch(:z)
)
|> Comp.map(fn [z, y, x] -> {x, y, z} end)Each ap spawns two fibers — the accumulated computation so far (which
returns a function) and the next value computation. The function captures
previous results in a closure and conses the new value onto them. This is
the standard applicative pattern from Haskell's <*>, where liftA2,
liftA3, etc. are built by chaining <*> with fmap.
In practice, the query macro handles this desugaring automatically —
users rarely need to call ap directly.
Requires a FiberPool handler to be installed.
Example
comp_f = Comp.pure(fn x -> x * 2 end)
comp_a = 21
result = FiberPool.ap(comp_f, comp_a)
# result is a computation that returns 42
@spec await(Skuld.Coroutine.Handle.t()) :: Skuld.Comp.Types.computation()
Await a single fiber's result.
Suspends the current fiber until the target fiber completes.
Returns {:ok, value} on success or {:error, reason} on failure.
Example
comp do
h <- FiberPool.fiber(some_work())
result <- FiberPool.await(h)
case result do
{:ok, value} -> # use value
{:error, reason} -> # handle error
end
end
@spec await!(Skuld.Coroutine.Handle.t()) :: Skuld.Comp.Types.computation()
Await a single fiber's result, raising on error.
Suspends the current fiber until the target fiber completes. Returns the result value directly, or raises if the fiber errored.
Example
comp do
h <- FiberPool.fiber(some_work())
value <- FiberPool.await!(h) # raises on error
# use value
end
@spec await_all([Skuld.Coroutine.Handle.t()]) :: Skuld.Comp.Types.computation()
Await all fibers' results.
Suspends until all fibers complete. Returns results in the same order
as the input handles, each as {:ok, value} or {:error, reason}.
@spec await_all!([Skuld.Coroutine.Handle.t()]) :: Skuld.Comp.Types.computation()
Await all fibers' results, raising on any error.
Suspends until all fibers complete. Returns result values in the same order as the input handles. Raises if any fiber errored.
@spec await_any([Skuld.Coroutine.Handle.t()]) :: Skuld.Comp.Types.computation()
Await any fiber's result.
Suspends until at least one fiber completes. Returns {handle, result}
where result is {:ok, value} or {:error, reason}.
@spec await_any!([Skuld.Coroutine.Handle.t()]) :: Skuld.Comp.Types.computation()
Await any fiber's result, raising on error.
Suspends until at least one fiber completes. Returns {handle, value}
for the first fiber to complete. Raises if the fiber errored.
@spec await_consume(Skuld.Coroutine.Handle.t()) :: Skuld.Comp.Types.computation()
Await a fiber's result with single-consumer semantics.
Like await/1, but removes the result from the completed map after retrieval.
Use this when you know the fiber will only be awaited once, to enable
garbage collection of the result.
This is used internally by Channel.take_async for memory-efficient streaming.
@spec cancel(Skuld.Coroutine.Handle.t()) :: Skuld.Comp.Types.computation()
Cancel a fiber.
Marks the fiber for cancellation. If the fiber is suspended, it will be woken with an error. If running, it will be cancelled at the next suspension point.
@spec fiber( Skuld.Comp.Types.computation(), keyword() ) :: Skuld.Comp.Types.computation()
Run a computation as a fiber (cooperative, same process).
Returns a handle that can be used to await the result.
Options
:priority- Fiber priority (future use)
@spec fiber_all([Skuld.Comp.Types.computation()]) :: Skuld.Comp.Types.computation()
Spawn each computation as a fiber, returning all handles.
The handles can be awaited individually or with await_all/1 / await_all!/1.
Example
comp do
handles <- FiberPool.fiber_all([fetch(:x), fetch(:y), fetch(:z)])
FiberPool.await_all!(handles)
end
@spec fiber_await_all([Skuld.Comp.Types.computation()]) :: Skuld.Comp.Types.computation()
Run a list of computations concurrently as fibers, returning all results in order.
Each computation is spawned as a fiber, then all are awaited with await_all!/1.
Raises if any fiber fails.
A single-element list is optimised to skip fiber overhead.
This is the primitive used by the query macro for independent binding groups.
Example
FiberPool.fiber_await_all([fetch(:x), fetch(:y), fetch(:z)])
# returns a computation producing [x_result, y_result, z_result]
@spec map([a], (a -> Skuld.Comp.Types.computation())) :: Skuld.Comp.Types.computation() when a: term()
Map a function over items, running each result computation as a fiber, and return all results in order.
This is a convenience combining Enum.map/2, fiber_all/1, and await_all!/1.
Raises if any fiber fails.
Example
FiberPool.map([1, 2, 3], &Queries.get_user/1)
# returns a computation producing [user1, user2, user3]
@spec scope( Skuld.Comp.Types.computation(), keyword() ) :: Skuld.Comp.Types.computation()
Create a structured concurrency scope.
All fibers submitted within the scope will be awaited before the scope returns. If the scope body completes normally, all fibers are awaited and their results discarded (the scope returns the body's result). If the scope body errors, all fibers are cancelled.
Options
:on_exit- Optional callbackfn result, handles -> computation()called before awaiting fibers. Can be used for custom cleanup.
Example
FiberPool.scope(comp do
h1 <- FiberPool.fiber(work1())
h2 <- FiberPool.fiber(work2())
# Both h1 and h2 will complete before scope exits
FiberPool.await!(h1)
end)
@spec with_handler( Skuld.Comp.Types.computation(), keyword() ) :: Skuld.Comp.Types.computation()
Install the FiberPool handler for a computation.
The handler collects fiber submissions and await operations.
Use run/1 or run!/1 to execute the computation with full
fiber scheduling.