nquic_conn_send_waiters (nquic v1.0.0)

View Source

Sync-send backpressure for handshake-phase stream writes in nquic_conn_statem.

A send gates on the per-stream send_buffer_high_water mark. When a sync caller's bytes don't all fit, the unsent tail is parked here as a t(). Each call to wake/1 (run after every flush in established state) tries to push more of each waiter's bytes into the stream's buffer. Fully-served waiters get an ok reply; the configured send_timeout delivers {error, send_timeout} if the wait runs too long.

Summary

Functions

Sync send_stream / send_fin entry point, parking with the connection-level send_timeout.

Same as handle_sync_send/5 but with an explicit per-call Timeout for the parked waiter, overriding the connection-level send_timeout.

Pop the waiter whose From and timer ref both match. The ref check guards against a stale {timeout, _, _} message that was already in flight when we asynchronously cancelled the timer (e.g. the waiter was just woken by wake/1).

Walk the parked waiters in FIFO order and try to queue more of each waiter's bytes onto its stream. Returns {NewData, ReplyActions}; the caller (typically the post-flush path) prepends the reply actions to the gen_statem actions list.

Types

t()

-type t() :: {gen_statem:from(), nquic:stream_id(), binary(), boolean(), reference() | undefined}.

Functions

handle_sync_send(From, StreamID, DataBin, Fin, Data)

-spec handle_sync_send(gen_statem:from(),
                       nquic:stream_id(),
                       iodata(),
                       fin | nofin,
                       #conn_state{role :: client | server,
                                   scid :: nquic:connection_id(),
                                   dcid :: nquic:connection_id(),
                                   odcid :: nquic:connection_id() | undefined,
                                   retry_scid :: nquic:connection_id() | undefined,
                                   retry_token :: binary(),
                                   version :: non_neg_integer(),
                                   version_preference :: [non_neg_integer()],
                                   socket :: nquic_socket:t() | undefined,
                                   peer :: nquic_socket:sockaddr() | undefined,
                                   select_info :: nquic_socket:select_info() | undefined,
                                   pn_spaces :: #{nquic_packet:space() => map()},
                                   app_next_pn :: non_neg_integer(),
                                   app_largest_received :: integer(),
                                   loss_state :: nquic_loss:loss_state() | undefined,
                                   dispatch_table :: nquic_dispatch:t() | undefined,
                                   listener :: pid() | undefined,
                                   connect_waiters :: [gen_server:from()],
                                   local_params ::
                                       #transport_params{original_destination_connection_id ::
                                                             nquic:connection_id() | undefined,
                                                         max_idle_timeout :: non_neg_integer(),
                                                         stateless_reset_token :: binary() | undefined,
                                                         max_udp_payload_size :: pos_integer(),
                                                         initial_max_data :: non_neg_integer(),
                                                         initial_max_stream_data_bidi_local ::
                                                             non_neg_integer(),
                                                         initial_max_stream_data_bidi_remote ::
                                                             non_neg_integer(),
                                                         initial_max_stream_data_uni ::
                                                             non_neg_integer(),
                                                         initial_max_streams_bidi :: non_neg_integer(),
                                                         initial_max_streams_uni :: non_neg_integer(),
                                                         ack_delay_exponent :: 0..20,
                                                         max_ack_delay :: non_neg_integer(),
                                                         disable_active_migration :: boolean(),
                                                         preferred_address ::
                                                             nquic_transport:preferred_address() |
                                                             undefined,
                                                         active_connection_id_limit :: non_neg_integer(),
                                                         initial_source_connection_id ::
                                                             nquic:connection_id() | undefined,
                                                         retry_source_connection_id ::
                                                             nquic:connection_id() | undefined,
                                                         version_information ::
                                                             nquic_transport:version_information() |
                                                             undefined,
                                                         max_datagram_frame_size ::
                                                             non_neg_integer() | undefined},
                                   remote_params ::
                                       #transport_params{original_destination_connection_id ::
                                                             nquic:connection_id() | undefined,
                                                         max_idle_timeout :: non_neg_integer(),
                                                         stateless_reset_token :: binary() | undefined,
                                                         max_udp_payload_size :: pos_integer(),
                                                         initial_max_data :: non_neg_integer(),
                                                         initial_max_stream_data_bidi_local ::
                                                             non_neg_integer(),
                                                         initial_max_stream_data_bidi_remote ::
                                                             non_neg_integer(),
                                                         initial_max_stream_data_uni ::
                                                             non_neg_integer(),
                                                         initial_max_streams_bidi :: non_neg_integer(),
                                                         initial_max_streams_uni :: non_neg_integer(),
                                                         ack_delay_exponent :: 0..20,
                                                         max_ack_delay :: non_neg_integer(),
                                                         disable_active_migration :: boolean(),
                                                         preferred_address ::
                                                             nquic_transport:preferred_address() |
                                                             undefined,
                                                         active_connection_id_limit :: non_neg_integer(),
                                                         initial_source_connection_id ::
                                                             nquic:connection_id() | undefined,
                                                         retry_source_connection_id ::
                                                             nquic:connection_id() | undefined,
                                                         version_information ::
                                                             nquic_transport:version_information() |
                                                             undefined,
                                                         max_datagram_frame_size ::
                                                             non_neg_integer() | undefined} |
                                       undefined,
                                   server_packet_processed :: boolean(),
                                   owner :: pid() | undefined,
                                   owner_mon :: reference() | undefined,
                                   deferred_flush_pending :: boolean(),
                                   pending_ack_count :: non_neg_integer(),
                                   last_idle_ms :: non_neg_integer() | infinity | undefined,
                                   last_pto_ms :: non_neg_integer() | cancel | undefined,
                                   recv_ecn :: nquic_socket:ecn_mark(),
                                   pmtud :: nquic_pmtud:pmtud_state() | undefined,
                                   gso_size :: undefined | pos_integer(),
                                   max_payload_size :: pos_integer(),
                                   server_per_conn_fd :: boolean(),
                                   proactive_cids :: boolean(),
                                   socket_connected :: boolean(),
                                   self_migration_pending :: boolean(),
                                   metrics_counters :: nquic_metrics:conn_counters() | undefined,
                                   spin_enabled :: boolean(),
                                   peer_spin :: 0..1,
                                   new_token_enabled :: boolean(),
                                   new_token_lifetime :: pos_integer(),
                                   qlog :: undefined | nquic_qlog:qlog_state(),
                                   close_kind ::
                                       undefined | local | peer | idle_timeout | protocol_error,
                                   crypto ::
                                       #conn_crypto{tls_state :: term(),
                                                    keys :: #{nquic_packet:space() | rtt0 => map()},
                                                    app_send_keys :: map() | undefined,
                                                    app_recv_keys :: map() | undefined,
                                                    crypto_buffer ::
                                                        #{nquic_packet:space() =>
                                                              {non_neg_integer(), binary(), list()}},
                                                    cipher ::
                                                        aes_128_gcm | aes_256_gcm | chacha20_poly1305,
                                                    cipher_suites ::
                                                        [aes_128_gcm | aes_256_gcm | chacha20_poly1305] |
                                                        undefined,
                                                    key_phase :: boolean(),
                                                    key_update_pending :: boolean(),
                                                    client_app_secret :: binary() | undefined,
                                                    server_app_secret :: binary() | undefined,
                                                    old_read_keys ::
                                                        #{key := binary(), iv := binary()} | undefined,
                                                    zero_rtt_accepted :: boolean(),
                                                    replay_protection :: module() | undefined,
                                                    session_ticket :: map() | undefined,
                                                    resumption_secret :: binary() | undefined,
                                                    session_cache ::
                                                        atom() | false | {module, module()} | undefined,
                                                    token_cache :: atom() | false | {module, module()},
                                                    alpn :: [binary()] | undefined,
                                                    hostname :: string() | binary() | undefined,
                                                    cert :: binary() | undefined,
                                                    cert_chain :: [binary()],
                                                    key :: any() | undefined,
                                                    verify :: verify_none | verify_peer,
                                                    cacerts :: [binary()],
                                                    peer_cert :: binary() | undefined,
                                                    static_key :: binary() | undefined},
                                   streams_state ::
                                       #conn_streams{streams ::
                                                         #{nquic:stream_id() =>
                                                               #stream_state{stream_id ::
                                                                                 nquic:stream_id(),
                                                                             type :: bidi | uni,
                                                                             send_state ::
                                                                                 ready | send |
                                                                                 data_sent |
                                                                                 data_recvd |
                                                                                 reset_sent |
                                                                                 reset_recvd,
                                                                             send_offset ::
                                                                                 non_neg_integer(),
                                                                             send_max_data ::
                                                                                 non_neg_integer(),
                                                                             last_stream_data_blocked ::
                                                                                 non_neg_integer(),
                                                                             pending_send_data ::
                                                                                 [binary()],
                                                                             pending_send_size ::
                                                                                 non_neg_integer(),
                                                                             pending_send_fin ::
                                                                                 boolean(),
                                                                             recv_state ::
                                                                                 recv | size_known |
                                                                                 data_recvd |
                                                                                 reset_recvd |
                                                                                 data_read | reset_read,
                                                                             recv_offset ::
                                                                                 non_neg_integer(),
                                                                             recv_max_offset ::
                                                                                 non_neg_integer(),
                                                                             recv_window ::
                                                                                 non_neg_integer(),
                                                                             recv_buffer ::
                                                                                 gb_trees:tree(non_neg_integer(),
                                                                                               {binary(),
                                                                                                boolean()}),
                                                                             app_buffer :: iodata(),
                                                                             app_buffer_size ::
                                                                                 non_neg_integer()}},
                                                     next_bidi_stream :: nquic:stream_id() | undefined,
                                                     next_uni_stream :: nquic:stream_id() | undefined,
                                                     peer_max_streams_bidi :: non_neg_integer(),
                                                     peer_max_streams_uni :: non_neg_integer(),
                                                     local_max_streams_bidi :: non_neg_integer(),
                                                     local_max_streams_uni :: non_neg_integer(),
                                                     last_sent_max_streams_bidi :: non_neg_integer(),
                                                     last_sent_max_streams_uni :: non_neg_integer(),
                                                     max_peer_bidi_stream_id ::
                                                         non_neg_integer() | undefined,
                                                     max_peer_uni_stream_id ::
                                                         non_neg_integer() | undefined,
                                                     opened_peer_bidi_count :: non_neg_integer(),
                                                     opened_peer_uni_count :: non_neg_integer(),
                                                     closed_peer_bidi_wm :: integer(),
                                                     closed_peer_uni_wm :: integer(),
                                                     closed_peer_streams :: #{nquic:stream_id() => true},
                                                     recv_waiters ::
                                                         #{nquic:stream_id() => gen_statem:from()},
                                                     accept_stream_waiters ::
                                                         queue:queue(gen_statem:from()),
                                                     pending_streams :: queue:queue(nquic:stream_id()),
                                                     blocked_streams :: #{nquic:stream_id() => true},
                                                     pending_send_streams ::
                                                         #{nquic:stream_id() => true},
                                                     send_buffer_high_water :: pos_integer(),
                                                     send_timeout :: timeout(),
                                                     send_waiters ::
                                                         queue:queue(nquic_conn_send_waiters:t())},
                                   flow ::
                                       #conn_flow{local_max_data :: non_neg_integer(),
                                                  remote_max_data :: non_neg_integer(),
                                                  data_sent :: non_neg_integer(),
                                                  data_received :: non_neg_integer(),
                                                  last_data_blocked :: non_neg_integer(),
                                                  pending_initial_frames :: [nquic_frame:t()],
                                                  pending_handshake_frames :: [nquic_frame:t()],
                                                  pending_app_frames :: [nquic_frame:t()],
                                                  pending_app_pre_encoded ::
                                                      [{non_neg_integer(), iodata(), nquic_frame:t()}],
                                                  queued_app_send_bytes :: non_neg_integer()},
                                   path ::
                                       #conn_path_mgmt{path_state :: nquic_path:state() | undefined,
                                                       peer_cids ::
                                                           #{non_neg_integer() =>
                                                                 #{cid := nquic:connection_id(),
                                                                   token := binary()}},
                                                       local_cids ::
                                                           #{non_neg_integer() => nquic:connection_id()},
                                                       local_cid_seq :: non_neg_integer(),
                                                       peer_retire_prior_to :: non_neg_integer(),
                                                       anti_amp_bytes_received :: non_neg_integer(),
                                                       anti_amp_bytes_sent :: non_neg_integer(),
                                                       address_validated :: boolean()}}) ->
                          gen_statem:event_handler_result(nquic_conn_statem:state_name()).

Sync send_stream / send_fin entry point, parking with the connection-level send_timeout.

Queues as much of DataBin as fits under the per-stream send_buffer_high_water; if a tail is left over, parks the caller as a t() that wake/1 (or the send_wait_timeout timer) eventually resolves.

handle_sync_send(From, StreamID, DataBin, Fin, Timeout, Data)

-spec handle_sync_send(gen_statem:from(),
                       nquic:stream_id(),
                       iodata(),
                       fin | nofin,
                       timeout(),
                       #conn_state{role :: client | server,
                                   scid :: nquic:connection_id(),
                                   dcid :: nquic:connection_id(),
                                   odcid :: nquic:connection_id() | undefined,
                                   retry_scid :: nquic:connection_id() | undefined,
                                   retry_token :: binary(),
                                   version :: non_neg_integer(),
                                   version_preference :: [non_neg_integer()],
                                   socket :: nquic_socket:t() | undefined,
                                   peer :: nquic_socket:sockaddr() | undefined,
                                   select_info :: nquic_socket:select_info() | undefined,
                                   pn_spaces :: #{nquic_packet:space() => map()},
                                   app_next_pn :: non_neg_integer(),
                                   app_largest_received :: integer(),
                                   loss_state :: nquic_loss:loss_state() | undefined,
                                   dispatch_table :: nquic_dispatch:t() | undefined,
                                   listener :: pid() | undefined,
                                   connect_waiters :: [gen_server:from()],
                                   local_params ::
                                       #transport_params{original_destination_connection_id ::
                                                             nquic:connection_id() | undefined,
                                                         max_idle_timeout :: non_neg_integer(),
                                                         stateless_reset_token :: binary() | undefined,
                                                         max_udp_payload_size :: pos_integer(),
                                                         initial_max_data :: non_neg_integer(),
                                                         initial_max_stream_data_bidi_local ::
                                                             non_neg_integer(),
                                                         initial_max_stream_data_bidi_remote ::
                                                             non_neg_integer(),
                                                         initial_max_stream_data_uni ::
                                                             non_neg_integer(),
                                                         initial_max_streams_bidi :: non_neg_integer(),
                                                         initial_max_streams_uni :: non_neg_integer(),
                                                         ack_delay_exponent :: 0..20,
                                                         max_ack_delay :: non_neg_integer(),
                                                         disable_active_migration :: boolean(),
                                                         preferred_address ::
                                                             nquic_transport:preferred_address() |
                                                             undefined,
                                                         active_connection_id_limit :: non_neg_integer(),
                                                         initial_source_connection_id ::
                                                             nquic:connection_id() | undefined,
                                                         retry_source_connection_id ::
                                                             nquic:connection_id() | undefined,
                                                         version_information ::
                                                             nquic_transport:version_information() |
                                                             undefined,
                                                         max_datagram_frame_size ::
                                                             non_neg_integer() | undefined},
                                   remote_params ::
                                       #transport_params{original_destination_connection_id ::
                                                             nquic:connection_id() | undefined,
                                                         max_idle_timeout :: non_neg_integer(),
                                                         stateless_reset_token :: binary() | undefined,
                                                         max_udp_payload_size :: pos_integer(),
                                                         initial_max_data :: non_neg_integer(),
                                                         initial_max_stream_data_bidi_local ::
                                                             non_neg_integer(),
                                                         initial_max_stream_data_bidi_remote ::
                                                             non_neg_integer(),
                                                         initial_max_stream_data_uni ::
                                                             non_neg_integer(),
                                                         initial_max_streams_bidi :: non_neg_integer(),
                                                         initial_max_streams_uni :: non_neg_integer(),
                                                         ack_delay_exponent :: 0..20,
                                                         max_ack_delay :: non_neg_integer(),
                                                         disable_active_migration :: boolean(),
                                                         preferred_address ::
                                                             nquic_transport:preferred_address() |
                                                             undefined,
                                                         active_connection_id_limit :: non_neg_integer(),
                                                         initial_source_connection_id ::
                                                             nquic:connection_id() | undefined,
                                                         retry_source_connection_id ::
                                                             nquic:connection_id() | undefined,
                                                         version_information ::
                                                             nquic_transport:version_information() |
                                                             undefined,
                                                         max_datagram_frame_size ::
                                                             non_neg_integer() | undefined} |
                                       undefined,
                                   server_packet_processed :: boolean(),
                                   owner :: pid() | undefined,
                                   owner_mon :: reference() | undefined,
                                   deferred_flush_pending :: boolean(),
                                   pending_ack_count :: non_neg_integer(),
                                   last_idle_ms :: non_neg_integer() | infinity | undefined,
                                   last_pto_ms :: non_neg_integer() | cancel | undefined,
                                   recv_ecn :: nquic_socket:ecn_mark(),
                                   pmtud :: nquic_pmtud:pmtud_state() | undefined,
                                   gso_size :: undefined | pos_integer(),
                                   max_payload_size :: pos_integer(),
                                   server_per_conn_fd :: boolean(),
                                   proactive_cids :: boolean(),
                                   socket_connected :: boolean(),
                                   self_migration_pending :: boolean(),
                                   metrics_counters :: nquic_metrics:conn_counters() | undefined,
                                   spin_enabled :: boolean(),
                                   peer_spin :: 0..1,
                                   new_token_enabled :: boolean(),
                                   new_token_lifetime :: pos_integer(),
                                   qlog :: undefined | nquic_qlog:qlog_state(),
                                   close_kind ::
                                       undefined | local | peer | idle_timeout | protocol_error,
                                   crypto ::
                                       #conn_crypto{tls_state :: term(),
                                                    keys :: #{nquic_packet:space() | rtt0 => map()},
                                                    app_send_keys :: map() | undefined,
                                                    app_recv_keys :: map() | undefined,
                                                    crypto_buffer ::
                                                        #{nquic_packet:space() =>
                                                              {non_neg_integer(), binary(), list()}},
                                                    cipher ::
                                                        aes_128_gcm | aes_256_gcm | chacha20_poly1305,
                                                    cipher_suites ::
                                                        [aes_128_gcm | aes_256_gcm | chacha20_poly1305] |
                                                        undefined,
                                                    key_phase :: boolean(),
                                                    key_update_pending :: boolean(),
                                                    client_app_secret :: binary() | undefined,
                                                    server_app_secret :: binary() | undefined,
                                                    old_read_keys ::
                                                        #{key := binary(), iv := binary()} | undefined,
                                                    zero_rtt_accepted :: boolean(),
                                                    replay_protection :: module() | undefined,
                                                    session_ticket :: map() | undefined,
                                                    resumption_secret :: binary() | undefined,
                                                    session_cache ::
                                                        atom() | false | {module, module()} | undefined,
                                                    token_cache :: atom() | false | {module, module()},
                                                    alpn :: [binary()] | undefined,
                                                    hostname :: string() | binary() | undefined,
                                                    cert :: binary() | undefined,
                                                    cert_chain :: [binary()],
                                                    key :: any() | undefined,
                                                    verify :: verify_none | verify_peer,
                                                    cacerts :: [binary()],
                                                    peer_cert :: binary() | undefined,
                                                    static_key :: binary() | undefined},
                                   streams_state ::
                                       #conn_streams{streams ::
                                                         #{nquic:stream_id() =>
                                                               #stream_state{stream_id ::
                                                                                 nquic:stream_id(),
                                                                             type :: bidi | uni,
                                                                             send_state ::
                                                                                 ready | send |
                                                                                 data_sent |
                                                                                 data_recvd |
                                                                                 reset_sent |
                                                                                 reset_recvd,
                                                                             send_offset ::
                                                                                 non_neg_integer(),
                                                                             send_max_data ::
                                                                                 non_neg_integer(),
                                                                             last_stream_data_blocked ::
                                                                                 non_neg_integer(),
                                                                             pending_send_data ::
                                                                                 [binary()],
                                                                             pending_send_size ::
                                                                                 non_neg_integer(),
                                                                             pending_send_fin ::
                                                                                 boolean(),
                                                                             recv_state ::
                                                                                 recv | size_known |
                                                                                 data_recvd |
                                                                                 reset_recvd |
                                                                                 data_read | reset_read,
                                                                             recv_offset ::
                                                                                 non_neg_integer(),
                                                                             recv_max_offset ::
                                                                                 non_neg_integer(),
                                                                             recv_window ::
                                                                                 non_neg_integer(),
                                                                             recv_buffer ::
                                                                                 gb_trees:tree(non_neg_integer(),
                                                                                               {binary(),
                                                                                                boolean()}),
                                                                             app_buffer :: iodata(),
                                                                             app_buffer_size ::
                                                                                 non_neg_integer()}},
                                                     next_bidi_stream :: nquic:stream_id() | undefined,
                                                     next_uni_stream :: nquic:stream_id() | undefined,
                                                     peer_max_streams_bidi :: non_neg_integer(),
                                                     peer_max_streams_uni :: non_neg_integer(),
                                                     local_max_streams_bidi :: non_neg_integer(),
                                                     local_max_streams_uni :: non_neg_integer(),
                                                     last_sent_max_streams_bidi :: non_neg_integer(),
                                                     last_sent_max_streams_uni :: non_neg_integer(),
                                                     max_peer_bidi_stream_id ::
                                                         non_neg_integer() | undefined,
                                                     max_peer_uni_stream_id ::
                                                         non_neg_integer() | undefined,
                                                     opened_peer_bidi_count :: non_neg_integer(),
                                                     opened_peer_uni_count :: non_neg_integer(),
                                                     closed_peer_bidi_wm :: integer(),
                                                     closed_peer_uni_wm :: integer(),
                                                     closed_peer_streams :: #{nquic:stream_id() => true},
                                                     recv_waiters ::
                                                         #{nquic:stream_id() => gen_statem:from()},
                                                     accept_stream_waiters ::
                                                         queue:queue(gen_statem:from()),
                                                     pending_streams :: queue:queue(nquic:stream_id()),
                                                     blocked_streams :: #{nquic:stream_id() => true},
                                                     pending_send_streams ::
                                                         #{nquic:stream_id() => true},
                                                     send_buffer_high_water :: pos_integer(),
                                                     send_timeout :: timeout(),
                                                     send_waiters ::
                                                         queue:queue(nquic_conn_send_waiters:t())},
                                   flow ::
                                       #conn_flow{local_max_data :: non_neg_integer(),
                                                  remote_max_data :: non_neg_integer(),
                                                  data_sent :: non_neg_integer(),
                                                  data_received :: non_neg_integer(),
                                                  last_data_blocked :: non_neg_integer(),
                                                  pending_initial_frames :: [nquic_frame:t()],
                                                  pending_handshake_frames :: [nquic_frame:t()],
                                                  pending_app_frames :: [nquic_frame:t()],
                                                  pending_app_pre_encoded ::
                                                      [{non_neg_integer(), iodata(), nquic_frame:t()}],
                                                  queued_app_send_bytes :: non_neg_integer()},
                                   path ::
                                       #conn_path_mgmt{path_state :: nquic_path:state() | undefined,
                                                       peer_cids ::
                                                           #{non_neg_integer() =>
                                                                 #{cid := nquic:connection_id(),
                                                                   token := binary()}},
                                                       local_cids ::
                                                           #{non_neg_integer() => nquic:connection_id()},
                                                       local_cid_seq :: non_neg_integer(),
                                                       peer_retire_prior_to :: non_neg_integer(),
                                                       anti_amp_bytes_received :: non_neg_integer(),
                                                       anti_amp_bytes_sent :: non_neg_integer(),
                                                       address_validated :: boolean()}}) ->
                          gen_statem:event_handler_result(nquic_conn_statem:state_name()).

Same as handle_sync_send/5 but with an explicit per-call Timeout for the parked waiter, overriding the connection-level send_timeout.

pop(From, TimerRef, Q)

-spec pop(gen_statem:from(), reference(), queue:queue(tuple())) ->
             {ok, queue:queue(tuple())} | not_found.

Pop the waiter whose From and timer ref both match. The ref check guards against a stale {timeout, _, _} message that was already in flight when we asynchronously cancelled the timer (e.g. the waiter was just woken by wake/1).

wake(Data)

-spec wake(#conn_state{role :: client | server,
                       scid :: nquic:connection_id(),
                       dcid :: nquic:connection_id(),
                       odcid :: nquic:connection_id() | undefined,
                       retry_scid :: nquic:connection_id() | undefined,
                       retry_token :: binary(),
                       version :: non_neg_integer(),
                       version_preference :: [non_neg_integer()],
                       socket :: nquic_socket:t() | undefined,
                       peer :: nquic_socket:sockaddr() | undefined,
                       select_info :: nquic_socket:select_info() | undefined,
                       pn_spaces :: #{nquic_packet:space() => map()},
                       app_next_pn :: non_neg_integer(),
                       app_largest_received :: integer(),
                       loss_state :: nquic_loss:loss_state() | undefined,
                       dispatch_table :: nquic_dispatch:t() | undefined,
                       listener :: pid() | undefined,
                       connect_waiters :: [gen_server:from()],
                       local_params ::
                           #transport_params{original_destination_connection_id ::
                                                 nquic:connection_id() | undefined,
                                             max_idle_timeout :: non_neg_integer(),
                                             stateless_reset_token :: binary() | undefined,
                                             max_udp_payload_size :: pos_integer(),
                                             initial_max_data :: non_neg_integer(),
                                             initial_max_stream_data_bidi_local :: non_neg_integer(),
                                             initial_max_stream_data_bidi_remote :: non_neg_integer(),
                                             initial_max_stream_data_uni :: non_neg_integer(),
                                             initial_max_streams_bidi :: non_neg_integer(),
                                             initial_max_streams_uni :: non_neg_integer(),
                                             ack_delay_exponent :: 0..20,
                                             max_ack_delay :: non_neg_integer(),
                                             disable_active_migration :: boolean(),
                                             preferred_address ::
                                                 nquic_transport:preferred_address() | undefined,
                                             active_connection_id_limit :: non_neg_integer(),
                                             initial_source_connection_id ::
                                                 nquic:connection_id() | undefined,
                                             retry_source_connection_id ::
                                                 nquic:connection_id() | undefined,
                                             version_information ::
                                                 nquic_transport:version_information() | undefined,
                                             max_datagram_frame_size :: non_neg_integer() | undefined},
                       remote_params ::
                           #transport_params{original_destination_connection_id ::
                                                 nquic:connection_id() | undefined,
                                             max_idle_timeout :: non_neg_integer(),
                                             stateless_reset_token :: binary() | undefined,
                                             max_udp_payload_size :: pos_integer(),
                                             initial_max_data :: non_neg_integer(),
                                             initial_max_stream_data_bidi_local :: non_neg_integer(),
                                             initial_max_stream_data_bidi_remote :: non_neg_integer(),
                                             initial_max_stream_data_uni :: non_neg_integer(),
                                             initial_max_streams_bidi :: non_neg_integer(),
                                             initial_max_streams_uni :: non_neg_integer(),
                                             ack_delay_exponent :: 0..20,
                                             max_ack_delay :: non_neg_integer(),
                                             disable_active_migration :: boolean(),
                                             preferred_address ::
                                                 nquic_transport:preferred_address() | undefined,
                                             active_connection_id_limit :: non_neg_integer(),
                                             initial_source_connection_id ::
                                                 nquic:connection_id() | undefined,
                                             retry_source_connection_id ::
                                                 nquic:connection_id() | undefined,
                                             version_information ::
                                                 nquic_transport:version_information() | undefined,
                                             max_datagram_frame_size :: non_neg_integer() | undefined} |
                           undefined,
                       server_packet_processed :: boolean(),
                       owner :: pid() | undefined,
                       owner_mon :: reference() | undefined,
                       deferred_flush_pending :: boolean(),
                       pending_ack_count :: non_neg_integer(),
                       last_idle_ms :: non_neg_integer() | infinity | undefined,
                       last_pto_ms :: non_neg_integer() | cancel | undefined,
                       recv_ecn :: nquic_socket:ecn_mark(),
                       pmtud :: nquic_pmtud:pmtud_state() | undefined,
                       gso_size :: undefined | pos_integer(),
                       max_payload_size :: pos_integer(),
                       server_per_conn_fd :: boolean(),
                       proactive_cids :: boolean(),
                       socket_connected :: boolean(),
                       self_migration_pending :: boolean(),
                       metrics_counters :: nquic_metrics:conn_counters() | undefined,
                       spin_enabled :: boolean(),
                       peer_spin :: 0..1,
                       new_token_enabled :: boolean(),
                       new_token_lifetime :: pos_integer(),
                       qlog :: undefined | nquic_qlog:qlog_state(),
                       close_kind :: undefined | local | peer | idle_timeout | protocol_error,
                       crypto ::
                           #conn_crypto{tls_state :: term(),
                                        keys :: #{nquic_packet:space() | rtt0 => map()},
                                        app_send_keys :: map() | undefined,
                                        app_recv_keys :: map() | undefined,
                                        crypto_buffer ::
                                            #{nquic_packet:space() =>
                                                  {non_neg_integer(), binary(), list()}},
                                        cipher :: aes_128_gcm | aes_256_gcm | chacha20_poly1305,
                                        cipher_suites ::
                                            [aes_128_gcm | aes_256_gcm | chacha20_poly1305] | undefined,
                                        key_phase :: boolean(),
                                        key_update_pending :: boolean(),
                                        client_app_secret :: binary() | undefined,
                                        server_app_secret :: binary() | undefined,
                                        old_read_keys :: #{key := binary(), iv := binary()} | undefined,
                                        zero_rtt_accepted :: boolean(),
                                        replay_protection :: module() | undefined,
                                        session_ticket :: map() | undefined,
                                        resumption_secret :: binary() | undefined,
                                        session_cache :: atom() | false | {module, module()} | undefined,
                                        token_cache :: atom() | false | {module, module()},
                                        alpn :: [binary()] | undefined,
                                        hostname :: string() | binary() | undefined,
                                        cert :: binary() | undefined,
                                        cert_chain :: [binary()],
                                        key :: any() | undefined,
                                        verify :: verify_none | verify_peer,
                                        cacerts :: [binary()],
                                        peer_cert :: binary() | undefined,
                                        static_key :: binary() | undefined},
                       streams_state ::
                           #conn_streams{streams ::
                                             #{nquic:stream_id() =>
                                                   #stream_state{stream_id :: nquic:stream_id(),
                                                                 type :: bidi | uni,
                                                                 send_state ::
                                                                     ready | send | data_sent |
                                                                     data_recvd | reset_sent |
                                                                     reset_recvd,
                                                                 send_offset :: non_neg_integer(),
                                                                 send_max_data :: non_neg_integer(),
                                                                 last_stream_data_blocked ::
                                                                     non_neg_integer(),
                                                                 pending_send_data :: [binary()],
                                                                 pending_send_size :: non_neg_integer(),
                                                                 pending_send_fin :: boolean(),
                                                                 recv_state ::
                                                                     recv | size_known | data_recvd |
                                                                     reset_recvd | data_read |
                                                                     reset_read,
                                                                 recv_offset :: non_neg_integer(),
                                                                 recv_max_offset :: non_neg_integer(),
                                                                 recv_window :: non_neg_integer(),
                                                                 recv_buffer ::
                                                                     gb_trees:tree(non_neg_integer(),
                                                                                   {binary(), boolean()}),
                                                                 app_buffer :: iodata(),
                                                                 app_buffer_size :: non_neg_integer()}},
                                         next_bidi_stream :: nquic:stream_id() | undefined,
                                         next_uni_stream :: nquic:stream_id() | undefined,
                                         peer_max_streams_bidi :: non_neg_integer(),
                                         peer_max_streams_uni :: non_neg_integer(),
                                         local_max_streams_bidi :: non_neg_integer(),
                                         local_max_streams_uni :: non_neg_integer(),
                                         last_sent_max_streams_bidi :: non_neg_integer(),
                                         last_sent_max_streams_uni :: non_neg_integer(),
                                         max_peer_bidi_stream_id :: non_neg_integer() | undefined,
                                         max_peer_uni_stream_id :: non_neg_integer() | undefined,
                                         opened_peer_bidi_count :: non_neg_integer(),
                                         opened_peer_uni_count :: non_neg_integer(),
                                         closed_peer_bidi_wm :: integer(),
                                         closed_peer_uni_wm :: integer(),
                                         closed_peer_streams :: #{nquic:stream_id() => true},
                                         recv_waiters :: #{nquic:stream_id() => gen_statem:from()},
                                         accept_stream_waiters :: queue:queue(gen_statem:from()),
                                         pending_streams :: queue:queue(nquic:stream_id()),
                                         blocked_streams :: #{nquic:stream_id() => true},
                                         pending_send_streams :: #{nquic:stream_id() => true},
                                         send_buffer_high_water :: pos_integer(),
                                         send_timeout :: timeout(),
                                         send_waiters :: queue:queue(nquic_conn_send_waiters:t())},
                       flow ::
                           #conn_flow{local_max_data :: non_neg_integer(),
                                      remote_max_data :: non_neg_integer(),
                                      data_sent :: non_neg_integer(),
                                      data_received :: non_neg_integer(),
                                      last_data_blocked :: non_neg_integer(),
                                      pending_initial_frames :: [nquic_frame:t()],
                                      pending_handshake_frames :: [nquic_frame:t()],
                                      pending_app_frames :: [nquic_frame:t()],
                                      pending_app_pre_encoded ::
                                          [{non_neg_integer(), iodata(), nquic_frame:t()}],
                                      queued_app_send_bytes :: non_neg_integer()},
                       path ::
                           #conn_path_mgmt{path_state :: nquic_path:state() | undefined,
                                           peer_cids ::
                                               #{non_neg_integer() =>
                                                     #{cid := nquic:connection_id(), token := binary()}},
                                           local_cids :: #{non_neg_integer() => nquic:connection_id()},
                                           local_cid_seq :: non_neg_integer(),
                                           peer_retire_prior_to :: non_neg_integer(),
                                           anti_amp_bytes_received :: non_neg_integer(),
                                           anti_amp_bytes_sent :: non_neg_integer(),
                                           address_validated :: boolean()}}) ->
              {#conn_state{role :: client | server,
                           scid :: nquic:connection_id(),
                           dcid :: nquic:connection_id(),
                           odcid :: nquic:connection_id() | undefined,
                           retry_scid :: nquic:connection_id() | undefined,
                           retry_token :: binary(),
                           version :: non_neg_integer(),
                           version_preference :: [non_neg_integer()],
                           socket :: nquic_socket:t() | undefined,
                           peer :: nquic_socket:sockaddr() | undefined,
                           select_info :: nquic_socket:select_info() | undefined,
                           pn_spaces :: #{nquic_packet:space() => map()},
                           app_next_pn :: non_neg_integer(),
                           app_largest_received :: integer(),
                           loss_state :: nquic_loss:loss_state() | undefined,
                           dispatch_table :: nquic_dispatch:t() | undefined,
                           listener :: pid() | undefined,
                           connect_waiters :: [gen_server:from()],
                           local_params ::
                               #transport_params{original_destination_connection_id ::
                                                     nquic:connection_id() | undefined,
                                                 max_idle_timeout :: non_neg_integer(),
                                                 stateless_reset_token :: binary() | undefined,
                                                 max_udp_payload_size :: pos_integer(),
                                                 initial_max_data :: non_neg_integer(),
                                                 initial_max_stream_data_bidi_local :: non_neg_integer(),
                                                 initial_max_stream_data_bidi_remote ::
                                                     non_neg_integer(),
                                                 initial_max_stream_data_uni :: non_neg_integer(),
                                                 initial_max_streams_bidi :: non_neg_integer(),
                                                 initial_max_streams_uni :: non_neg_integer(),
                                                 ack_delay_exponent :: 0..20,
                                                 max_ack_delay :: non_neg_integer(),
                                                 disable_active_migration :: boolean(),
                                                 preferred_address ::
                                                     nquic_transport:preferred_address() | undefined,
                                                 active_connection_id_limit :: non_neg_integer(),
                                                 initial_source_connection_id ::
                                                     nquic:connection_id() | undefined,
                                                 retry_source_connection_id ::
                                                     nquic:connection_id() | undefined,
                                                 version_information ::
                                                     nquic_transport:version_information() | undefined,
                                                 max_datagram_frame_size ::
                                                     non_neg_integer() | undefined},
                           remote_params ::
                               #transport_params{original_destination_connection_id ::
                                                     nquic:connection_id() | undefined,
                                                 max_idle_timeout :: non_neg_integer(),
                                                 stateless_reset_token :: binary() | undefined,
                                                 max_udp_payload_size :: pos_integer(),
                                                 initial_max_data :: non_neg_integer(),
                                                 initial_max_stream_data_bidi_local :: non_neg_integer(),
                                                 initial_max_stream_data_bidi_remote ::
                                                     non_neg_integer(),
                                                 initial_max_stream_data_uni :: non_neg_integer(),
                                                 initial_max_streams_bidi :: non_neg_integer(),
                                                 initial_max_streams_uni :: non_neg_integer(),
                                                 ack_delay_exponent :: 0..20,
                                                 max_ack_delay :: non_neg_integer(),
                                                 disable_active_migration :: boolean(),
                                                 preferred_address ::
                                                     nquic_transport:preferred_address() | undefined,
                                                 active_connection_id_limit :: non_neg_integer(),
                                                 initial_source_connection_id ::
                                                     nquic:connection_id() | undefined,
                                                 retry_source_connection_id ::
                                                     nquic:connection_id() | undefined,
                                                 version_information ::
                                                     nquic_transport:version_information() | undefined,
                                                 max_datagram_frame_size ::
                                                     non_neg_integer() | undefined} |
                               undefined,
                           server_packet_processed :: boolean(),
                           owner :: pid() | undefined,
                           owner_mon :: reference() | undefined,
                           deferred_flush_pending :: boolean(),
                           pending_ack_count :: non_neg_integer(),
                           last_idle_ms :: non_neg_integer() | infinity | undefined,
                           last_pto_ms :: non_neg_integer() | cancel | undefined,
                           recv_ecn :: nquic_socket:ecn_mark(),
                           pmtud :: nquic_pmtud:pmtud_state() | undefined,
                           gso_size :: undefined | pos_integer(),
                           max_payload_size :: pos_integer(),
                           server_per_conn_fd :: boolean(),
                           proactive_cids :: boolean(),
                           socket_connected :: boolean(),
                           self_migration_pending :: boolean(),
                           metrics_counters :: nquic_metrics:conn_counters() | undefined,
                           spin_enabled :: boolean(),
                           peer_spin :: 0..1,
                           new_token_enabled :: boolean(),
                           new_token_lifetime :: pos_integer(),
                           qlog :: undefined | nquic_qlog:qlog_state(),
                           close_kind :: undefined | local | peer | idle_timeout | protocol_error,
                           crypto ::
                               #conn_crypto{tls_state :: term(),
                                            keys :: #{nquic_packet:space() | rtt0 => map()},
                                            app_send_keys :: map() | undefined,
                                            app_recv_keys :: map() | undefined,
                                            crypto_buffer ::
                                                #{nquic_packet:space() =>
                                                      {non_neg_integer(), binary(), list()}},
                                            cipher :: aes_128_gcm | aes_256_gcm | chacha20_poly1305,
                                            cipher_suites ::
                                                [aes_128_gcm | aes_256_gcm | chacha20_poly1305] |
                                                undefined,
                                            key_phase :: boolean(),
                                            key_update_pending :: boolean(),
                                            client_app_secret :: binary() | undefined,
                                            server_app_secret :: binary() | undefined,
                                            old_read_keys ::
                                                #{key := binary(), iv := binary()} | undefined,
                                            zero_rtt_accepted :: boolean(),
                                            replay_protection :: module() | undefined,
                                            session_ticket :: map() | undefined,
                                            resumption_secret :: binary() | undefined,
                                            session_cache ::
                                                atom() | false | {module, module()} | undefined,
                                            token_cache :: atom() | false | {module, module()},
                                            alpn :: [binary()] | undefined,
                                            hostname :: string() | binary() | undefined,
                                            cert :: binary() | undefined,
                                            cert_chain :: [binary()],
                                            key :: any() | undefined,
                                            verify :: verify_none | verify_peer,
                                            cacerts :: [binary()],
                                            peer_cert :: binary() | undefined,
                                            static_key :: binary() | undefined},
                           streams_state ::
                               #conn_streams{streams ::
                                                 #{nquic:stream_id() =>
                                                       #stream_state{stream_id :: nquic:stream_id(),
                                                                     type :: bidi | uni,
                                                                     send_state ::
                                                                         ready | send | data_sent |
                                                                         data_recvd | reset_sent |
                                                                         reset_recvd,
                                                                     send_offset :: non_neg_integer(),
                                                                     send_max_data :: non_neg_integer(),
                                                                     last_stream_data_blocked ::
                                                                         non_neg_integer(),
                                                                     pending_send_data :: [binary()],
                                                                     pending_send_size ::
                                                                         non_neg_integer(),
                                                                     pending_send_fin :: boolean(),
                                                                     recv_state ::
                                                                         recv | size_known |
                                                                         data_recvd | reset_recvd |
                                                                         data_read | reset_read,
                                                                     recv_offset :: non_neg_integer(),
                                                                     recv_max_offset ::
                                                                         non_neg_integer(),
                                                                     recv_window :: non_neg_integer(),
                                                                     recv_buffer ::
                                                                         gb_trees:tree(non_neg_integer(),
                                                                                       {binary(),
                                                                                        boolean()}),
                                                                     app_buffer :: iodata(),
                                                                     app_buffer_size ::
                                                                         non_neg_integer()}},
                                             next_bidi_stream :: nquic:stream_id() | undefined,
                                             next_uni_stream :: nquic:stream_id() | undefined,
                                             peer_max_streams_bidi :: non_neg_integer(),
                                             peer_max_streams_uni :: non_neg_integer(),
                                             local_max_streams_bidi :: non_neg_integer(),
                                             local_max_streams_uni :: non_neg_integer(),
                                             last_sent_max_streams_bidi :: non_neg_integer(),
                                             last_sent_max_streams_uni :: non_neg_integer(),
                                             max_peer_bidi_stream_id :: non_neg_integer() | undefined,
                                             max_peer_uni_stream_id :: non_neg_integer() | undefined,
                                             opened_peer_bidi_count :: non_neg_integer(),
                                             opened_peer_uni_count :: non_neg_integer(),
                                             closed_peer_bidi_wm :: integer(),
                                             closed_peer_uni_wm :: integer(),
                                             closed_peer_streams :: #{nquic:stream_id() => true},
                                             recv_waiters :: #{nquic:stream_id() => gen_statem:from()},
                                             accept_stream_waiters :: queue:queue(gen_statem:from()),
                                             pending_streams :: queue:queue(nquic:stream_id()),
                                             blocked_streams :: #{nquic:stream_id() => true},
                                             pending_send_streams :: #{nquic:stream_id() => true},
                                             send_buffer_high_water :: pos_integer(),
                                             send_timeout :: timeout(),
                                             send_waiters :: queue:queue(nquic_conn_send_waiters:t())},
                           flow ::
                               #conn_flow{local_max_data :: non_neg_integer(),
                                          remote_max_data :: non_neg_integer(),
                                          data_sent :: non_neg_integer(),
                                          data_received :: non_neg_integer(),
                                          last_data_blocked :: non_neg_integer(),
                                          pending_initial_frames :: [nquic_frame:t()],
                                          pending_handshake_frames :: [nquic_frame:t()],
                                          pending_app_frames :: [nquic_frame:t()],
                                          pending_app_pre_encoded ::
                                              [{non_neg_integer(), iodata(), nquic_frame:t()}],
                                          queued_app_send_bytes :: non_neg_integer()},
                           path ::
                               #conn_path_mgmt{path_state :: nquic_path:state() | undefined,
                                               peer_cids ::
                                                   #{non_neg_integer() =>
                                                         #{cid := nquic:connection_id(),
                                                           token := binary()}},
                                               local_cids ::
                                                   #{non_neg_integer() => nquic:connection_id()},
                                               local_cid_seq :: non_neg_integer(),
                                               peer_retire_prior_to :: non_neg_integer(),
                                               anti_amp_bytes_received :: non_neg_integer(),
                                               anti_amp_bytes_sent :: non_neg_integer(),
                                               address_validated :: boolean()}},
               [gen_statem:action()]}.

Walk the parked waiters in FIFO order and try to queue more of each waiter's bytes onto its stream. Returns {NewData, ReplyActions}; the caller (typically the post-flush path) prepends the reply actions to the gen_statem actions list.