nquic_stream_statem (nquic v1.0.0)

View Source

Stream state machine per RFC 9000 Section 3.

Manages per-stream send/receive state, data buffering, and reassembly. Handles contiguous delivery, out-of-order buffering, FIN processing, and the send state transitions (ready -> send -> data_sent).

Summary

Functions

Process an incoming STREAM frame, buffering and reassembling data.

Buffer outgoing data on a stream. Appends Data to the stream's pending_send_data, advances send_offset, and latches Fin. The actual STREAM frame(s) are produced later, at flush time, by nquic_protocol_streams_send:drain_pending_sends/1, where they can be split to fit the path MTU and the congestion window. send_offset is advanced here (rather than at drain time) so that the existing flow-control checks stay accurate: they already treat send_offset as "bytes committed to the stream", which is what we want. Returns {error, stream_closed} when the send side can no longer accept data, i.e. when send_state is data_sent, data_recvd, reset_sent, or reset_recvd.

Create a new stream state for the given stream ID and type.

Functions

handle_recv/2

-spec handle_recv(#stream_state{stream_id :: nquic:stream_id(),
                                type :: bidi | uni,
                                send_state ::
                                    ready | send | data_sent | data_recvd | reset_sent | reset_recvd,
                                send_offset :: non_neg_integer(),
                                send_max_data :: non_neg_integer(),
                                last_stream_data_blocked :: non_neg_integer(),
                                pending_send_data :: [binary()],
                                pending_send_size :: non_neg_integer(),
                                pending_send_fin :: boolean(),
                                recv_state ::
                                    recv | size_known | data_recvd | reset_recvd | data_read |
                                    reset_read,
                                recv_offset :: non_neg_integer(),
                                recv_max_offset :: non_neg_integer(),
                                recv_window :: non_neg_integer(),
                                recv_buffer :: gb_trees:tree(non_neg_integer(), {binary(), boolean()}),
                                app_buffer :: iodata(),
                                app_buffer_size :: non_neg_integer()},
                  nquic_frame:t()) ->
                     {ok,
                      #stream_state{stream_id :: nquic:stream_id(),
                                    type :: bidi | uni,
                                    send_state ::
                                        ready | send | data_sent | data_recvd | reset_sent | reset_recvd,
                                    send_offset :: non_neg_integer(),
                                    send_max_data :: non_neg_integer(),
                                    last_stream_data_blocked :: non_neg_integer(),
                                    pending_send_data :: [binary()],
                                    pending_send_size :: non_neg_integer(),
                                    pending_send_fin :: boolean(),
                                    recv_state ::
                                        recv | size_known | data_recvd | reset_recvd | data_read |
                                        reset_read,
                                    recv_offset :: non_neg_integer(),
                                    recv_max_offset :: non_neg_integer(),
                                    recv_window :: non_neg_integer(),
                                    recv_buffer ::
                                        gb_trees:tree(non_neg_integer(), {binary(), boolean()}),
                                    app_buffer :: iodata(),
                                    app_buffer_size :: non_neg_integer()}} |
                     {error, nquic_error:any_reason()}.

Process an incoming STREAM frame, buffering and reassembling data.

handle_send(State, Data, Fin)

-spec handle_send(#stream_state{stream_id :: nquic:stream_id(),
                                type :: bidi | uni,
                                send_state ::
                                    ready | send | data_sent | data_recvd | reset_sent | reset_recvd,
                                send_offset :: non_neg_integer(),
                                send_max_data :: non_neg_integer(),
                                last_stream_data_blocked :: non_neg_integer(),
                                pending_send_data :: [binary()],
                                pending_send_size :: non_neg_integer(),
                                pending_send_fin :: boolean(),
                                recv_state ::
                                    recv | size_known | data_recvd | reset_recvd | data_read |
                                    reset_read,
                                recv_offset :: non_neg_integer(),
                                recv_max_offset :: non_neg_integer(),
                                recv_window :: non_neg_integer(),
                                recv_buffer :: gb_trees:tree(non_neg_integer(), {binary(), boolean()}),
                                app_buffer :: iodata(),
                                app_buffer_size :: non_neg_integer()},
                  iodata(),
                  boolean()) ->
                     {ok,
                      #stream_state{stream_id :: nquic:stream_id(),
                                    type :: bidi | uni,
                                    send_state ::
                                        ready | send | data_sent | data_recvd | reset_sent | reset_recvd,
                                    send_offset :: non_neg_integer(),
                                    send_max_data :: non_neg_integer(),
                                    last_stream_data_blocked :: non_neg_integer(),
                                    pending_send_data :: [binary()],
                                    pending_send_size :: non_neg_integer(),
                                    pending_send_fin :: boolean(),
                                    recv_state ::
                                        recv | size_known | data_recvd | reset_recvd | data_read |
                                        reset_read,
                                    recv_offset :: non_neg_integer(),
                                    recv_max_offset :: non_neg_integer(),
                                    recv_window :: non_neg_integer(),
                                    recv_buffer ::
                                        gb_trees:tree(non_neg_integer(), {binary(), boolean()}),
                                    app_buffer :: iodata(),
                                    app_buffer_size :: non_neg_integer()}} |
                     {error, term()}.

Buffer outgoing data on a stream. Appends Data to the stream's pending_send_data, advances send_offset, and latches Fin. The actual STREAM frame(s) are produced later, at flush time, by nquic_protocol_streams_send:drain_pending_sends/1, where they can be split to fit the path MTU and the congestion window. send_offset is advanced here (rather than at drain time) so that the existing flow-control checks stay accurate: they already treat send_offset as "bytes committed to the stream", which is what we want. Returns {error, stream_closed} when the send side can no longer accept data, i.e. when send_state is data_sent, data_recvd, reset_sent, or reset_recvd.

new(StreamID, Type)

-spec new(nquic:stream_id(), bidi | uni) ->
             #stream_state{stream_id :: nquic:stream_id(),
                           type :: bidi | uni,
                           send_state ::
                               ready | send | data_sent | data_recvd | reset_sent | reset_recvd,
                           send_offset :: non_neg_integer(),
                           send_max_data :: non_neg_integer(),
                           last_stream_data_blocked :: non_neg_integer(),
                           pending_send_data :: [binary()],
                           pending_send_size :: non_neg_integer(),
                           pending_send_fin :: boolean(),
                           recv_state ::
                               recv | size_known | data_recvd | reset_recvd | data_read | reset_read,
                           recv_offset :: non_neg_integer(),
                           recv_max_offset :: non_neg_integer(),
                           recv_window :: non_neg_integer(),
                           recv_buffer :: gb_trees:tree(non_neg_integer(), {binary(), boolean()}),
                           app_buffer :: iodata(),
                           app_buffer_size :: non_neg_integer()}.

Create a new stream state for the given stream ID and type.