HuggingfaceClient.Inference.StreamHelpers (huggingface_client v0.1.0)

Copy Markdown View Source

Utility functions for working with streaming inference responses.

Example: Collect a full response from a stream

{:ok, stream} = HuggingfaceClient.chat_completion_stream(client, %{
  model: "meta-llama/Llama-3.1-8B-Instruct",
  messages: [%{role: "user", content: "Write a haiku about Elixir"}]
})

text = HuggingfaceClient.Inference.StreamHelpers.collect_content(stream)
IO.puts(text)

Example: Stream to Phoenix LiveView

{:ok, stream} = HuggingfaceClient.chat_completion_stream(client, args)

HuggingfaceClient.Inference.StreamHelpers.each_content(stream, fn chunk ->
  send(self(), {:llm_chunk, chunk})
end)

Summary

Functions

Wraps a stream in a Task that accumulates the full response, resolving to {:ok, full_text} or {:error, exception}.

Collects all content delta tokens from a chat completion stream into a single string.

Collects the full chat completion output including metadata from a stream.

Converts a chat completion stream into a stream of plain content strings.

Calls callback for each non-empty content delta token in the stream.

Runs a stream in a background Task and sends each content token to pid as {tag, token} messages.

Functions

async_collect(stream, _ \\ :infinity)

@spec async_collect(Enumerable.t(), timeout()) :: Task.t()

Wraps a stream in a Task that accumulates the full response, resolving to {:ok, full_text} or {:error, exception}.

Example

task = HuggingfaceClient.Inference.StreamHelpers.async_collect(stream)

# Do other work here…

case Task.await(task, 30_000) do
  {:ok, text}  -> IO.puts(text)
  {:error, err} -> IO.inspect(err)
end

collect_content(stream)

@spec collect_content(Enumerable.t()) :: String.t()

Collects all content delta tokens from a chat completion stream into a single string.

Blocks until the stream is exhausted. For non-blocking use, run in a Task.

collect_response(stream)

@spec collect_response(Enumerable.t()) :: map()

Collects the full chat completion output including metadata from a stream.

Returns a map mimicking the non-streaming response shape: %{"choices" => [%{"message" => %{"role" => "assistant", "content" => text}}], ...}

Useful when you want to stream visually but still get a complete response object.

content_stream(stream)

@spec content_stream(Enumerable.t()) :: Enumerable.t()

Converts a chat completion stream into a stream of plain content strings.

Filters out empty/nil deltas automatically.

Example

{:ok, raw_stream} = HuggingfaceClient.chat_completion_stream(client, args)

text_stream = HuggingfaceClient.Inference.StreamHelpers.content_stream(raw_stream)
Enum.each(text_stream, &IO.write/1)

each_content(stream, callback)

@spec each_content(Enumerable.t(), (String.t() -> any())) :: :ok

Calls callback for each non-empty content delta token in the stream.

Returns :ok when the stream is exhausted.

stream_to_pid(stream, pid, tag \\ :hf_stream)

@spec stream_to_pid(Enumerable.t(), pid(), term()) :: Task.t()

Runs a stream in a background Task and sends each content token to pid as {tag, token} messages.

Sends {tag, :done} when the stream is exhausted, or {tag, {:error, reason}} if the stream raises.

Example (Phoenix LiveView)

def handle_event("ask", %{"prompt" => prompt}, socket) do
  {:ok, stream} = HuggingfaceClient.chat_completion_stream(@client, %{
    model: "meta-llama/Llama-3.1-8B-Instruct",
    messages: [%{role: "user", content: prompt}]
  })

  HuggingfaceClient.Inference.StreamHelpers.stream_to_pid(stream, self(), :llm)

  {:noreply, socket}
end

def handle_info({:llm, token}, socket) when is_binary(token) do
  {:noreply, update(socket, :response, &(&1 <> token))}
end

def handle_info({:llm, :done}, socket), do: {:noreply, socket}