aws/internal/codec/event_stream

application/vnd.amazon.eventstream framing codec.

AWS event-stream operations (Transcribe, Kinesis SubscribeToShard, Bedrock streaming responses, S3 SelectObjectContent, etc.) deliver their bodies as a sequence of self-describing frames rather than a single payload. Each frame carries a small header set and an opaque payload; the protocol handler unpacks one frame at a time off the streaming transport.

On-wire layout (big-endian throughout):

+-----------------------+
| Total length      [4] |   <-- includes all four boxes below
| Headers length    [4] |   <-- bytes of Headers section
| Prelude CRC32     [4] |   <-- of the two ints above
| Headers       [N1]    |
| Payload       [N2]    |
| Message CRC32     [4] |   <-- of every byte before this one
+-----------------------+

Each Header is name_len[1] | name[name_len] | type[1] | value[...] where type selects the header-value shape. All ten header-value shapes the protocol defines (bool true/false, byte, short, int, long, binary, string, timestamp, uuid) are implemented — see HeaderValue for the wire-code mapping.

Types

Why decoding can fail. MalformedFrame covers any structural issue (truncated bytes, length fields disagreeing with each other); BadPreludeCrc / BadMessageCrc flag exactly which CRC check failed so callers can distinguish “stream got corrupted” from “we mis-parsed the framing”.

pub type DecodeError {
  MalformedFrame(reason: String)
  BadPreludeCrc
  BadMessageCrc
  UnknownHeaderType(type_code: Int)
}

Constructors

  • MalformedFrame(reason: String)
  • BadPreludeCrc
  • BadMessageCrc
  • UnknownHeaderType(type_code: Int)

One framed message: zero or more typed headers plus an opaque payload. The payload is uninterpreted at this level — protocol implementations (event-stream JSON, CBOR, etc.) layer on top.

pub type Event {
  Event(headers: List(Header), payload: BitArray)
}

Constructors

  • Event(headers: List(Header), payload: BitArray)
pub type Header {
  Header(name: String, value: HeaderValue)
}

Constructors

Header-value shapes. The on-wire type discriminator is owned by the encoder; callers construct these by variant name.

Coverage (wire-code in parens):

  • BoolTrueValue (0), BoolFalseValue (1) — no payload
  • ByteValue (2) — signed 8-bit
  • Int16Value (3) — signed 16-bit
  • Int32Value (4) — signed 32-bit
  • Int64Value (5) — signed 64-bit
  • BinaryValue (6) — 2-byte length prefix + bytes
  • StringValue (7) — 2-byte length prefix + UTF-8
  • TimestampValue (8) — millis since epoch (signed 64-bit)
  • UuidValue (9) — exactly 16 bytes
pub type HeaderValue {
  BoolTrueValue
  BoolFalseValue
  ByteValue(Int)
  Int16Value(Int)
  Int32Value(Int)
  Int64Value(Int)
  BinaryValue(BitArray)
  StringValue(String)
  TimestampValue(Int)
  UuidValue(BitArray)
}

Constructors

  • BoolTrueValue
  • BoolFalseValue
  • ByteValue(Int)
  • Int16Value(Int)
  • Int32Value(Int)
  • Int64Value(Int)
  • BinaryValue(BitArray)
  • StringValue(String)
  • TimestampValue(Int)
  • UuidValue(BitArray)

One pull-based step of an event-stream iterator. Yield carries the next decoded event plus the iterator’s remaining state for the subsequent call; Done marks normal end-of-stream; Failed surfaces the decode error encountered partway through (the stream is dead at that point — no further events recoverable).

pub type IterStep {
  Yield(event: Event, next: fn() -> IterStep)
  Done
  Failed(error: DecodeError)
}

Constructors

Values

pub fn decode(
  bytes: BitArray,
) -> Result(#(Event, BitArray), DecodeError)

Decode one framed message off the front of bytes. Returns the decoded Event plus the trailing bytes (which may hold the next frame; callers call decode again on the rest).

Validates both CRCs end-to-end — partial / corrupted streams surface as BadPreludeCrc / BadMessageCrc rather than silently returning garbage.

pub fn decode_all(
  body: streaming.StreamingBody,
) -> Result(List(Event), DecodeError)

Decode every frame from a streaming body. Materialises the full list of events — appropriate when the response is short (control messages, handshakes) or the call site wants to handle every event after the stream terminates. Long-lived subscription streams (SubscribeToShard, StartStreamTranscription) want fold_events instead so each event surfaces incrementally.

The streaming body’s chunks are concatenated first; the framing protocol’s length fields make incremental parsing safe across chunk boundaries, but materialising-then-parsing is simpler and equally correct for buffer-bounded responses.

pub fn encode(event: Event) -> BitArray

Frame an Event for transmission. Computes both CRC32s (prelude over the first two ints, message over every preceding byte) and returns the assembled BitArray ready to hand to the streaming transport.

pub fn events_to_streaming_body(
  events: List(Event),
) -> streaming.StreamingBody

Frame a list of events as a single StreamingBody. Each event is encoded in turn; the result is the concatenated frames in list order. Use this on the request side of @eventStream operations to hand the framed bytes to the streaming transport.

The body is a Chunked StreamingBody carrying one chunk per event, so the streaming transport can write them on the wire one frame at a time (fold_chunks preserves order). Buffered- then-streamed callers see the same wire bytes — to_bit_array concatenates in order.

pub fn fold_events(
  body: streaming.StreamingBody,
  initial: acc,
  f: fn(acc, Event) -> acc,
) -> Result(acc, DecodeError)

Reduce a streaming body’s event frames left-to-right by accumulating one decoded event at a time. The natural consumer API for long-lived subscription streams — the folder can update running state (counts, partial outputs, signals) without holding the whole event list in memory.

Returns Error(DecodeError) the moment a frame fails CRC or length checks, preserving the accumulator up to (but not including) the bad frame. Callers that want to keep going past a bad frame must do their own resync.

Reads the full body up front via streaming.to_bit_array so the fold runs on a single contiguous buffer; a future chunk-by-chunk consumer that decodes events as bytes arrive can keep this same surface — only the implementation changes.

pub fn iter_events(body: streaming.StreamingBody) -> IterStep

Wrap a streaming body as a pull-based event iterator. Each call to next returns either Yield(event, next) — the next decoded event plus a continuation for the rest of the stream — or Done at clean end-of-stream, or Failed(err) if the wire bytes don’t parse.

Useful for callers that want to drive consumption explicitly rather than handing the whole stream to fold_events. The codegen-emitted <op>_event_stream(client, input) wrappers return a streaming.Response; pipe resp.body through this helper to get a typed iterator without buffering the full event list in memory.

Today materialises the body up front (same as fold_eventsstreaming.to_bit_array); a follow-up that streams chunk-by- chunk lands when the wire transport surfaces partial frames.

pub fn string_header(
  event: Event,
  name: String,
) -> Result(String, Nil)

Look up the first StringValue header on event matching name. Used by codegen-emitted parse_<op>_event functions to dispatch on :event-type / :message-type (both are string-valued per the event-stream spec).

Search Document