Huginn.Clickhouse.Stream (Huginn v0.4.0)

Streaming utilities for ClickHouse gRPC operations.

Provides helpers for:

  • Building input streams for ExecuteQueryWithStreamInput
  • Processing output streams from ExecuteQueryWithStreamOutput
  • Bidirectional streaming with ExecuteQueryWithStreamIO

Summary

Functions

collect_output(stream)

Collects all results from an output stream into a single Result.

input_stream(sql, data_enum, opts \\ [])

Creates an input stream from an enumerable of data chunks.

map_stream(stream)

Streams rows as maps from an output stream.

parse_stream(stream)

Creates a stream that yields parsed Result structs from gRPC results.

row_stream(stream)

Streams rows from an output stream, yielding individual rows.

transform_output(stream, fun)

Transforms an output stream, applying a function to each Result chunk.

Functions

collect_output(stream)

@spec collect_output(Enumerable.t()) ::
  {:ok, Huginn.Clickhouse.Result.t()} | {:error, term()}

Collects all results from an output stream into a single Result.

input_stream(sql, data_enum, opts \\ [])

@spec input_stream(String.t(), Enumerable.t(), keyword()) :: Enumerable.t(struct())

Creates an input stream from an enumerable of data chunks.

The first QueryInfo message carries the SQL query; each subsequent message carries a chunk of data.

Options

  • :chunk_size - Number of rows per chunk (for lists)
  • :format - Input data format (default: "TabSeparated")
  • All options from Query.build/2

Examples

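# Assumes `alias Huginn.Clickhouse.Stream`, which shadows Elixir's built-in
# Stream module in this scope.

# Stream from a list of rows
rows = [["a", "1"], ["b", "2"], ["c", "3"]]
stream = Stream.input_stream("INSERT INTO t VALUES", rows, format: "TabSeparated")

# Stream from a file. The SQL is the first argument, so the file stream is
# passed as the second argument rather than piped in.
stream =
  Stream.input_stream("INSERT INTO t FORMAT CSV", File.stream!("data.csv"), format: "CSV")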
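
The message layout described above (one query message followed by data messages) can be sketched with plain maps. This is an illustration only, not Huginn's implementation: the `InputStreamSketch` module, the `:query` and `:input_data` keys, and the tab-separated row encoding are all assumptions standing in for the real QueryInfo struct.

```elixir
defmodule InputStreamSketch do
  # Hypothetical helper, not part of Huginn. `Stream` here is Elixir's
  # built-in module, since no alias is in effect.
  #
  # Builds a lazy stream of messages: the first carries the SQL, the rest
  # carry tab-separated data in chunks of `chunk_size` rows.
  def build(sql, rows, chunk_size) do
    data_messages =
      rows
      |> Stream.chunk_every(chunk_size)
      |> Stream.map(fn chunk ->
        encoded = Enum.map_join(chunk, fn row -> Enum.join(row, "\t") <> "\n" end)
        %{input_data: encoded}
      end)

    Stream.concat([%{query: sql}], data_messages)
  end
end

rows = [["a", "1"], ["b", "2"], ["c", "3"]]

messages =
  "INSERT INTO t FORMAT TabSeparated"
  |> InputStreamSketch.build(rows, 2)
  |> Enum.to_list()
```

With a chunk size of 2, the three rows produce one query message and two data messages. Nothing is encoded until the stream is consumed.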

map_stream(stream)

@spec map_stream(Enumerable.t()) :: Enumerable.t(map())

Streams rows as maps from an output stream.
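
As a rough picture of what a row-as-map looks like, the sketch below zips column names with row values in plain Elixir. The column names, the sample rows, and the choice of string keys are assumptions; the source does not say whether map_stream/1 uses string or atom keys.

```elixir
# Stand-in data; in practice the columns and rows would come from the
# ClickHouse result rather than being written by hand.
columns = ["id", "name"]
rows = [[1, "a"], [2, "b"]]

# Pair each column name with the value in the same position, then build a map.
maps =
  Enum.map(rows, fn row ->
    columns |> Enum.zip(row) |> Map.new()
  end)
```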

parse_stream(stream)

@spec parse_stream(Enumerable.t()) ::
  Enumerable.t({:ok, Huginn.Clickhouse.Result.t()} | {:error, term()})

Creates a stream that yields parsed Result structs from gRPC results.
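
Because each element is tagged `{:ok, result}` or `{:error, reason}`, a consumer usually has to decide what to do on the first error. The sketch below shows one option, halting early, using stand-in values in place of real Result structs. `ParseStreamSketch` is a hypothetical helper, not part of Huginn.

```elixir
defmodule ParseStreamSketch do
  # Collects the payloads of {:ok, _} elements in order and stops at the
  # first {:error, _}, leaving the rest of the stream unconsumed.
  def take_until_error(parsed) do
    parsed
    |> Enum.reduce_while({:ok, []}, fn
      {:ok, chunk}, {:ok, acc} -> {:cont, {:ok, [chunk | acc]}}
      {:error, reason}, _acc -> {:halt, {:error, reason}}
    end)
    |> case do
      {:ok, chunks} -> {:ok, Enum.reverse(chunks)}
      {:error, _reason} = error -> error
    end
  end
end

all_ok = ParseStreamSketch.take_until_error([{:ok, :a}, {:ok, :b}])
failed = ParseStreamSketch.take_until_error([{:ok, :a}, {:error, :boom}, {:ok, :c}])
```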

row_stream(stream)

@spec row_stream(Enumerable.t()) :: Enumerable.t(list())

Streams rows from an output stream, yielding individual rows.

Useful for processing large result sets row by row.
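
The benefit of row-by-row streaming is that downstream steps can stop early without materializing the whole result. The sketch below demonstrates this with a stand-in lazy stream of rows; the row shape `[id, name]` is an assumption, and `Stream` refers to Elixir's built-in module, not `Huginn.Clickhouse.Stream`.

```elixir
# Stand-in for the enumerable returned by row_stream/1: a lazy stream of rows.
rows = Stream.map(1..1_000_000, fn i -> [i, "name_#{i}"] end)

# Only as many rows as needed are produced; the remainder is never built.
first_even =
  rows
  |> Stream.filter(fn [id, _name] -> rem(id, 2) == 0 end)
  |> Enum.take(3)
```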

transform_output(stream, fun)

@spec transform_output(Enumerable.t(), (Huginn.Clickhouse.Result.t() -> term())) ::
  Enumerable.t()

Transforms an output stream, applying a function to each Result chunk.
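
A per-chunk function of the kind passed as `fun` might look like the sketch below. The chunk shape (a map with a `:rows` key) is hypothetical, since the fields of `Huginn.Clickhouse.Result` are not listed here, and Elixir's built-in `Stream.map/2` stands in for transform_output/2 so the example runs without a server.

```elixir
# Hypothetical chunk shape; the real Result struct may use different fields.
chunks = [%{rows: [[1], [2]]}, %{rows: [[3]]}]

# A one-argument function applied to each chunk.
count_rows = fn chunk -> length(chunk.rows) end

# Stream.map/2 is used here only as a stand-in for transform_output/2.
counts = chunks |> Stream.map(count_rows) |> Enum.to_list()
```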