PhoenixTestDatastar.Stream (PhoenixTestDatastar v0.0.2)

Manages long-lived SSE stream connections for testing real-time Datastar features.

This module enables testing of handlers that enter receive loops (e.g., waiting for PubSub messages). It spawns the handler in a separate task and forwards SSE chunks to the test process.
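The task-and-forward pattern described above can be sketched with the standard library alone. This is an illustration under assumptions, not the module's actual implementation: the `StreamSketch` module, the `{:publish, payload}` trigger, and the `{:sse_chunk, chunk}` message shape are all hypothetical names chosen for the example.

```elixir
# Illustrative stand-in for a handler that blocks in a receive loop.
# All names and message shapes here are hypothetical.
defmodule StreamSketch do
  # Starts the "handler" in a task; the task forwards chunks to `parent`.
  def open(parent \\ self()) do
    Task.async(fn -> loop(parent) end)
  end

  # Waits for one forwarded chunk in the calling (test) process.
  def await(timeout \\ 5000) do
    receive do
      {:sse_chunk, chunk} -> {:ok, chunk}
    after
      timeout -> :timeout
    end
  end

  # Stops the task; Task.shutdown/2 unlinks it first, so the caller survives.
  def close(task), do: Task.shutdown(task, :brutal_kill)

  # Receive loop: each published payload becomes one SSE-formatted chunk.
  defp loop(parent) do
    receive do
      {:publish, payload} ->
        send(parent, {:sse_chunk, "data: #{payload}\n\n"})
        loop(parent)
    end
  end
end

task = StreamSketch.open()
send(task.pid, {:publish, "count 1"})
{:ok, chunk} = StreamSketch.await()
StreamSketch.close(task)
```

Because the loop never returns on its own, the test process has to shut the task down explicitly, which is the role `close_stream/1` plays in this module.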

Usage

session
|> PhoenixTestDatastar.Stream.open_stream("/ds/my_handler/listen")
|> PhoenixTestDatastar.Stream.await_events()
|> assert_signal("count", 1)
|> PhoenixTestDatastar.Stream.close_stream()

Summary

Functions

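await_events(session, opts \\ [])

Waits for SSE events from an open stream and applies them to the session.

close_stream(session)

Closes an open SSE stream.

open_stream(session, path, opts \\ [])

Opens a long-lived SSE stream connection.

stream_open?(session)

Checks if a session has an open stream.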

Functions

await_events(session, opts \\ [])

@spec await_events(
  %PhoenixTestDatastar.Session{
    active_form: term(),
    conn: term(),
    csrf_token: term(),
    current_operation: term(),
    current_path: term(),
    raw_html: term(),
    signals: term(),
    stream_info: term(),
    visit_opts: term(),
    within: term()
  },
  keyword()
) :: %PhoenixTestDatastar.Session{
  active_form: term(),
  conn: term(),
  csrf_token: term(),
  current_operation: term(),
  current_path: term(),
  raw_html: term(),
  signals: term(),
  stream_info: term(),
  visit_opts: term(),
  within: term()
}

Waits for SSE events from an open stream and applies them to the session.

Collects all available events within the timeout window.

Options

  • :timeout - Maximum time to wait for events in ms (default: 5000)
  • :count - Number of events to wait for (default: collect all available)
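One way to picture how `:timeout` and `:count` could interact is a mailbox-draining loop with a deadline. The sketch below is an assumption about the general technique, not the library's code: `CollectSketch`, the `{:sse_event, event}` message shape, and the choice to wait out the full window when no `:count` is given are all hypothetical.

```elixir
# Hypothetical helper: gathers {:sse_event, event} messages from the
# caller's mailbox until `:count` events arrive or `:timeout` ms elapse.
defmodule CollectSketch do
  def collect(opts \\ []) do
    timeout = Keyword.get(opts, :timeout, 5000)
    count = Keyword.get(opts, :count)
    deadline = System.monotonic_time(:millisecond) + timeout
    do_collect([], count, deadline)
  end

  # Stop early once the requested number of events has been gathered.
  defp do_collect(acc, count, _deadline)
       when is_integer(count) and length(acc) >= count do
    Enum.reverse(acc)
  end

  # Otherwise wait for the next event, but never past the shared deadline.
  defp do_collect(acc, count, deadline) do
    remaining = max(deadline - System.monotonic_time(:millisecond), 0)

    receive do
      {:sse_event, event} -> do_collect([event | acc], count, deadline)
    after
      remaining -> Enum.reverse(acc)
    end
  end
end

send(self(), {:sse_event, "a"})
send(self(), {:sse_event, "b"})
send(self(), {:sse_event, "c"})

first_two = CollectSketch.collect(count: 2)
rest = CollectSketch.collect(timeout: 50)
```

Using one deadline for the whole call, rather than a fresh timeout per message, keeps the total wait bounded by `:timeout` no matter how many events arrive.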

Examples

session
|> open_stream("/stream")
|> await_events()          # Wait for any events
|> await_events(count: 2)  # Wait for exactly 2 events

close_stream(session)

@spec close_stream(%PhoenixTestDatastar.Session{
  active_form: term(),
  conn: term(),
  csrf_token: term(),
  current_operation: term(),
  current_path: term(),
  raw_html: term(),
  signals: term(),
  stream_info: term(),
  visit_opts: term(),
  within: term()
}) :: %PhoenixTestDatastar.Session{
  active_form: term(),
  conn: term(),
  csrf_token: term(),
  current_operation: term(),
  current_path: term(),
  raw_html: term(),
  signals: term(),
  stream_info: term(),
  visit_opts: term(),
  within: term()
}

Closes an open SSE stream.

Shuts down the task running the handler.

Examples

session
|> open_stream("/stream")
|> await_events()
|> close_stream()

open_stream(session, path, opts \\ [])

@spec open_stream(
  %PhoenixTestDatastar.Session{
    active_form: term(),
    conn: term(),
    csrf_token: term(),
    current_operation: term(),
    current_path: term(),
    raw_html: term(),
    signals: term(),
    stream_info: term(),
    visit_opts: term(),
    within: term()
  },
  String.t(),
  keyword()
) :: %PhoenixTestDatastar.Session{
  active_form: term(),
  conn: term(),
  csrf_token: term(),
  current_operation: term(),
  current_path: term(),
  raw_html: term(),
  signals: term(),
  stream_info: term(),
  visit_opts: term(),
  within: term()
}

Opens a long-lived SSE stream connection.

Spawns the handler in a task and sets up the session to receive SSE events via process messages.

Options

  • :method - HTTP method (default: :get)
  • :timeout - Timeout for initial connection in ms (default: 5000)

Examples

session
|> open_stream("/ds/my_handler/listen")
|> await_events()

stream_open?(session)

@spec stream_open?(%PhoenixTestDatastar.Session{
  active_form: term(),
  conn: term(),
  csrf_token: term(),
  current_operation: term(),
  current_path: term(),
  raw_html: term(),
  signals: term(),
  stream_info: term(),
  visit_opts: term(),
  within: term()
}) :: boolean()

Checks if a session has an open stream.