aqueduct

Main module of the aqueduct library. Contains all of the type definitions and public functions for operating on them.

The primary use case of this library is creating Streams that emit successive values by executing some side effect (such as reading a file, consuming a message, or awaiting user input), and wrapping them in successive operations such as map, filter, and filter_map, which are only executed when the internal step function of a Stream is called.

It is extremely similar to the gleam_yielder library. The one advantage this library offers over the yielder is that the fundamental data types provided (Stream and Step) are not opaque, so the user can freely extend the provided functionality by implementing their own functions that operate on Streams.

Introduction

To start, you first need to create a Stream:

let stream = aqueduct.single("element")

And then do something with it:

let assert Next(_rest, "element") = aqueduct.next(stream)

You can also transform the values:

stream
|> aqueduct.map(int.parse)
|> aqueduct.collect()
// -> [Error(Nil)]

and filter them:

stream
|> aqueduct.filter_map(int.parse)
|> aqueduct.collect()
// -> []

Note: Take care when filtering infinite Streams, as the Stream produced from filtering traverses the original Stream element by element, trying to find the next one that passes the specified predicate. If none of the elements in an infinite Stream pass, trying to get the next element will never terminate.

Cutting apart a given input

For the feature I’m proudest of in this package, take a look at from_divider.

It takes in a source and a function that cuts that source into smaller pieces that are then emitted as elements by the resulting Stream.

In mesv, I used this function to create a Stream(String) of rows in a file by splitting a String on newlines that were not somewhere between two ‘escapers’.

Specifically, the code of the function was thus:

aqueduct.from_divider(
  source,
  util.take_until_unescaped(row_separator, not_in: escaper),
)

And the code of the function used to split the source was:

// util.gleam
pub fn take_until_unescaped(
  separator el: String,
  not_in escaper: String,
) -> fn(String) -> Result(#(String, String), String) {
  fn(source: String) {
    take_until_unescaped_loop(source, el, escaper, None)
    |> result.map(pair.swap)
    |> result.map_error(fn(_) { source })
  }
}

fn take_until_unescaped_loop(
  from: String,
  separator: String,
  esc: String,
  acc: Option(String),
) -> Result(#(String, String), Nil) {
  case string.split_once(from, on: separator) {
    Ok(#(head, rest)) -> {
      let value = case acc {
        Some(str) -> str <> separator <> head
        None -> head
      }
      case count_non_overlapping(in: value, of: esc) % 2 == 0 {
        True -> Ok(#(value, rest))
        False ->
          take_until_unescaped_loop(
            rest,
            separator,
            esc,
            Some(value),
          )
      }
    }
    Error(Nil) -> Error(Nil)
  }
}

For specifics on this function’s behaviour, look at its documentation.

Types

Result of calling next on a Stream.

It either returns an element and the updated Stream, or Done, indicating the Stream is finished.

It’s generally emitted by the next function or by simply calling the internal step function after deconstructing a Stream, but there’s nothing stopping you from creating it yourself.

pub type Step(a) {
  Next(Stream(a), a)
  Done
}

Constructors

  • Next(Stream(a), a)

    This Stream emitted the below element, and returned the successive Stream.

  • Done

    This Stream has terminated.

Represents a lazily evaluated Stream of some type of data.

It does not care specifically how the next element is obtained - it merely encapsulates a function that when called, produces a Step. What this function does internally depends on how the Stream was defined.

Furthermore, a Stream can be finite or infinite, and currently there’s no good way to check whether a given Stream terminates.

To create one, either use one of the provided constructor functions (such as empty, single, from_list, from_iterator, or from_divider), or simply build it yourself.

pub type Stream(a) {
  Stream(fn() -> Step(a))
}

Constructors

  • Stream(fn() -> Step(a))

    Create a Stream from scratch by passing in a function that returns a Step.

    Most useful to combine with recursive functions that call themselves to construct the next Stream.
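As a minimal sketch of that recursive pattern, the block below builds a Stream from scratch. The countdown function is a hypothetical helper made up for illustration (it is not part of aqueduct), and the example assumes the Stream and Step constructors can be imported unqualified as documented above.

```gleam
import aqueduct.{type Stream, Done, Next, Stream}

// Hypothetical helper (not part of aqueduct): counts down from `from` to 1.
pub fn countdown(from: Int) -> Stream(Int) {
  Stream(fn() {
    case from > 0 {
      // Emit the current number; the successive Stream is a recursive call.
      True -> Next(countdown(from - 1), from)
      // Nothing left to emit.
      False -> Done
    }
  })
}

pub fn main() {
  assert aqueduct.collect(countdown(3)) == [3, 2, 1]
}
```

The recursive call sits inside the anonymous function, so the next Stream is only constructed when a Step is actually requested.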

Values

pub fn append(stream: Stream(a), element: a) -> Stream(a)

Add the provided element to the end of the Stream - it will be the last element shown.

Note

If the stream is infinite, this element will never be returned, since this function only waits until the stream returns Done, and then replaces that Done with Next(empty(), element).

So if the Stream never returns Done, that element will never be returned.

pub fn collect(stream: Stream(a)) -> List(a)

Consume the provided Stream and collect all of the values into a List.

Note

As this function attempts to eagerly evaluate all of the elements until it encounters the Done next step, if called on an infinite Stream, it will never terminate.

pub fn collect_count(stream: Stream(a), count: Int) -> List(a)

Consume the provided Stream and collect count number of values into a List.

Since this function has a built in limit, as long as all of the elements in the Stream can be evaluated and terminate, it will also terminate, even if the Stream is infinite.

Basically, using this function protects you against the infinite length of the Stream, but cannot protect you against potentially infinite requirements of the internal function of the Stream.

Such a Stream with infinite requirements of the internal function would be created by something like this:

aqueduct.from_iterator(fn(num) { num + 2 }, 1)
|> aqueduct.filter(fn(num) { num % 2 == 0 })

This creates a Stream that emits odd numbers, while filtering all non-even numbers out. Trying to call next on this Stream would never terminate (as the filter function would recursively call itself to get the next element ad infinitum), and since this function uses next internally, it too would never terminate.
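By contrast, an infinite Stream whose every step terminates is safe to pass in. A small sketch, assuming collect_count returns the elements in the order they were emitted:

```gleam
import aqueduct

pub fn main() {
  // An infinite Stream: 1, 2, 3, ...
  let naturals = aqueduct.from_iterator(fn(num) { num + 1 }, 1)

  // Only three steps are requested, so this terminates.
  assert aqueduct.collect_count(naturals, 3) == [1, 2, 3]
}
```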

pub fn collect_until(
  stream: Stream(a),
  stop: fn(a) -> Bool,
) -> List(a)

Consume the provided Stream and collect elements into a List until the provided stop function encounters an element for which it returns True, or the Stream ends.

Since this function has a built in limit, as long as all of the elements in the Stream can be evaluated and terminate and there exists an element for which the provided stop function returns True, it will also terminate, even if the Stream is infinite.

Basically, using this function protects you against the infinite length of the Stream, but cannot protect you against potentially infinite requirements of the internal function of the Stream.

Example

Such a Stream with infinite requirements of the internal function would be created by something like this:

aqueduct.from_iterator(fn(num) { num + 2 }, 1)
|> aqueduct.filter(fn(num) { num % 2 == 0 })

This creates a Stream that emits odd numbers, while filtering all non-even numbers out. Trying to call next on this Stream would never terminate (as the filter function would recursively call itself to get the next element ad infinitum), and since this function uses next internally, it too would never terminate.

pub fn concat(first: Stream(a), second: Stream(a)) -> Stream(a)

Concatenate two Streams that emit the same type of element together.

Note

If the first Stream is infinite, then the second Stream will never appear.

pub fn drop(stream: Stream(a), count: Int) -> Stream(a)

Drop count elements from the beginning of the Stream.

If there are no elements left (the next function returned Done), an empty Stream is returned.

pub fn each(stream: Stream(a), evaluate fun: fn(a) -> Nil) -> Nil

Consume the provided Stream and execute a function on each element, eagerly consuming them.

Use to execute side-effects based on the values in the Stream using functions that cannot fail, when you don’t care about the consumed values on their own afterwards.

Note

If the provided Stream is infinite, this function will never return, and constantly execute side effects on successive elements emitted by the Stream.

pub fn empty() -> Stream(a)

Create an empty Stream that always returns Done for the next step.

Since it never returns a Next variant of the Step type, it does not have any type specialization, and can thus be used with any type of specialized Stream.

pub fn filter(
  stream: Stream(a),
  predicate: fn(a) -> Bool,
) -> Stream(a)

Transform a Stream to only retain values that match the predicate.

Note

This function works by creating a new Stream, which for every element that is requested, requests an element from the provided Stream.

If the returned element passes the function, then it stops there and returns itself;

If it does not, then it recursively calls itself until the element passes.

As such, if a Stream created using this function were to be based on an infinite Stream and the values that pass were extremely rare, then calling next on such a Stream would take a long time.

Furthermore, if no elements in the input infinite Stream were to pass the predicate, then calling next on such a Stream will never return.
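The behaviour described above can be sketched with the public types alone. This is a hypothetical re-implementation for illustration (my_filter and filter_step are made-up names), not necessarily how aqueduct itself is written:

```gleam
import aqueduct.{type Step, type Stream, Done, Next, Stream}

// Hypothetical re-implementation for illustration; not aqueduct's actual source.
pub fn my_filter(stream: Stream(a), predicate: fn(a) -> Bool) -> Stream(a) {
  // Nothing runs until the returned Stream is asked for its next Step.
  Stream(fn() { filter_step(stream, predicate) })
}

fn filter_step(stream: Stream(a), predicate: fn(a) -> Bool) -> Step(a) {
  case aqueduct.next(stream) {
    Done -> Done
    Next(rest, element) ->
      case predicate(element) {
        // The element passes: emit it and filter the remainder lazily.
        True -> Next(my_filter(rest, predicate), element)
        // The element fails: keep pulling from the input. On an infinite
        // Stream with no passing elements, this recursion never ends.
        False -> filter_step(rest, predicate)
      }
  }
}

pub fn main() {
  let evens = my_filter(aqueduct.from_list([1, 2, 3, 4]), fn(num) { num % 2 == 0 })
  assert aqueduct.collect(evens) == [2, 4]
}
```

The False branch is where the non-termination risk lives: a single request for the next element may consume arbitrarily many elements of the input.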

pub fn filter_map(
  stream: Stream(a),
  predicate: fn(a) -> Result(b, e),
) -> Stream(b)

Transform a Stream to only retain values that are returned in an Ok variant of the Result type.

Basically equivalent to the composition of filter and map, just a bit more optimized.

Note

This function works by creating a new Stream, which for every element that is requested, requests an element from the provided Stream.

If the returned element passes the function, then it stops there and returns that transformed element and calls itself on the returned Stream;

If it does not, then it recursively calls itself until the element passes.

As such, if a Stream created using this function were to be based on an infinite Stream and the values that pass were extremely rare, then calling next on such a Stream would take a long time.

Furthermore, if no elements in the input infinite Stream were to pass the predicate, then calling next on such a Stream will never return.

pub fn foldl(stream: Stream(a), fun: fn(a, b) -> b, acc: b) -> b

Consume all of the elements of a Stream and fold them into a single value, using the provided function and initial accumulator.

Note

Since this function tries to collect all of the elements of the input Stream, if the Stream is infinite, then it will never terminate.

pub fn foldl_count(
  stream: Stream(a),
  count: Int,
  fun: fn(a, b) -> b,
  acc: b,
) -> #(Stream(a), b)

Consume count number of the elements of a Stream and fold them into a single value, using the provided function and initial accumulator.

This function returns a Pair:

  • The Stream with count elements removed (if the Stream was shorter than count, it is an empty Stream)
  • The folded value.

Note

This function is safe to use with infinite Streams, since it has a built in termination point - it takes only count elements, and returns the folded result of those elements.
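A short usage sketch on an infinite Stream, based on the signature shown above. Addition is used so that the order of the two arguments to the folding function does not affect the result:

```gleam
import aqueduct

pub fn main() {
  let naturals = aqueduct.from_iterator(fn(num) { num + 1 }, 1)

  // Fold the first three elements (1, 2, 3) into their sum.
  let #(rest, sum) =
    aqueduct.foldl_count(naturals, 3, fn(num, acc) { num + acc }, 0)
  assert sum == 6

  // The returned Stream continues after the consumed elements.
  assert aqueduct.collect_count(rest, 2) == [4, 5]
}
```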

pub fn from_divider(
  source: a,
  chunk: fn(a) -> Result(#(a, b), b),
) -> Stream(b)

Create a Stream from a given source, as well as a function that takes chunks out of the source and returns it, diminished in some way (or not).

The signature of the chunk argument is fn(a) -> Result(#(a, b), b), and the produced stream is Stream(b).

The Result type here is used to indicate whether the source has run out of content or not.

  • If the chunk function returns Ok(#(a, b)), it means that there is still some stuff left in the source, so keep going;
  • If the chunk function returns Error(b), it means that the entirety of the source has been consumed, and the b is the last element emitted.
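The two cases above can be illustrated with a simple chunk function. split_on_comma is a hypothetical helper written for this sketch; note that the remaining source comes first in the tuple and the emitted element second:

```gleam
import aqueduct
import gleam/string

// Hypothetical chunk function: cuts off everything before the first comma.
fn split_on_comma(source: String) -> Result(#(String, String), String) {
  case string.split_once(source, on: ",") {
    // Remaining source first, emitted element second.
    Ok(#(head, rest)) -> Ok(#(rest, head))
    // No comma left: the remainder is the last element.
    Error(Nil) -> Error(source)
  }
}

pub fn main() {
  let fields = aqueduct.from_divider("a,b,c", split_on_comma)
  assert aqueduct.collect(fields) == ["a", "b", "c"]
}
```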

Use cases

Due to the type signature of this function, the source and output types can be different, making it possible to directly transform the source somehow.

In my case, I used this function to create a Stream that emits successive rows in a String by splitting on \n, as long as they’re not inside of a cell wrapped in doublequotes.

Or, if you have a producer of a byte stream, the chunk function can at the same time request more, cut it off at appropriate points, and parse it.

In another example, this could be used to implement a custom from_iterator function by returning an iterated value with each chunk.
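That last idea might look roughly like the following. my_from_iterator is a hypothetical name; the sketch treats the current value as the "source" and never returns Error, so the resulting Stream is infinite:

```gleam
import aqueduct.{type Stream}

// Hypothetical helper mirroring from_iterator on top of from_divider.
pub fn my_from_iterator(iter: fn(a) -> a, initial: a) -> Stream(a) {
  // Emit the current value, and continue from iter(current).
  // The chunk function never returns Error, so the Stream never ends.
  aqueduct.from_divider(initial, fn(current) { Ok(#(iter(current), current)) })
}

pub fn main() {
  let powers = my_from_iterator(fn(num) { num * 2 }, 1)
  assert aqueduct.collect_count(powers, 4) == [1, 2, 4, 8]
}
```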

Note

It’s important to mention that the source provided to this function cannot be extracted from within the resulting Stream without consuming the Stream in its entirety and then reversing the chunk function to collect it.

Lastly, also keep in mind that blocking operations called from within the chunk function will block the process from which the Stream was consumed to get the next value; however, the Stream has no idea whether the provided function is blocking or not.

As such, if you create a blocking Stream, it’s up to you as the user to remember that it is blocking.

pub fn from_iterator(iter: fn(a) -> a, initial: a) -> Stream(a)

Create a Stream from a provided iterating function and an initial value, which will forever return the next element as obtained by calling the iterating function on the current element.

The initial value provided is the first element of the returned Stream.

Note

The resulting Stream will never return Done - it will go on forever.

Thus, if you try and traverse the entirety of this Stream (using functions such as collect, foldl, or each), that function call will never terminate.

pub fn from_list(from: List(a)) -> Stream(a)

Create a Stream from a List of elements, which it will output in order, after which it will be finished.

Note

This splitting is done lazily - internally, the function encapsulated in the Stream deconstructs the provided List.

If the List is empty, it returns Done for the next step;

If it has at least one element, it returns a Next Step, with the Stream being a recursive call to this function with the tail of the List.

pub fn join(
  stream: Stream(a),
  fun: fn(a, a) -> a,
) -> Result(a, Nil)

Consume all of the elements of a Stream and join them into a single value, using the provided function.

If the stream is empty, return Error(Nil), and if there’s only a single value, return that. Only if there are two or more elements is the function called.

Use case

I made this function to imitate the output of the string.join function. However, since this function works on arbitrary element types rather than only on Strings, it can’t just return an empty String for an empty Stream like string.join does.

So, under the hood, this function just gets the next step of the Stream once, and then calls foldl, with the initial accumulator being the first element of the Stream.

Note

Since this function tries to collect all of the elements of the input Stream, if the Stream is infinite, then it will never terminate.

pub fn map(stream: Stream(a), fun: fn(a) -> b) -> Stream(b)

Transform the provided Stream using the given function.

This is done lazily - a new Stream is constructed, whose internal function just gets the next Step from the old Stream, transforms the emitted element using the provided function, and constructs a new Stream by calling itself on the one that was returned inside of the Step type.

As such, it is safe to use with infinite Streams: each step of the resulting Stream requests exactly one step from the input Stream, so it returns in finite time as long as that step and the provided function both terminate.
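For instance, mapping over an infinite Stream and then taking a bounded number of elements terminates. A small usage sketch based on the signatures documented on this page:

```gleam
import aqueduct

pub fn main() {
  let squares =
    aqueduct.from_iterator(fn(num) { num + 1 }, 1)
    |> aqueduct.map(fn(num) { num * num })

  // Only three steps of the mapped Stream are evaluated here.
  assert aqueduct.collect_count(squares, 3) == [1, 4, 9]
}
```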

pub fn map2(
  first: Stream(a),
  second: Stream(b),
  fun: fn(a, b) -> c,
) -> Stream(c)

Combines two Streams into another using the given function.

This is done lazily - a new Stream is constructed, whose internal function gets the next Step from both input Streams. If both return a Next variant, it returns a Next Step containing the result of calling the function on the two values, along with a recursive call on the two successive Streams.

Thus, if either of the Streams end, the resulting Stream ends - in short, the length of the resulting Stream is the minimum of the length of the two input Streams.
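A usage sketch of that minimum-length behaviour, pairing up two Streams of different lengths:

```gleam
import aqueduct

pub fn main() {
  let numbers = aqueduct.from_list([1, 2, 3])
  let letters = aqueduct.from_list(["a", "b"])

  // The shorter Stream decides where the result ends.
  let pairs =
    aqueduct.map2(numbers, letters, fn(num, letter) { #(num, letter) })
  assert aqueduct.collect(pairs) == [#(1, "a"), #(2, "b")]
}
```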

pub fn maybe_append(
  stream: Stream(a),
  maybe_element: option.Option(a),
) -> Stream(a)

If the provided element is Some, add it to the end of the Stream - it will be the last element shown.

Otherwise, return the Stream unchanged.

Note

If the stream is infinite, this element will never be returned, since this function only waits until the stream returns Done, and then replaces that Done with Next(empty(), element).

So if the Stream never returns Done, that element will never be returned.

pub fn maybe_prepend(
  stream: Stream(a),
  maybe_element: option.Option(a),
) -> Stream(a)

If the provided element is Some, add it to the start of the Stream - it will be the next element shown.

Otherwise, return the Stream unchanged.

pub fn maybe_wrap(
  stream: Stream(a),
  in maybe_element: option.Option(a),
) -> Stream(a)

Helper function to both prepend and append the provided Option(element) to the Stream.

If the provided element is None, the stream is returned unmodified.

If it’s Some, it just calls wrap.

It is basically equivalent to first calling the maybe_append function, then the maybe_prepend function.

pub fn next(stream: Stream(a)) -> Step(a)

Get the next value from the Stream.

Note

If the Stream was constructed from a function, that function is called to produce that value, so if that function never terminates, then this function will also never terminate.

pub fn prepend(stream: Stream(a), element: a) -> Stream(a)

Add the provided element to the start of the Stream - it will be the next element shown.

Useful when the Stream you made is not ‘pure’ - i.e., it executes some side effect in order to obtain the next element. In such a case, there is no guarantee that calling next on such a Stream and then going back to using the original value will behave as expected.

Thus, if you wish to write generic functions that operate on Streams and wish to reverse a Step, instead of reusing the old value, you should prefer to take the new Stream and the value from the Next variant of a Step and use this function to prepend that value to the new stream.
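One way this advice could be applied is a generic "peek" function. peek is a hypothetical helper written for this sketch (it is not part of aqueduct); it puts the element back in front of the new Stream instead of reusing the old one:

```gleam
import aqueduct.{type Stream, Done, Next}

// Hypothetical helper: look at the next element without losing it.
pub fn peek(stream: Stream(a)) -> #(Stream(a), Result(a, Nil)) {
  case aqueduct.next(stream) {
    // Prepend the element to the *new* Stream rather than reusing `stream`,
    // so a side-effecting step function is not run twice for this element.
    Next(rest, element) -> #(aqueduct.prepend(rest, element), Ok(element))
    Done -> #(aqueduct.empty(), Error(Nil))
  }
}

pub fn main() {
  let #(stream, first) = peek(aqueduct.from_list([1, 2]))
  assert first == Ok(1)
  assert aqueduct.collect(stream) == [1, 2]
}
```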

pub fn repeat(element: a) -> Stream(a)

Create an infinite Stream that only ever emits this single value and never emits Done.

pub fn repeat_list(from: List(a)) -> Result(Stream(a), Nil)

Create an infinite Stream that only ever emits the input List in a loop.

In most cases it’s equivalent to repeat_stream, with the added guarantee that a List cannot be infinite in length, so the provided List will always cycle.

Note

This function returns a Result because it doesn’t make sense to create an infinite Stream by repeating elements from an empty List.

So this function returns Error(Nil) if the provided List is length 0.
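A brief usage sketch covering both outcomes, assuming the signatures documented on this page:

```gleam
import aqueduct

pub fn main() {
  // A non-empty List cycles forever, so take a bounded number of elements.
  let assert Ok(looped) = aqueduct.repeat_list([1, 2])
  assert aqueduct.collect_count(looped, 5) == [1, 2, 1, 2, 1]

  // An empty List has nothing to repeat.
  let assert Error(Nil) = aqueduct.repeat_list([])
}
```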

pub fn repeat_stream(stream: Stream(a)) -> Result(Stream(a), Nil)

Transform a finite Stream into an infinite one by making it loop forever.

It can also be used with infinite Streams, but since this function works by waiting until the stream is finished then replacing the Done value with another instance of the input Stream, using it on infinite Streams is pointless, since they never return Done.

Note

This function returns a Result because it’s impossible to convert an empty Stream into an infinite one, since there are no elements to infinitely repeat.

If this check were not in place, passing in an empty Stream would create a Stream that would never emit the next element, as trying to call the internal function would start an infinite recursive loop.

pub fn single(el: a) -> Stream(a)

Create a Stream with a singular element that it returns once, then finishes.

pub fn take_count(
  stream: Stream(a),
  count: Int,
) -> #(Stream(a), List(a))

Collect count number of elements from the Stream into a List, and return both the List and the Stream without the collected elements.

Use this function over collect_count if you don’t want to discard the Stream after taking the specified number of elements.

Note

This is done by recursively traversing the Stream output while decrementing count until it reaches 0.

As such, if trying to call next doesn’t terminate, neither will this function.

pub fn take_until(
  stream: Stream(a),
  stop: fn(a) -> Bool,
) -> #(Stream(a), List(a))

Collect the values of the Stream into a List until an element evaluates to True when passed to the stop function.

When that happens, the function ends and returns the List containing all of the previous elements (without that one), along with a Stream which does contain that element.

Note

This is done by recursively traversing the Stream output until we encounter an element that evaluates to True when passed to the stop function, and then prepending that element to the Stream.

As such, if your Stream is created from a function that executes some side-effect to obtain the next value, if you use this function, the very next iteration of this Stream will not execute that operation, since it has the output of that function stored inside of it.

The simplest example is if you had a Stream that returned the system time whenever you called it. Then, if you for some reason used this function, the very next element you’d get from this Stream would be the time in the past, when the function was evaluated inside of this function.
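With a pure Stream, the split looks like this. The sketch assumes the returned List keeps the elements in the order they were emitted:

```gleam
import aqueduct

pub fn main() {
  let #(rest, taken) =
    aqueduct.take_until(aqueduct.from_list([1, 2, 3, 4]), fn(num) { num > 2 })

  // Elements before the first match end up in the List.
  assert taken == [1, 2]
  // The matching element is still the next one in the returned Stream.
  assert aqueduct.collect(rest) == [3, 4]
}
```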

pub fn wrap(stream: Stream(a), in element: a) -> Stream(a)

Helper function to both prepend and append the provided element to the Stream.

It is basically equivalent to first calling the append function, then the prepend function.
