PhoenixTestDatastar.Stream
(PhoenixTestDatastar v0.0.2)
Copy Markdown
Manages long-lived SSE stream connections for testing real-time Datastar features.
This module enables testing of handlers that enter receive loops (e.g., waiting for PubSub messages). It spawns the handler in a separate task and forwards SSE chunks to the test process.
Usage
session
|> PhoenixTestDatastar.Stream.open_stream("/ds/my_handler/listen")
|> PhoenixTestDatastar.Stream.await_events()
|> assert_signal("count", 1)
|> PhoenixTestDatastar.Stream.close_stream()
Summary
Functions
Waits for SSE events from an open stream and applies them to the session.
Closes an open SSE stream.
Opens a long-lived SSE stream connection.
Checks if a session has an open stream.
Functions
@spec await_events( %PhoenixTestDatastar.Session{ active_form: term(), conn: term(), csrf_token: term(), current_operation: term(), current_path: term(), raw_html: term(), signals: term(), stream_info: term(), visit_opts: term(), within: term() }, keyword() ) :: %PhoenixTestDatastar.Session{ active_form: term(), conn: term(), csrf_token: term(), current_operation: term(), current_path: term(), raw_html: term(), signals: term(), stream_info: term(), visit_opts: term(), within: term() }
Waits for SSE events from an open stream and applies them to the session.
Collects all available events within the timeout window.
Options
:timeout- Maximum time to wait for events in ms (default: 5000):count- Number of events to wait for (default: collect all available)
Examples
session
|> open_stream("/stream")
|> await_events() # Wait for any events
|> await_events(count: 2) # Wait for exactly 2 events
@spec close_stream(%PhoenixTestDatastar.Session{ active_form: term(), conn: term(), csrf_token: term(), current_operation: term(), current_path: term(), raw_html: term(), signals: term(), stream_info: term(), visit_opts: term(), within: term() }) :: %PhoenixTestDatastar.Session{ active_form: term(), conn: term(), csrf_token: term(), current_operation: term(), current_path: term(), raw_html: term(), signals: term(), stream_info: term(), visit_opts: term(), within: term() }
Closes an open SSE stream.
Shuts down the task running the handler.
Examples
session
|> open_stream("/stream")
|> await_events()
|> close_stream()
@spec open_stream( %PhoenixTestDatastar.Session{ active_form: term(), conn: term(), csrf_token: term(), current_operation: term(), current_path: term(), raw_html: term(), signals: term(), stream_info: term(), visit_opts: term(), within: term() }, String.t(), keyword() ) :: %PhoenixTestDatastar.Session{ active_form: term(), conn: term(), csrf_token: term(), current_operation: term(), current_path: term(), raw_html: term(), signals: term(), stream_info: term(), visit_opts: term(), within: term() }
Opens a long-lived SSE stream connection.
Spawns the handler in a task and sets up the session to receive SSE events via process messages.
Options
:method- HTTP method (default::get):timeout- Timeout for initial connection in ms (default: 5000)
Examples
session
|> open_stream("/ds/my_handler/listen")
|> await_events()
@spec stream_open?(%PhoenixTestDatastar.Session{ active_form: term(), conn: term(), csrf_token: term(), current_operation: term(), current_path: term(), raw_html: term(), signals: term(), stream_info: term(), visit_opts: term(), within: term() }) :: boolean()
Checks if a session has an open stream.