datastream/text

Chunk-boundary-aware text stream operations on Stream(String) and Stream(BitArray).

Real text inputs arrive in arbitrary chunks (file reads, network reads). A naïve string.split per chunk is wrong because line terminators and delimiters can land on chunk boundaries. The functions here absorb that with a small, bounded amount of internal buffering; they never materialise the full input.

The string-consuming functions (lines, records, split) take Stream(String) rather than Stream(BitArray). The byte → string boundary is owned by text.utf8_decode below, so the cost lives in one place.

Values

pub fn lines(
  over stream: datastream.Stream(String),
) -> datastream.Stream(String)

Split a stream of strings into a stream of lines, treating \n, \r\n, and lone \r as terminators (the universal-newlines convention).

The terminator is NOT included in emitted lines. A trailing partial line (input that does not end with a terminator) is emitted as a final element so callers do not silently drop the last record.

Chunk-boundary handling: when a \r lands at the end of one chunk and the next chunk starts with \n, the pair is treated as a single \r\n boundary. A lone \r with no following \n (either because the next chunk does not start with \n, or because the stream ended) is treated as a line terminator.
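For example (a sketch: datastream.from_list and datastream.to_list are assumed helpers standing in for whatever chunk source and sink you actually have):

datastream.from_list(["alpha\r", "\nbeta"])
|> text.lines
|> datastream.to_list
// -> ["alpha", "beta"]: the split \r\n counts as one terminator, not two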

pub fn records(
  over stream: datastream.Stream(String),
) -> datastream.Stream(List(String))

Group a stream of lines into records separated by blank lines.

Each emitted List(String) carries the lines of one record in source order. Blank lines act as record separators and are not included in any record. Multiple consecutive blank lines collapse (no empty records are emitted, no spurious record at the start of the stream when the input begins with a blank line). A trailing record without a terminating blank line is still emitted so the last record is not silently dropped.
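A sketch of the grouping rules, using the same assumed datastream.from_list / datastream.to_list helpers as above:

datastream.from_list(["a", "b", "", "", "c"])
|> text.records
|> datastream.to_list
// -> [["a", "b"], ["c"]]: consecutive blanks collapse,
// and the trailing record is emitted without a closing blank line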

Pair with text.lines for SSE (Server-Sent Events), blank-line-separated NDJSON variants, mbox / RFC 822-style envelopes, and similar wire formats:

stream
|> text.lines
|> text.records
|> stream.map(parse_event)

Chunk-boundary handling: this operator runs entirely on the line stream, so it inherits the chunk-boundary correctness of lines — a record terminator that lands across two chunks is still recognised because lines already absorbed the boundary before emitting.

pub fn split(
  over stream: datastream.Stream(String),
  on delimiter: String,
) -> datastream.Stream(String)

Split a stream of strings on delimiter, emitting every separated piece including the empty pieces between consecutive delimiters and at the start / end of the input.

delimiter == "" triggers the grapheme-cluster splitter: each emitted element is exactly one Unicode grapheme cluster of the input, in source order.

Chunk-boundary handling: pieces that span a chunk boundary are joined; the trailing in-flight piece is held in a small buffer and emitted only when a delimiter or end-of-stream is reached.
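Two sketches, again with the assumed datastream.from_list / datastream.to_list helpers: one with delimiters spanning chunk boundaries, one in grapheme-cluster mode.

datastream.from_list(["a-", "--", "-b"])
|> text.split(on: "--")
|> datastream.to_list
// input text is "a----b"
// -> ["a", "", "b"]: both delimiters straddle chunk boundaries,
// and the empty piece between them is kept

datastream.from_list(["héllo"])
|> text.split(on: "")
|> datastream.to_list
// -> ["h", "é", "l", "l", "o"]: one grapheme cluster per element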

pub fn utf8_decode(
  over stream: datastream.Stream(BitArray),
) -> datastream.Stream(Result(String, Nil))

Decode a stream of UTF-8 byte chunks into a stream of decoded strings, reassembling multi-byte codepoints split across chunks.

Each successfully decoded codepoint surfaces as Ok(string). An invalid byte (lead byte that is not a valid UTF-8 start, or a missing/invalid continuation) emits exactly one Error(Nil) and decoding resumes from the next byte. An incomplete trailing sequence at end-of-stream emits a final Error(Nil) before halting.

The decoder holds a small internal buffer for partial multi-byte codepoints; it never materialises the full input.
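A sketch with the assumed datastream.from_list / datastream.to_list helpers; <<0xE2, 0x82, 0xAC>> is "€" (U+20AC) and 0xFF is never valid UTF-8:

datastream.from_list([<<0xE2, 0x82>>, <<0xAC, 0xFF>>])
|> text.utf8_decode
|> datastream.to_list
// -> [Ok("€"), Error(Nil)]: the codepoint split across chunks is
// reassembled, and the stray 0xFF yields exactly one Error(Nil)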

pub fn utf8_decode_lossy(
  over stream: datastream.Stream(BitArray),
) -> datastream.Stream(String)

Decode a stream of UTF-8 byte chunks into a stream of decoded strings, silently dropping anything that fails to decode (the Error(Nil) elements that utf8_decode would emit).

Convenience wrapper for the common “I just want strings” pipeline: chunks |> text.utf8_decode_lossy |> text.lines |> ... typechecks without a hand-rolled Result(String, Nil) -> Option(String) shim.

Use this when invalid UTF-8 should be tolerated as data corruption the caller does not need to observe (most file/socket reads of known-good text). Use utf8_decode directly when the caller needs to surface decode errors — for example via fold.partition_result to log them while still streaming the successful records through.
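The same input through the lossy variant (same assumed helpers): the invalid byte simply vanishes.

datastream.from_list([<<0xE2, 0x82>>, <<0xAC, 0xFF>>])
|> text.utf8_decode_lossy
|> datastream.to_list
// -> ["€"]: nothing is emitted for the 0xFF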

pub fn utf8_encode(
  over stream: datastream.Stream(String),
) -> datastream.Stream(BitArray)

Encode a stream of strings as UTF-8 byte chunks.

Each input string maps to its UTF-8 encoding as a single BitArray element; an empty input string maps to the empty BitArray.
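A round-trip sketch with the assumed helpers ("é" encodes as the two bytes 0xC3 0xA9, i.e. 195 and 169):

datastream.from_list(["héllo"])
|> text.utf8_encode
|> datastream.to_list
// -> [<<104, 195, 169, 108, 108, 111>>]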
