aqueduct
Main module of the aqueduct library. Contains all of the type definitions and public
functions for operating on them.
The primary use cases of this library are creating Streams
that emit successive values by executing some side effect (such as reading a file,
consuming a message, awaiting user input, etc.), and wrapping them in successive
operations through map, filter, and
filter_map, which are only executed when the internal
step function of a Stream is called.
It is very similar to the gleam_yielder library.
The one advantage this library offers over the yielder is that the fundamental data
types it provides (Stream and Step) are not
opaque, so the user can freely extend the provided functionality by simply
implementing their own functions that operate on Streams.
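As an illustration of that extensibility, here is a minimal sketch of a user-defined combinator. The take_while function is hypothetical and not part of aqueduct. The two types are redeclared exactly as documented in the Types section so that the sketch compiles on its own; real code would import them from the aqueduct module instead.

```gleam
// Local copies of the documented types (assumption: real code imports
// them from the aqueduct module rather than redefining them).
pub type Step(a) {
  Next(Stream(a), a)
  Done
}

pub type Stream(a) {
  Stream(fn() -> Step(a))
}

// Hypothetical helper, not provided by the library: emit elements only
// while the predicate holds, then finish.
pub fn take_while(stream: Stream(a), keep: fn(a) -> Bool) -> Stream(a) {
  Stream(fn() {
    // Deconstruct the Stream to reach its internal step function.
    let Stream(step) = stream
    case step() {
      Next(rest, element) ->
        case keep(element) {
          True -> Next(take_while(rest, keep), element)
          False -> Done
        }
      Done -> Done
    }
  })
}
```

Nothing runs until the step function of the returned Stream is called, so the helper stays as lazy as the built-in operations.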
Introduction
To start, you first need to create a Stream:
let stream = aqueduct.single("element")
And then do something with it:
let assert Next(_, "element") = aqueduct.next(stream)
You can also transform the values:
stream
|> aqueduct.map(int.parse)
|> aqueduct.collect()
// -> [Error(Nil)]
and filter them:
stream
|> aqueduct.filter_map(int.parse)
|> aqueduct.collect()
// -> []
Note: Take care when filtering infinite Streams, as the Stream produced from filtering traverses the original Stream element by element, trying to find the next one that passes the specified predicate. If none of the elements in an infinite Stream pass, trying to get the next element will never terminate.
Cutting apart a given input
For the feature I’m proudest of in this package, take a
look at from_divider.
It takes in a source and a function that cuts that source into smaller pieces that are
then emitted as elements by the resulting Stream.
In mesv, I used this function to create a
Stream(String) of rows in a file by splitting a String on newlines that were not
somewhere between two ‘escapers’.
Specifically, the code of the function was thus:
aqueduct.from_divider(
source,
util.take_until_unescaped(row_separator, not_in: escaper),
)
And the code of the function used to split the source was:
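// util.gleam
import gleam/option.{type Option, None, Some}
import gleam/pair
import gleam/result
import gleam/string

// Build a chunk function for from_divider: it cuts off everything up to the
// first separator that is not between two escapers.
pub fn take_until_unescaped(
  separator el: String,
  not_in escaper: String,
) -> fn(String) -> Result(#(String, String), String) {
  fn(source: String) {
    take_until_unescaped_loop(source, el, escaper, None)
    // from_divider expects #(remaining source, element), so swap the pair.
    |> result.map(pair.swap)
    // No usable separator left: the whole remaining source is the last element.
    |> result.map_error(fn(_) { source })
  }
}

// count_non_overlapping is a helper that is not shown here.
fn take_until_unescaped_loop(
  from: String,
  separator: String,
  esc: String,
  acc: Option(String),
) -> Result(#(String, String), Nil) {
  case string.split_once(from, on: separator) {
    Ok(#(head, rest)) -> {
      // Re-attach what was accumulated while inside an escaped region.
      let value = case acc {
        Some(str) -> str <> separator <> head
        None -> head
      }
      // An even number of escapers means the separator was not escaped.
      case count_non_overlapping(in: value, of: esc) % 2 == 0 {
        True -> Ok(#(value, rest))
        False ->
          take_until_unescaped_loop(
            rest,
            separator,
            esc,
            Some(value),
          )
      }
    }
    Error(Nil) -> Error(Nil)
  }
}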
For specifics on this function’s behaviour, look at its documentation.
Types
Result of calling next on a Stream.
It either returns an element and the updated Stream, or Done,
indicating the Stream is finished.
It’s generally emitted by the next function or by simply calling
the internal step function after deconstructing a Stream, but there’s nothing stopping
you from creating it yourself.
pub type Step(a) {
  Next(Stream(a), a)
  Done
}
Constructors
- Next(Stream(a), a)
  This Stream emitted the given element, and returned the successive Stream.
- Done
  This Stream has terminated.
Represents a lazily evaluated Stream of some type of data.
It does not care specifically how the next element is obtained - it merely encapsulates
a function that when called, produces a Step.
What this function does internally depends on how the Stream was defined.
Furthermore, a Stream can be finite or infinite, and currently there’s no good way to
check whether a given Stream terminates.
To create it, either use one of the provided constructors (such as empty, single,
repeat, from_list, from_iterator, or from_divider), or simply build it yourself.
pub type Stream(a) {
  Stream(fn() -> Step(a))
}
Constructors
- Stream(fn() -> Step(a))
  Create a Stream from scratch by passing in a function that returns a Step.
  Most useful to combine with recursive functions that call themselves to construct the next Stream.
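A minimal sketch of building a Stream from scratch with a recursive function. The countdown and to_list functions are hypothetical names used only for illustration, and the types are local copies of the documented definitions so the sketch compiles on its own.

```gleam
// Local copies of the documented types (assumption: real code imports
// them from the aqueduct module).
pub type Step(a) {
  Next(Stream(a), a)
  Done
}

pub type Stream(a) {
  Stream(fn() -> Step(a))
}

// Hypothetical example: counts down from `from` to 1, then finishes.
// Each Next carries the Stream for the remaining numbers, built by a
// recursive call that is only evaluated when that Stream is stepped.
pub fn countdown(from: Int) -> Stream(Int) {
  Stream(fn() {
    case from > 0 {
      True -> Next(countdown(from - 1), from)
      False -> Done
    }
  })
}

// Stand-in for the library's collect: eagerly drain a finite Stream.
// Not tail recursive; good enough for a short illustration.
pub fn to_list(stream: Stream(a)) -> List(a) {
  let Stream(step) = stream
  case step() {
    Next(rest, element) -> [element, ..to_list(rest)]
    Done -> []
  }
}
```

With these definitions, to_list(countdown(3)) evaluates to [3, 2, 1].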
Values
pub fn append(stream: Stream(a), element: a) -> Stream(a)
Add the provided element to the end of the Stream - it will be the last element shown.
Note
This function waits until the stream returns Done, and then replaces that Done
with Next(empty(), element).
So if the Stream is infinite and never returns Done, the element will never be returned.
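The behaviour described in the note can be sketched as follows. This is an assumption about how such a function could be written based on the description above, not the library's actual source; the types are local copies of the documented definitions.

```gleam
pub type Step(a) {
  Next(Stream(a), a)
  Done
}

pub type Stream(a) {
  Stream(fn() -> Step(a))
}

// Sketch of append (may differ from the library's implementation):
// pass every element through unchanged and swap the final Done for one
// extra Next that carries the appended element.
pub fn append(stream: Stream(a), element: a) -> Stream(a) {
  Stream(fn() {
    let Stream(step) = stream
    case step() {
      Next(rest, value) -> Next(append(rest, element), value)
      // Only reached if the source ever finishes.
      Done -> Next(Stream(fn() { Done }), element)
    }
  })
}
```

The Done branch is the only place where the appended element is emitted, which is why it is unreachable for an infinite source.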
pub fn collect(stream: Stream(a)) -> List(a)
Consume the provided Stream and collect all of the values into a List.
Note
As this function attempts to eagerly evaluate all of the elements until it encounters the
Done next step, if called on an infinite Stream, it will never terminate.
pub fn collect_count(stream: Stream(a), count: Int) -> List(a)
Consume the provided Stream and collect count number of values into a List.
Since this function has a built in limit, as long as all of the elements in the Stream
can be evaluated and terminate, it will also terminate, even if the Stream is infinite.
Basically, using this function protects you against the infinite length of the Stream,
but cannot protect you against potentially infinite requirements of the internal
function of the Stream.
Such a Stream with infinite requirements of the internal function would be created
by something like this:
aqueduct.from_iterator(fn(num) { num + 2 }, 1)
|> aqueduct.filter(fn(num) { num % 2 == 0 })
This creates a Stream that emits only odd numbers and then filters out every number
that is not even, so no element can ever pass the filter.
Trying to call next on this Stream would never terminate
(as the filter function would recursively call itself to get
the next element ad infinitum), and since this function uses next internally,
it too would never terminate.
pub fn collect_until(
stream: Stream(a),
stop: fn(a) -> Bool,
) -> List(a)
Consume the provided Stream and collect elements into a List until the provided stop
function encounters an element for which it returns True, or the Stream ends.
Since this function has a built in limit, as long as all of the elements in the Stream
can be evaluated and terminate and there exists an element for which the provided stop
function returns True, it will also terminate, even if the Stream is infinite.
Basically, using this function protects you against the infinite length of the Stream,
but cannot protect you against potentially infinite requirements of the internal
function of the Stream.
Example
Such a Stream with infinite requirements of the internal function would be created
by something like this:
aqueduct.from_iterator(fn(num) { num + 2 }, 1)
|> aqueduct.filter(fn(num) { num % 2 == 0 })
This creates a Stream that emits only odd numbers and then filters out every number
that is not even, so no element can ever pass the filter.
Trying to call next on this Stream would never terminate
(as the filter function would recursively call itself to get the
next element ad infinitum), and since this function uses next internally, it too
would never terminate.
pub fn concat(first: Stream(a), second: Stream(a)) -> Stream(a)
Concatenate two Streams that emit the same type of element.
Note
If the first Stream is infinite, then the elements of the second Stream will never be emitted.
pub fn drop(stream: Stream(a), count: Int) -> Stream(a)
Drop count elements from the beginning of the Stream.
If there are no elements left (the next function returned Done),
an empty Stream is returned.
pub fn each(stream: Stream(a), evaluate fun: fn(a) -> Nil) -> Nil
Consume the provided Stream and execute a function on each element, eagerly consuming them.
Use to execute side-effects based on the values in the Stream using functions that cannot
fail, when you don’t care about the consumed values on their own afterwards.
Note
If the provided Stream is infinite, this function will never return, and constantly
execute side effects on successive elements emitted by the Stream.
pub fn empty() -> Stream(a)
Create an empty Stream that always returns Done for the next step.
Since it never returns a Next variant of the Step type,
it does not have any type specialization, and can thus be used with any type of
specialized Stream.
pub fn filter(
stream: Stream(a),
predicate: fn(a) -> Bool,
) -> Stream(a)
Transform a Stream to only retain values that match the predicate.
Note
This function works by creating a new Stream, which for every element that is requested,
requests an element from the provided Stream.
If the returned element passes the predicate, it is emitted and the search stops there;
if it does not, the function recursively requests further elements until one passes.
As such, if a Stream created using this function were to be based on an infinite Stream
and the values that pass were extremely rare, then calling next
on such a Stream would take a long time.
Furthermore, if no elements in the input infinite Stream were to pass the predicate,
then calling next on such a Stream will never return.
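The mechanism in the note can be sketched like this. It is an illustration written from the description above, not the library's source; filter_step is a hypothetical helper name and the types are local copies of the documented definitions.

```gleam
pub type Step(a) {
  Next(Stream(a), a)
  Done
}

pub type Stream(a) {
  Stream(fn() -> Step(a))
}

// Sketch of a lazy filter: no element is requested from the source
// until the step function of the returned Stream is called.
pub fn filter(stream: Stream(a), predicate: fn(a) -> Bool) -> Stream(a) {
  Stream(fn() { filter_step(stream, predicate) })
}

// Hypothetical helper: pull from the source until an element passes.
fn filter_step(stream: Stream(a), predicate: fn(a) -> Bool) -> Step(a) {
  let Stream(step) = stream
  case step() {
    Next(rest, element) ->
      case predicate(element) {
        True -> Next(filter(rest, predicate), element)
        // Rejected: keep pulling. If the source is infinite and nothing
        // ever passes, this loop never ends.
        False -> filter_step(rest, predicate)
      }
    Done -> Done
  }
}
```

The False branch is where the cost described above comes from: every rejected element means one more request to the source before anything is returned.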
pub fn filter_map(
stream: Stream(a),
predicate: fn(a) -> Result(b, e),
) -> Stream(b)
Transform a Stream to only retain values that are returned in an Ok variant
of the Result type.
Basically equivalent to the composition of filter and
map, just a bit more optimized.
Note
This function works by creating a new Stream, which for every element that is
requested, requests an element from the provided Stream.
If the returned element passes the function, then it stops there and returns
that transformed element, together with a Stream built by calling itself on the returned Stream;
if it does not, the function recursively requests further elements until one passes.
As such, if a Stream created using this function were to be based on an infinite
Stream and the values that pass were extremely rare, then calling
next on such a Stream would take a long time.
Furthermore, if no elements in the input infinite Stream were to pass the predicate,
then calling next on such a Stream will never return.
pub fn foldl(stream: Stream(a), fun: fn(a, b) -> b, acc: b) -> b
Consume all of the elements of a Stream and fold them into a single value,
using the provided function and initial accumulator.
Note
Since this function tries to collect all of the elements of the input Stream,
if the Stream is infinite, then it will never terminate.
pub fn foldl_count(
stream: Stream(a),
count: Int,
fun: fn(a, b) -> b,
acc: b,
) -> #(Stream(a), b)
Consume count number of the elements of a Stream and fold them into a single value,
using the provided function and initial accumulator.
This function returns a Pair:
- The Stream with count elements removed (if the Stream was shorter than count, it is an empty Stream)
- The folded value.
Note
This function is safe to use with infinite Streams, since it has a built in termination
point - it takes only count elements, and returns the folded result of those elements.
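A minimal sketch of why a counted fold is safe on an infinite Stream. Both functions below are stand-ins written from the documented signatures and descriptions, not the library's source. Passing the element first and the accumulator second is an assumption inferred from the fn(a, b) -> b signature.

```gleam
pub type Step(a) {
  Next(Stream(a), a)
  Done
}

pub type Stream(a) {
  Stream(fn() -> Step(a))
}

// Stand-in for from_iterator: an infinite Stream starting at `initial`.
pub fn from_iterator(iter: fn(a) -> a, initial: a) -> Stream(a) {
  Stream(fn() { Next(from_iterator(iter, iter(initial)), initial) })
}

// Sketch of foldl_count: stop after `count` elements or at Done,
// whichever comes first, and hand back the rest of the Stream.
pub fn foldl_count(
  stream: Stream(a),
  count: Int,
  fun: fn(a, b) -> b,
  acc: b,
) -> #(Stream(a), b) {
  case count <= 0 {
    True -> #(stream, acc)
    False -> {
      let Stream(step) = stream
      case step() {
        Next(rest, element) ->
          foldl_count(rest, count - 1, fun, fun(element, acc))
        // Source was shorter than count: return an empty Stream.
        Done -> #(Stream(fn() { Done }), acc)
      }
    }
  }
}
```

Folding the first three elements of from_iterator(fn(n) { n + 1 }, 1) with addition and an initial accumulator of 0 gives 6, and the returned Stream continues with 4.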
pub fn from_divider(
source: a,
chunk: fn(a) -> Result(#(a, b), b),
) -> Stream(b)
Create a Stream from a given source, as well as a function that takes chunks out
of the source and returns it, diminished in some way (or not).
The signature of the chunk argument is fn(a) -> Result(#(a, b), b),
and the produced stream is Stream(b).
The Result type here is used to indicate whether the source has run out of content or not.
- If the chunk function returns Ok(#(a, b)), it means that there is still some content left in the source, so keep going;
- If the chunk function returns Error(b), it means that the entirety of the source has been consumed, and the b is the last element emitted.
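The contract above can be illustrated with a small sketch. The from_divider below is a stand-in written from the documented signature and the two rules above, not the library's source; split_on_comma and to_list are hypothetical names, and the types are local copies of the documented definitions.

```gleam
import gleam/string

pub type Step(a) {
  Next(Stream(a), a)
  Done
}

pub type Stream(a) {
  Stream(fn() -> Step(a))
}

// Stand-in for from_divider: Ok keeps going with the remaining source,
// Error emits the last element and then finishes.
pub fn from_divider(
  source: a,
  chunk: fn(a) -> Result(#(a, b), b),
) -> Stream(b) {
  Stream(fn() {
    case chunk(source) {
      Ok(#(rest, element)) -> Next(from_divider(rest, chunk), element)
      Error(last) -> Next(Stream(fn() { Done }), last)
    }
  })
}

// Hypothetical chunk function: cut off everything up to the first comma.
// The pair is #(remaining source, element), matching Result(#(a, b), b).
pub fn split_on_comma(source: String) -> Result(#(String, String), String) {
  case string.split_once(source, on: ",") {
    Ok(#(head, rest)) -> Ok(#(rest, head))
    Error(Nil) -> Error(source)
  }
}

// Stand-in for collect: eagerly drain a finite Stream.
pub fn to_list(stream: Stream(a)) -> List(a) {
  let Stream(step) = stream
  case step() {
    Next(rest, element) -> [element, ..to_list(rest)]
    Done -> []
  }
}
```

With these definitions, to_list(from_divider("a,b,c", split_on_comma)) evaluates to ["a", "b", "c"]: the first two elements come from Ok results and the last one from the Error result.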
Use cases
Due to the type signature of this function, the source and output types can be different,
making it possible to directly transform the source somehow.
In my case, I used this function to create a Stream that emits successive rows in a String
by splitting on \n, as long as they’re not inside of a cell wrapped in doublequotes.
Or, if you have a producer of a byte stream, the chunk function can at the same time request more, cut it off at appropriate points, and parse it.
In another example, this could be used to implement a custom
from_iterator function by returning an iterated
value with each chunk.
Note
It’s important to mention that the source provided to this function cannot be extracted
from within the resulting Stream without consuming the Stream in its entirety and then
reversing the chunk function to collect it.
Lastly, also keep in mind that blocking operations called from within the chunk function
will block the process from which the Stream was consumed to get the next value;
However, the Stream has no idea whether the provided function is blocking or not.
As such, if you create a blocking Stream, it’s up to you as the user to remember that
it is blocking.
pub fn from_iterator(iter: fn(a) -> a, initial: a) -> Stream(a)
Create a Stream from a provided iterating function and an initial value, which will
forever return the next element as obtained by calling the iterating function on
the current element.
The initial value provided is the first element of the returned Stream.
Note
The resulting Stream will never return Done - it will go on forever.
Thus, if you try and traverse the entirety of this Stream (using functions such as
collect, foldl, or
each), that function call will never terminate.
pub fn from_list(from: List(a)) -> Stream(a)
Create a Stream from a List of elements, which it will output in order,
after which it will be finished.
Note
This splitting is done lazily - internally, the function encapsulated in the Stream
deconstructs the provided List.
If the List is empty, it returns Done for the next step;
if it has at least one element, it returns a Next Step, with the Stream being a
recursive call to this function with the tail of the List.
pub fn join(
stream: Stream(a),
fun: fn(a, a) -> a,
) -> Result(a, Nil)
Consume all of the elements of a Stream and join them into a single value,
using the provided function.
If the stream is empty, return Error(Nil), and if there’s only a single value,
return that. Only if there are two or more elements is the function called.
Use case
I made this function to imitate the output of the string.join function, but since, unlike
string.join, this function works for arbitrary element types, it can’t just return an empty
string when the Stream is empty.
So, under the hood, this function just gets the next step of the Stream once, and then
calls foldl, with the initial accumulator being the first element
of the Stream.
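The approach described above (take one step, then fold the rest) can be sketched as follows. These are stand-ins written from the documented signatures, not the library's source. Passing the element first and the accumulator second to the function is an assumption inferred from foldl's fn(a, b) -> b signature.

```gleam
pub type Step(a) {
  Next(Stream(a), a)
  Done
}

pub type Stream(a) {
  Stream(fn() -> Step(a))
}

// Stand-in for from_list, used here only to build test input.
pub fn from_list(from: List(a)) -> Stream(a) {
  Stream(fn() {
    case from {
      [] -> Done
      [first, ..rest] -> Next(from_list(rest), first)
    }
  })
}

// Stand-in for foldl: consume every element into the accumulator.
pub fn foldl(stream: Stream(a), fun: fn(a, b) -> b, acc: b) -> b {
  let Stream(step) = stream
  case step() {
    Next(rest, element) -> foldl(rest, fun, fun(element, acc))
    Done -> acc
  }
}

// Sketch of join: the first element becomes the initial accumulator,
// and an empty Stream has nothing to return.
pub fn join(stream: Stream(a), fun: fn(a, a) -> a) -> Result(a, Nil) {
  let Stream(step) = stream
  case step() {
    Next(rest, first) -> Ok(foldl(rest, fun, first))
    Done -> Error(Nil)
  }
}
```

Under that argument-order assumption, join(from_list(["a", "b", "c"]), fn(el, acc) { acc <> "," <> el }) evaluates to Ok("a,b,c").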
Note
Since this function tries to collect all of the elements of the input Stream,
if the Stream is infinite, then it will never terminate.
pub fn map(stream: Stream(a), fun: fn(a) -> b) -> Stream(b)
Transform the provided Stream using the given function.
This is done lazily - a new Stream is constructed, whose internal function just gets the
next Step from the old Stream, transforms the emitted element using
the provided function, and constructs a new Stream by calling itself on the one that was
returned inside of the Step type.
As such, it is safe to use with infinite Streams: as long as the step of the input Stream
itself terminates, each step of the mapped Stream is guaranteed to return in finite
time… provided the given function also takes finite time.
pub fn map2(
first: Stream(a),
second: Stream(b),
fun: fn(a, b) -> c,
) -> Stream(c)
Combines two Streams into another using the given function.
This is done lazily - a new Stream is constructed, whose internal function gets the
next Step from each of the input Streams. If both return a Next variant, it returns
a Next step containing the result of calling the function on the two values, along with
a Stream built by calling itself recursively on the two returned Streams.
Thus, if either of the Streams end, the resulting Stream ends - in short, the length
of the resulting Stream is the minimum of the length of the two input Streams.
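A minimal sketch of that behaviour, written from the description above rather than taken from the library. Note that this version steps both inputs even when the first one is already finished, which the real implementation may avoid. The from_list and to_list helpers are stand-ins included only to make the sketch self-contained.

```gleam
pub type Step(a) {
  Next(Stream(a), a)
  Done
}

pub type Stream(a) {
  Stream(fn() -> Step(a))
}

// Sketch of map2: emit only while both inputs still have elements.
pub fn map2(
  first: Stream(a),
  second: Stream(b),
  fun: fn(a, b) -> c,
) -> Stream(c) {
  Stream(fn() {
    let Stream(step_first) = first
    let Stream(step_second) = second
    case step_first(), step_second() {
      Next(rest_first, x), Next(rest_second, y) ->
        Next(map2(rest_first, rest_second, fun), fun(x, y))
      // Either input finished, so the combined Stream finishes too.
      _, _ -> Done
    }
  })
}

// Stand-in for from_list.
pub fn from_list(from: List(a)) -> Stream(a) {
  Stream(fn() {
    case from {
      [] -> Done
      [first, ..rest] -> Next(from_list(rest), first)
    }
  })
}

// Stand-in for collect.
pub fn to_list(stream: Stream(a)) -> List(a) {
  let Stream(step) = stream
  case step() {
    Next(rest, element) -> [element, ..to_list(rest)]
    Done -> []
  }
}
```

Combining from_list([1, 2, 3]) and from_list([10, 20]) with addition yields [11, 22]: the result is as long as the shorter input.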
pub fn maybe_append(
stream: Stream(a),
maybe_element: option.Option(a),
) -> Stream(a)
If the provided element is Some, add it to the end of the Stream - it will be the
last element shown.
Otherwise, return the Stream unchanged.
Note
If the stream is infinite, this element will never be returned, since this function only
waits until the stream returns Done, and then replaces that Done with
Next(empty(), element).
So if the Stream never returns Done, that element will never be returned.
pub fn maybe_prepend(
stream: Stream(a),
maybe_element: option.Option(a),
) -> Stream(a)
If the provided element is Some, add it to the start of the Stream - it will be
the next element shown.
Otherwise, return the Stream unchanged.
pub fn maybe_wrap(
stream: Stream(a),
in maybe_element: option.Option(a),
) -> Stream(a)
Helper function to both prepend and append the provided Option(element) to the Stream.
If the provided element is None, the stream is returned unmodified.
If it’s Some, it just calls wrap.
It is basically equivalent to first calling the maybe_append
function, then the maybe_prepend function.
pub fn next(stream: Stream(a)) -> Step(a)
Get the next value from the Stream.
Note
If the Stream was constructed from a function, that function is called to produce that
value, so if that function never terminates, then this function will also never terminate.
pub fn prepend(stream: Stream(a), element: a) -> Stream(a)
Add the provided element to the start of the Stream - it will be the next element shown.
Useful when the Stream you made is not ‘pure’ - i.e., it executes some side effect in order
to obtain the next element. In such a case, it’s not guaranteed that calling
next on such a Stream and then going back to the original
Stream value would behave as expected, since the side effect would be executed again.
Thus, if you wish to write generic functions that operate on Streams and need to undo a
Step, instead of reusing the old value, you should prefer to take the new Stream and the
value from the Next variant of the Step and use this function to prepend that value
to the new Stream.
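A small sketch of that pattern. The peek function is a hypothetical helper, not part of the library, and prepend here is a stand-in written from the description above; the types are local copies of the documented definitions.

```gleam
pub type Step(a) {
  Next(Stream(a), a)
  Done
}

pub type Stream(a) {
  Stream(fn() -> Step(a))
}

// Stand-in for prepend: the next step emits `element` without touching
// the wrapped Stream, so no side effect is triggered.
pub fn prepend(stream: Stream(a), element: a) -> Stream(a) {
  Stream(fn() { Next(stream, element) })
}

// Hypothetical helper: look at the next element without losing it.
// The original Stream is stepped exactly once; the element is put back
// onto the new Stream instead of stepping the old value again.
pub fn peek(stream: Stream(a)) -> #(Stream(a), Result(a, Nil)) {
  let Stream(step) = stream
  case step() {
    Next(rest, element) -> #(prepend(rest, element), Ok(element))
    Done -> #(Stream(fn() { Done }), Error(Nil))
  }
}
```

Callers continue with the Stream returned by peek, never with the one they passed in.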
pub fn repeat(element: a) -> Stream(a)
Create an infinite Stream that only ever emits this single value and never emits Done.
pub fn repeat_list(from: List(a)) -> Result(Stream(a), Nil)
Create an infinite Stream that emits the elements of the input List in a loop.
In most cases it’s equivalent to repeat_stream, but with
the added constraint that a List cannot be infinite in length, so it’s
guaranteed that the provided List will cycle.
Note
This function returns a Result because it doesn’t make sense to create an infinite Stream
by repeating elements from an empty List.
So this function returns Error(Nil) if the provided List is length 0.
pub fn repeat_stream(stream: Stream(a)) -> Result(Stream(a), Nil)
Transform a finite Stream into an infinite one by making it loop forever.
It can also be used with infinite Streams, but since this function works by waiting until
the stream is finished then replacing the Done value with another instance of the input
Stream, using it on infinite Streams is pointless, since they never return Done.
Note
This function returns a Result because it’s impossible to convert an empty Stream into
an infinite one, since there are no elements to infinitely repeat.
If this check were not in place, passing in an empty Stream would create a Stream
that would never emit the next element, as trying to call the internal function would
start an infinite recursive loop.
pub fn single(el: a) -> Stream(a)
Create a Stream with a single element that it returns once, then finishes.
pub fn take_count(
stream: Stream(a),
count: Int,
) -> #(Stream(a), List(a))
Collect count number of elements from the Stream into a List, and return both the
List and the Stream without the collected elements.
Use this function over collect_count if you don’t
want to discard the Stream after taking the specified number of elements.
Note
This is done by recursively traversing the Stream output while decrementing count
until it reaches 0.
As such, if trying to call next doesn’t terminate, neither will this function.
pub fn take_until(
stream: Stream(a),
stop: fn(a) -> Bool,
) -> #(Stream(a), List(a))
Collect the values inside of the Stream into the List until an element evaluates
True when passed into the stop argument.
When an element evaluates True, the function ends, and returns the List containing all
of the previous elements without that one, and the Stream which does
contain that element.
Note
This is done by recursively traversing the Stream output until we encounter an element
that evaluates to True when passed to the stop function, and then prepending
that element to the Stream.
As such, if your Stream is created from a function that executes some side-effect to
obtain the next value, if you use this function, the very next iteration of this Stream
will not execute that operation, since it has the output of that function stored inside of it.
The simplest example is if you had a Stream that returned the system time whenever you
called it. Then, if you for some reason used this function, the very next element you’d
get from this Stream would be the time in the past, when the function was evaluated
inside of this function.