datastream/text
Chunk-boundary-aware text stream operations on Stream(String).
Real text inputs arrive in arbitrary chunks (file reads, network
reads). A naïve string.split per chunk is wrong because line
terminators and delimiters can straddle chunk boundaries. The functions
here absorb that with a small, bounded amount of internal buffering
— they never materialise the full input.
The string operators here consume Stream(String) rather than
Stream(BitArray). The byte → string boundary is owned by
text.utf8_decode below, so the decoding cost lives in one place.
Values
pub fn lines(
over stream: datastream.Stream(String),
) -> datastream.Stream(String)
Split a stream of strings into a stream of lines, treating \n,
\r\n, and a lone \r as terminators (the universal-newline
convention, as in Python's text-mode I/O).
The terminator is NOT included in emitted lines. A trailing partial line (input that does not end with a terminator) is emitted as a final element so callers do not silently drop the last record.
Chunk-boundary handling: when a \r lands at the end of one chunk
and the next chunk starts with \n, the pair is treated as a
single \r\n boundary. A lone \r with no following \n (either
because the next chunk does not start with \n, or because the
stream ended) is treated as a line terminator.
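A minimal sketch of this buffering, in Python (the function and
regex names are illustrative, not this module's API): a partial
line stays buffered, and a trailing "\r" is held back so that a
"\n" arriving in the next chunk pairs with it as one "\r\n".

```python
import re

# Any of the three terminators, longest first so "\r\n" wins.
_TERM = re.compile(r"\r\n|\r|\n")

def lines(chunks):
    """Yield lines from string chunks without their terminators."""
    buf = ""
    for chunk in chunks:
        buf += chunk
        if buf.endswith("\r"):
            # Hold the "\r" back: the next chunk may start with "\n",
            # and the pair must count as a single "\r\n" terminator.
            splittable, held = buf[:-1], "\r"
        else:
            splittable, held = buf, ""
        pieces = _TERM.split(splittable)
        yield from pieces[:-1]          # complete lines
        buf = pieces[-1] + held         # partial line stays buffered
    if buf.endswith("\r"):
        yield buf[:-1]                  # lone "\r" at end-of-stream
    elif buf:
        yield buf                       # trailing line, no terminator
```

The buffer never holds more than one partial line plus the held
"\r", so memory stays bounded by the longest line, not the input.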
pub fn records(
over stream: datastream.Stream(String),
) -> datastream.Stream(List(String))
Group a stream of lines into records separated by blank lines.
Each emitted List(String) carries the lines of one record in
source order. Blank lines act as record separators and are not
included in any record. Multiple consecutive blank lines collapse
(no empty records are emitted, no spurious record at the start of
the stream when the input begins with a blank line). A trailing
record without a terminating blank line is still emitted so the
last record is not silently dropped.
Pair with text.lines for SSE (Server-Sent Events),
blank-line-separated NDJSON variants, mbox / RFC 822-style
envelopes, and similar wire formats:
stream
|> text.lines
|> text.records
|> stream.map(parse_event)
Chunk-boundary handling: this operator runs entirely on the line
stream, so it inherits the chunk-boundary correctness of lines
— a record terminator that lands across two chunks is still
recognised because lines already absorbed the boundary before
emitting.
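The grouping itself reduces to a small accumulator over the line
stream; a hypothetical Python sketch (names illustrative):

```python
def records(line_stream):
    """Group lines into records separated by blank lines.

    Blank lines collapse: no empty records are emitted, and a
    trailing record without a final blank line still comes out.
    """
    record = []
    for line in line_stream:
        if line == "":
            if record:              # consecutive blanks collapse here
                yield record
                record = []
        else:
            record.append(line)
    if record:
        yield record                # last record, no trailing blank
```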
pub fn split(
over stream: datastream.Stream(String),
on delimiter: String,
) -> datastream.Stream(String)
Split a stream of strings on delimiter, emitting every separated
piece including the empty pieces between consecutive delimiters and
at the start / end of the input.
delimiter == "" triggers the grapheme-cluster splitter: each
emitted element is exactly one Unicode grapheme cluster of the
input, in source order.
Chunk-boundary handling: pieces that span a chunk boundary are joined; the trailing in-flight piece is held in a small buffer and emitted only when a delimiter or end-of-stream is reached.
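The non-empty-delimiter case can be sketched in a few lines of
Python (illustrative names; the grapheme case for delimiter == ""
is not shown). Because the in-flight piece stays in the buffer,
a delimiter whose prefix ends one chunk is still found once the
rest of it arrives in the next chunk.

```python
def split(chunks, delimiter):
    """Split string chunks on a non-empty delimiter, keeping the
    empty pieces between consecutive delimiters and at the ends."""
    buf = ""
    for chunk in chunks:
        buf += chunk
        pieces = buf.split(delimiter)
        buf = pieces.pop()       # in-flight piece: it may grow, and it
                                 # may already hold a delimiter prefix
        yield from pieces
    yield buf                    # final piece, possibly empty
```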
pub fn utf8_decode(
over stream: datastream.Stream(BitArray),
) -> datastream.Stream(Result(String, Nil))
Decode a stream of UTF-8 byte chunks into a stream of decoded strings, reassembling multi-byte codepoints split across chunks.
Each successfully decoded codepoint surfaces as Ok(string). An
invalid byte (lead byte that is not a valid UTF-8 start, or a
missing/invalid continuation) emits exactly one Error(Nil) and
decoding resumes from the next byte. An incomplete trailing
sequence at end-of-stream emits a final Error(Nil) before halting.
The decoder holds a small internal buffer for partial multi-byte codepoints; it never materialises the full input.
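A hedged Python sketch of the decode loop (the tuple encoding of
Ok/Error and the per-sequence granularity are illustrative, not
this module's exact types): the lead byte determines how many
bytes the codepoint needs, and an incomplete sequence waits in the
buffer for the next chunk.

```python
def utf8_decode(chunks):
    """Decode byte chunks into ("ok", str) / ("error", None) results,
    reassembling codepoints split across chunk boundaries."""
    buf = b""
    for chunk in chunks:
        buf += chunk
        while buf:
            lead = buf[0]
            if lead < 0x80:
                need = 1
            elif 0xC2 <= lead <= 0xDF:
                need = 2
            elif 0xE0 <= lead <= 0xEF:
                need = 3
            elif 0xF0 <= lead <= 0xF4:
                need = 4
            else:
                yield ("error", None)   # invalid lead byte
                buf = buf[1:]
                continue
            if len(buf) < need:
                break                   # incomplete: wait for next chunk
            seq, buf = buf[:need], buf[need:]
            try:
                yield ("ok", seq.decode("utf-8"))
            except UnicodeDecodeError:
                yield ("error", None)   # bad continuation byte
                buf = seq[1:] + buf     # resume from the next byte
    if buf:
        yield ("error", None)           # incomplete trailing sequence
```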
pub fn utf8_encode(
over stream: datastream.Stream(String),
) -> datastream.Stream(BitArray)
Encode a stream of strings as UTF-8 byte chunks.
Each input string maps to its UTF-8 encoding as a single
BitArray element; an empty input string maps to the empty
BitArray.
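The mapping is one-to-one per element; in Python terms (names
illustrative):

```python
def utf8_encode(chunks):
    """Each string chunk maps to one UTF-8 byte chunk; "" maps to b""."""
    for s in chunks:
        yield s.encode("utf-8")
```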