aws/internal/codec/event_stream
application/vnd.amazon.eventstream framing codec.
AWS event-stream operations (Transcribe, Kinesis SubscribeToShard, Bedrock streaming responses, S3 SelectObjectContent, etc.) deliver their bodies as a sequence of self-describing frames rather than a single payload. Each frame carries a small header set and an opaque payload; the protocol handler unpacks one frame at a time off the streaming transport.
On-wire layout (big-endian throughout):
+-----------------------+
| Total length     [4]  | <-- whole frame, including this field
| Headers length   [4]  | <-- bytes of Headers section
| Prelude CRC32    [4]  | <-- of the two ints above
| Headers          [N1] |
| Payload          [N2] |
| Message CRC32    [4]  | <-- of every byte before this one
+-----------------------+
Each Header is name_len[1] | name[name_len] | type[1] | value[...]
where type selects the header-value shape. All ten header-value
shapes the protocol defines (bool true/false, byte, short, int,
long, binary, string, timestamp, uuid) are implemented — see
HeaderValue for the wire-code mapping.
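As a concrete instance of the header layout, the bytes of a single string-valued header might be assembled as follows. This is a sketch, not the module's encoder: `string_header_bytes` is a hypothetical helper, it uses the string wire code 7 and the 2-byte length prefix listed under HeaderValue, and it does not guard against names longer than 255 bytes or values longer than 65535 bytes.

```gleam
import gleam/bit_array

/// Hypothetical helper: lay out one string header as
/// name_len[1] | name | type[1] | value_len[2] | value.
pub fn string_header_bytes(name: String, value: String) -> BitArray {
  let name_bits = bit_array.from_string(name)
  let value_bits = bit_array.from_string(value)
  let name_len = bit_array.byte_size(name_bits)
  let value_len = bit_array.byte_size(value_bits)
  <<
    name_len:size(8),
    name_bits:bits,
    // 7 is the wire code for a string header value.
    7:size(8),
    // Big-endian 2-byte length prefix, then the UTF-8 bytes.
    value_len:size(16),
    value_bits:bits,
  >>
}
```

For example, `string_header_bytes("a", "hi")` yields the seven bytes `<<1, 97, 7, 0, 2, 104, 105>>`.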
Types
Why decoding can fail. MalformedFrame covers any structural
issue (truncated bytes, length fields disagreeing with each
other); BadPreludeCrc / BadMessageCrc flag exactly which
CRC check failed so callers can distinguish “stream got
corrupted” from “we mis-parsed the framing”.
UnknownHeaderType carries a header type code that does not map
to any HeaderValue variant.
pub type DecodeError {
  MalformedFrame(reason: String)
  BadPreludeCrc
  BadMessageCrc
  UnknownHeaderType(type_code: Int)
}
Constructors
- MalformedFrame(reason: String)
- BadPreludeCrc
- BadMessageCrc
- UnknownHeaderType(type_code: Int)
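Because each failure mode is its own constructor, a caller can match on the error exhaustively. The helper below is a hypothetical sketch (the name `describe` and the message strings are not part of this module) showing one way to render a DecodeError for logging.

```gleam
import aws/internal/codec/event_stream.{
  type DecodeError, BadMessageCrc, BadPreludeCrc, MalformedFrame,
  UnknownHeaderType,
}
import gleam/int

/// Hypothetical helper: turn a DecodeError into a log-friendly string.
pub fn describe(error: DecodeError) -> String {
  case error {
    // Structural problem: truncated bytes or inconsistent length fields.
    MalformedFrame(reason) -> "malformed frame: " <> reason
    // The first two length fields did not match their checksum.
    BadPreludeCrc -> "prelude CRC mismatch"
    // The frame as a whole did not match its trailing checksum.
    BadMessageCrc -> "message CRC mismatch"
    UnknownHeaderType(type_code) ->
      "unknown header type code " <> int.to_string(type_code)
  }
}
```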
One framed message: zero or more typed headers plus an opaque payload. The payload is uninterpreted at this level — protocol implementations (event-stream JSON, CBOR, etc.) layer on top.
pub type Event {
  Event(headers: List(Header), payload: BitArray)
}
Constructors
- Event(headers: List(Header), payload: BitArray)
pub type Header {
  Header(name: String, value: HeaderValue)
}
Constructors
- Header(name: String, value: HeaderValue)
Header-value shapes. The on-wire type discriminator is owned by the encoder; callers construct these by variant name.
Coverage (wire code in parentheses):
- BoolTrueValue (0), BoolFalseValue (1): no payload
- ByteValue (2): signed 8-bit
- Int16Value (3): signed 16-bit
- Int32Value (4): signed 32-bit
- Int64Value (5): signed 64-bit
- BinaryValue (6): 2-byte length prefix + bytes
- StringValue (7): 2-byte length prefix + UTF-8
- TimestampValue (8): millis since epoch (signed 64-bit)
- UuidValue (9): exactly 16 bytes
pub type HeaderValue {
  BoolTrueValue
  BoolFalseValue
  ByteValue(Int)
  Int16Value(Int)
  Int32Value(Int)
  Int64Value(Int)
  BinaryValue(BitArray)
  StringValue(String)
  TimestampValue(Int)
  UuidValue(BitArray)
}
Constructors
- BoolTrueValue
- BoolFalseValue
- ByteValue(Int)
- Int16Value(Int)
- Int32Value(Int)
- Int64Value(Int)
- BinaryValue(BitArray)
- StringValue(String)
- TimestampValue(Int)
- UuidValue(BitArray)
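Constructing an event by variant name might look like the sketch below. The header names `:message-type` and `:event-type` are the ones this page mentions; the values `"event"` and `"Records"`, the `attempt` header, and the JSON payload are made up for illustration.

```gleam
import aws/internal/codec/event_stream.{
  type Event, Event, Header, Int32Value, StringValue,
}
import gleam/bit_array

/// Build an example event: three typed headers plus an opaque payload.
pub fn example_event() -> Event {
  Event(
    headers: [
      Header(name: ":message-type", value: StringValue("event")),
      Header(name: ":event-type", value: StringValue("Records")),
      // Hypothetical application header using the 32-bit integer shape.
      Header(name: "attempt", value: Int32Value(1)),
    ],
    // The codec does not interpret the payload; here it happens to be JSON.
    payload: bit_array.from_string("{\"ok\":true}"),
  )
}
```

No wire codes appear at the call site; the variant alone tells the encoder which discriminator to emit.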
One pull-based step of an event-stream iterator. Yield carries
the next decoded event plus the iterator’s remaining state for
the subsequent call; Done marks normal end-of-stream; Failed
surfaces the decode error encountered partway through (the
stream is dead at that point — no further events recoverable).
pub type IterStep {
  Yield(event: Event, next: fn() -> IterStep)
  Done
  Failed(error: DecodeError)
}
Constructors
- Yield(event: Event, next: fn() -> IterStep)
- Done
- Failed(error: DecodeError)
Values
pub fn decode(
  bytes: BitArray,
) -> Result(#(Event, BitArray), DecodeError)
Decode one framed message off the front of bytes. Returns the
decoded Event plus the trailing bytes (which may hold the next
frame; callers call decode again on the rest).
Validates both CRCs end-to-end — partial / corrupted streams
surface as BadPreludeCrc / BadMessageCrc rather than silently
returning garbage.
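The "call decode again on the rest" pattern can be written as a small recursive loop. This is a sketch under the assumption that a fully consumed buffer is the empty bit array; `decode_frames` and `decode_loop` are hypothetical names, not part of this module.

```gleam
import aws/internal/codec/event_stream.{type DecodeError, type Event}
import gleam/list

/// Hypothetical helper: decode every frame in a contiguous buffer.
pub fn decode_frames(bytes: BitArray) -> Result(List(Event), DecodeError) {
  decode_loop(bytes, [])
}

fn decode_loop(
  bytes: BitArray,
  acc: List(Event),
) -> Result(List(Event), DecodeError) {
  case bytes {
    // Nothing left: return the events in wire order.
    <<>> -> Ok(list.reverse(acc))
    _ ->
      case event_stream.decode(bytes) {
        // `rest` may hold the next frame, so feed it back in.
        Ok(#(event, rest)) -> decode_loop(rest, [event, ..acc])
        // Stop at the first frame that fails to decode.
        Error(error) -> Error(error)
      }
  }
}
```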
pub fn decode_all(
  body: streaming.StreamingBody,
) -> Result(List(Event), DecodeError)
Decode every frame from a streaming body. Materialises the full
list of events — appropriate when the response is short (control
messages, handshakes) or the call site wants to handle every
event after the stream terminates. Long-lived subscription
streams (SubscribeToShard, StartStreamTranscription) want
fold_events instead so each event surfaces incrementally.
The streaming body’s chunks are concatenated first; the framing protocol’s length fields make incremental parsing safe across chunk boundaries, but materialising-then-parsing is simpler and equally correct for buffer-bounded responses.
pub fn encode(event: Event) -> BitArray
Frame an Event for transmission. Computes both CRC32s (prelude
over the first two ints, message over every preceding byte) and
returns the assembled BitArray ready to hand to the streaming
transport.
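A quick way to exercise encode is a round trip through decode. The sketch below assumes the two are exact inverses for a single frame, so that decode returns the original event with no trailing bytes; `round_trip_ok` is a hypothetical name and the header value `"ping"` is made up.

```gleam
import aws/internal/codec/event_stream.{Event, Header, StringValue}

/// Hypothetical check: encode one event and decode it straight back.
pub fn round_trip_ok() -> Bool {
  let event =
    Event(
      headers: [Header(name: ":event-type", value: StringValue("ping"))],
      payload: <<1, 2, 3>>,
    )
  // encode computes both CRC32s and returns the complete frame.
  let frame = event_stream.encode(event)
  // A single frame should decode to the same event with nothing left over.
  event_stream.decode(frame) == Ok(#(event, <<>>))
}
```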
pub fn events_to_streaming_body(
  events: List(Event),
) -> streaming.StreamingBody
Frame a list of events as a single StreamingBody. Each event
is encoded in turn; the result is the concatenated frames in
list order. Use this on the request side of @eventStream
operations to hand the framed bytes to the streaming transport.
The body is a Chunked StreamingBody carrying one chunk per
event, so the streaming transport can write them on the wire
one frame at a time (fold_chunks preserves order). Buffered-
then-streamed callers see the same wire bytes — to_bit_array
concatenates in order.
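Framing a list of events and reading it back with decode_all (described above) might look like this. It is a sketch: `frame_and_reparse` is a hypothetical name, the payloads are placeholders, and the expectation that the result equals the input list rests on encode and decode being inverses.

```gleam
import aws/internal/codec/event_stream.{type DecodeError, type Event, Event}

/// Hypothetical helper: frame two events, then parse the body back.
pub fn frame_and_reparse() -> Result(List(Event), DecodeError) {
  let events = [
    Event(headers: [], payload: <<"first":utf8>>),
    Event(headers: [], payload: <<"second":utf8>>),
  ]
  events
  // One chunk per event, in list order.
  |> event_stream.events_to_streaming_body
  // Concatenates the chunks and decodes every frame.
  |> event_stream.decode_all
}
```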
pub fn fold_events(
  body: streaming.StreamingBody,
  initial: acc,
  f: fn(acc, Event) -> acc,
) -> Result(acc, DecodeError)
Reduce a streaming body’s event frames left-to-right by accumulating one decoded event at a time. The natural consumer API for long-lived subscription streams — the folder can update running state (counts, partial outputs, signals) without holding the whole event list in memory.
Returns Error(DecodeError) the moment a frame fails CRC or
length checks, preserving the accumulator up to (but not
including) the bad frame. Callers that want to keep going past
a bad frame must do their own resync.
Reads the full body up front via streaming.to_bit_array so the
fold runs on a single contiguous buffer; a future chunk-by-chunk
consumer that decodes events as bytes arrive can keep this same
surface — only the implementation changes.
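As an example of running state, a folder might sum payload sizes without collecting the events. The sketch below builds its body with events_to_streaming_body purely to stay self-contained; in practice the body would come from a response. `total_payload_bytes` is a hypothetical name.

```gleam
import aws/internal/codec/event_stream.{type DecodeError, type Event}
import gleam/bit_array

/// Hypothetical helper: total payload size across all frames in a body.
pub fn total_payload_bytes(events: List(Event)) -> Result(Int, DecodeError) {
  events
  // Stand-in for a body received from a streaming operation.
  |> event_stream.events_to_streaming_body
  // The accumulator is a running byte count; each event adds its payload size.
  |> event_stream.fold_events(0, fn(total, event) {
    total + bit_array.byte_size(event.payload)
  })
}
```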
pub fn iter_events(body: streaming.StreamingBody) -> IterStep
Wrap a streaming body as a pull-based event iterator. Each call
to next returns either Yield(event, next) — the next decoded
event plus a continuation for the rest of the stream — or Done
at clean end-of-stream, or Failed(err) if the wire bytes don’t
parse.
Useful for callers that want to drive consumption explicitly
rather than handing the whole stream to fold_events. The
codegen-emitted <op>_event_stream(client, input) wrappers
return a streaming.Response; pipe resp.body through this
helper to get a typed iterator without buffering the full event
list in memory.
Currently the body is materialised up front via
streaming.to_bit_array, the same as fold_events; a follow-up that
streams chunk-by-chunk will land once the wire transport surfaces
partial frames.
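Driving the iterator explicitly could look like the following sketch, which pulls at most a fixed number of events and then stops calling the continuation. `take` and `take_loop` are hypothetical helpers, not part of this module.

```gleam
import aws/internal/codec/event_stream.{
  type DecodeError, type Event, type IterStep, Done, Failed, Yield,
}
import gleam/list

/// Hypothetical helper: pull at most `limit` events starting from `step`.
pub fn take(step: IterStep, limit: Int) -> Result(List(Event), DecodeError) {
  take_loop(step, limit, [])
}

fn take_loop(
  step: IterStep,
  remaining: Int,
  acc: List(Event),
) -> Result(List(Event), DecodeError) {
  case step, remaining {
    // Budget used up: stop without calling the continuation again.
    _, n if n <= 0 -> Ok(list.reverse(acc))
    // Clean end-of-stream.
    Done, _ -> Ok(list.reverse(acc))
    // The stream is dead; surface the decode error.
    Failed(error), _ -> Error(error)
    // Keep the event and pull the next step.
    Yield(event, next), _ -> take_loop(next(), remaining - 1, [event, ..acc])
  }
}
```

A call site would then read `event_stream.iter_events(resp.body) |> take(10)`, where `resp` is the streaming.Response returned by a generated wrapper.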
pub fn string_header(
  event: Event,
  name: String,
) -> Result(String, Nil)
Look up the first StringValue header on event matching name.
Used by codegen-emitted parse_<op>_event functions to dispatch
on :event-type / :message-type (both are string-valued per the
event-stream spec).
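A hand-written dispatch in the same spirit as the generated parsers might look like the sketch below. It is not the codegen output: `event_kind` and its return strings are hypothetical, and the `"event"` and `"exception"` values for `:message-type` are assumed from the AWS event-stream conventions.

```gleam
import aws/internal/codec/event_stream.{type Event}

/// Hypothetical helper: classify an event by its routing headers.
pub fn event_kind(event: Event) -> String {
  case event_stream.string_header(event, ":message-type") {
    Ok("event") ->
      // Regular events name their shape in :event-type.
      case event_stream.string_header(event, ":event-type") {
        Ok(name) -> "event:" <> name
        Error(Nil) -> "event:<untyped>"
      }
    Ok("exception") -> "exception"
    Ok(other) -> "other:" <> other
    // No string-valued :message-type header was present.
    Error(Nil) -> "missing :message-type"
  }
}
```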