Huginn.Clickhouse.Client (Huginn v0.4.0)

View Source

High-level ClickHouse gRPC client.

Provides a simple interface for executing queries against ClickHouse using gRPC with connection pooling. Supports all 4 ClickHouse gRPC methods:

gRPC Methods

| Method | Function | Use Case |
|--------|----------|----------|
| ExecuteQuery | query/2, insert/3 | Simple queries and small inserts |
| ExecuteQueryWithStreamInput | insert_stream/3 | Large data inserts |
| ExecuteQueryWithStreamOutput | stream_query/2 | Large result sets |
| ExecuteQueryWithStreamIO | stream_io/1 | Bidirectional streaming |

Examples

Simple Query (ExecuteQuery)

{:ok, result} = Huginn.Clickhouse.Client.query("SELECT 1")

Streaming Insert (ExecuteQueryWithStreamInput)

data_chunks = ["row1\tval1\n", "row2\tval2\n", "row3\tval3\n"]
{:ok, result} = Huginn.Clickhouse.Client.insert_stream(
  "INSERT INTO t FORMAT TabSeparated",
  data_chunks
)

Streaming Results (ExecuteQueryWithStreamOutput)

Huginn.Clickhouse.Client.stream_query("SELECT * FROM large_table")
|> Enum.each(fn
  {:ok, result} -> process(result)
  {:error, reason} -> handle_error(reason)
end)

Using Agent to Accumulate Results

{:ok, agent} = Agent.start_link(fn -> [] end)

Huginn.Clickhouse.Client.stream_query("SELECT * FROM events")
|> Enum.each(fn
  {:ok, result} ->
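    # Prepend each chunk (O(1)) rather than appending with ++ (O(n) per chunk)
    Agent.update(agent, fn chunks -> [result.rows | chunks] end)
  {:error, _} ->
    :skip
end)

# Restore arrival order and flatten the chunks into a single list of rows
all_rows = Agent.get(agent, fn chunks -> chunks |> Enum.reverse() |> Enum.concat() end)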
Agent.stop(agent)

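Stream.resource Pattern

This wraps a buffered query/2 call in a lazy stream. The full result set is still fetched into memory before the first row is emitted. For result sets that do not fit in memory, use stream_query/2 or stream_rows/2 instead.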

defmodule MyApp.ClickHouseStream do
  def stream_events(query) do
    Stream.resource(
      fn -> start_query(query) end,
      &next_chunk/1,
      &cleanup/1
    )
  end

  defp start_query(query) do
    case Huginn.Clickhouse.Client.query(query) do
      {:ok, result} -> {:ok, result.rows}
      {:error, _} = err -> err
    end
  end

  defp next_chunk({:error, _} = err), do: {:halt, err}
  defp next_chunk({:ok, []}), do: {:halt, :done}
  defp next_chunk({:ok, [row | rest]}), do: {[row], {:ok, rest}}

  defp cleanup(_), do: :ok
end

Summary

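Functions

cancel(query_id, opts \\ [])
Cancels a running query by its query ID.

cancel_where(condition, opts \\ [])
Cancels all queries matching a condition.

insert(sql, data, opts \\ [])
Inserts data using the ExecuteQuery gRPC method.

insert_stream(sql, data_stream, opts \\ [])
Inserts data using the ExecuteQueryWithStreamInput gRPC method.

ping(opts \\ [])
Pings the ClickHouse server to check connectivity.

query(sql, opts \\ [])
Executes a query using the ExecuteQuery gRPC method.

query!(sql, opts \\ [])
Executes a query and raises on error.

running_queries(opts \\ [])
Lists currently running queries.

stream_io(opts \\ [])
Opens a bidirectional streaming connection using ExecuteQueryWithStreamIO.

stream_maps(sql, opts \\ [])
Streams rows as maps from a query result.

stream_query(sql, opts \\ [])
Executes a query using the ExecuteQueryWithStreamOutput gRPC method.

stream_rows(sql, opts \\ [])
Streams individual rows from a query result.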

Types

query_opts()

@type query_opts() :: [
  pool: atom(),
  database: String.t(),
  format: String.t(),
  settings: map(),
  timeout: non_neg_integer(),
  query_id: String.t(),
  retries: non_neg_integer(),
  retry_backoff: non_neg_integer()
]

Functions

cancel(query_id, opts \\ [])

@spec cancel(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Cancels a running query by its query ID.

Uses ClickHouse's KILL QUERY command to stop execution.

Examples

# Start a long query with custom ID
query_id = "my-query-123"
Task.async(fn ->
  Huginn.Clickhouse.Client.query(
    "SELECT sleep(60)",
    query_id: query_id
  )
end)

# Cancel it
:ok = Huginn.Clickhouse.Client.cancel(query_id)

cancel_where(condition, opts \\ [])

@spec cancel_where(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

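Cancels all queries matching a condition.

condition is a ClickHouse boolean expression used as the WHERE clause of KILL QUERY. It is evaluated against the columns of system.processes, for example user or elapsed.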

Trusted input only

condition is interpolated verbatim into a KILL QUERY WHERE statement. Never pass untrusted/user-supplied input — build the condition from trusted values only. To cancel a single known query id safely, use cancel/2.

Examples

# Cancel all queries from a specific user
Huginn.Clickhouse.Client.cancel_where("user = 'admin'")

# Cancel queries running longer than 60 seconds
Huginn.Clickhouse.Client.cancel_where("elapsed > 60")

insert(sql, data, opts \\ [])

@spec insert(String.t(), binary(), keyword()) ::
  {:ok, Huginn.Clickhouse.Result.t()} | {:error, term()}

Inserts data using the ExecuteQuery gRPC method.

Suitable for small to medium inserts where data fits in a single request. For large inserts, use insert_stream/3.

Options

  • :format - Input data format (default: "TabSeparated")
  • All options from query/2

Examples

# TabSeparated format
data = "john\t25\njane\t30"
{:ok, _} = Huginn.Clickhouse.Client.insert(
  "INSERT INTO users (name, age) FORMAT TabSeparated",
  data
)

# JSONEachRow format
data = ~s({"name":"john","age":25}\n{"name":"jane","age":30})
{:ok, _} = Huginn.Clickhouse.Client.insert(
  "INSERT INTO users FORMAT JSONEachRow",
  data
)

insert_stream(sql, data_stream, opts \\ [])

@spec insert_stream(String.t(), Enumerable.t(), keyword()) ::
  {:ok, Huginn.Clickhouse.Result.t()} | {:error, term()}

Inserts data using the ExecuteQueryWithStreamInput gRPC method.

This method streams data to ClickHouse in chunks, suitable for:

  • Large file imports
  • Continuous data ingestion
  • Memory-efficient bulk inserts

The data stream is sent as multiple QueryInfo messages where:

  • First message contains the SQL query and initial data
  • Subsequent messages contain only data chunks

Options

  • :format - Input data format (default: "TabSeparated")
  • :chunk_size - Bytes per chunk
  • All options from query/2

Examples

# Stream from a file (the SQL is the first argument, the data stream the second)
{:ok, _} = Huginn.Clickhouse.Client.insert_stream(
  "INSERT INTO logs FORMAT CSV",
  File.stream!("large_data.csv", [], 65_536)
)

# Stream from enumerable
data_chunks = ["row1\tval1\n", "row2\tval2\n", "row3\tval3\n"]
{:ok, _} = Huginn.Clickhouse.Client.insert_stream(
  "INSERT INTO t FORMAT TabSeparated",
  data_chunks
)

# With Agent to track progress
{:ok, counter} = Agent.start_link(fn -> 0 end)

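chunks =
  large_data
  |> Stream.chunk_every(1000)
  |> Stream.map(fn chunk ->
    Agent.update(counter, &(&1 + length(chunk)))
    # Terminate every row, including the chunk's last one, so rows from
    # adjacent chunks stay separated when the chunks are streamed in sequence.
    Enum.join(chunk, "\n") <> "\n"
  end)

{:ok, _} = Huginn.Clickhouse.Client.insert_stream("INSERT INTO t FORMAT TabSeparated", chunks)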

IO.puts("Inserted #{Agent.get(counter, & &1)} rows")

ping(opts \\ [])

@spec ping(keyword()) :: :ok | {:error, term()}

Pings the ClickHouse server to check connectivity.

Examples

case Huginn.Clickhouse.Client.ping() do
  :ok -> IO.puts("Connected!")
  {:error, reason} -> IO.puts("Failed: #{inspect(reason)}")
end

query(sql, opts \\ [])

@spec query(String.t(), query_opts()) ::
  {:ok, Huginn.Clickhouse.Result.t()} | {:error, term()}

Executes a query using the ExecuteQuery gRPC method.

This is a simple request/response pattern suitable for:

  • SELECT queries with reasonable result sizes
  • DDL statements (CREATE, ALTER, DROP)
  • Small INSERT statements

Options

  • :pool - Pool name (default: configured pool)
  • :database - Database to use
  • :format - Output format (default: "TabSeparated")
  • :settings - ClickHouse settings map
  • :timeout - Query timeout in milliseconds (default: 60000)
  • :query_id - Custom query ID for tracking/cancellation

Examples

# Simple query
{:ok, result} = Huginn.Clickhouse.Client.query("SELECT 1")

# With options
{:ok, result} = Huginn.Clickhouse.Client.query(
  "SELECT * FROM users",
  database: "mydb",
  format: "JSONEachRow",
  timeout: 30_000
)

# Get results as maps
maps = Huginn.Clickhouse.Result.to_maps(result)

query!(sql, opts \\ [])

Executes a query and raises on error.

See query/2 for options.

running_queries(opts \\ [])

@spec running_queries(keyword()) ::
  {:ok, Huginn.Clickhouse.Result.t()} | {:error, term()}

Lists currently running queries.

Examples

{:ok, result} = Huginn.Clickhouse.Client.running_queries()
queries = Huginn.Clickhouse.Result.to_maps(result)

Enum.each(queries, fn q ->
  IO.puts("#{q["query_id"]}: #{q["query"]}")
end)

stream_io(opts \\ [])

@spec stream_io(query_opts()) ::
  {Enumerable.t(), (struct() -> :ok)} | {:error, term()}

Opens a bidirectional streaming connection using ExecuteQueryWithStreamIO.

This is the most flexible but complex method, suitable for:

  • Interactive query sessions
  • Real-time data processing pipelines
  • Custom streaming protocols

On success, returns a tuple of {output_stream, send_function}; on failure, returns {:error, reason}. In the success tuple:

  • output_stream is an enumerable of results
  • send_function sends QueryInfo messages to the server

Examples

# Basic bidirectional streaming
{output, send} = Huginn.Clickhouse.Client.stream_io()

# Send a query
query_info = Huginn.Clickhouse.Query.build("SELECT 1")
send.(query_info)

# Read response
Enum.take(output, 1)

# Send cancellation
send.(Huginn.Clickhouse.Query.build_cancel())

Agent-based Bidirectional Example

defmodule MyApp.StreamProcessor do
  def process do
    {output, send} = Huginn.Clickhouse.Client.stream_io()

    # Start result collector
    {:ok, results} = Agent.start_link(fn -> [] end)

    # Spawn output reader
    reader = Task.async(fn ->
      Enum.each(output, fn
        {:ok, result} ->
          Agent.update(results, &[result | &1])
        {:error, _} ->
          :skip
      end)
    end)

    # Send queries
    send.(Huginn.Clickhouse.Query.build("SELECT 1"))
    send.(Huginn.Clickhouse.Query.build("SELECT 2"))

    # Wait and get results
    Task.await(reader)
    Agent.get(results, &Enum.reverse/1)
  end
end

stream_maps(sql, opts \\ [])

@spec stream_maps(String.t(), query_opts()) :: Enumerable.t(map())

Streams rows as maps from a query result.

Convenience wrapper that converts each row to a map with column names as keys.

Examples

Huginn.Clickhouse.Client.stream_maps("SELECT id, name, age FROM users")
|> Stream.filter(fn %{"age" => age} -> age > 18 end)
|> Enum.to_list()

stream_query(sql, opts \\ [])

@spec stream_query(String.t(), query_opts()) :: Enumerable.t()

Executes a query using the ExecuteQueryWithStreamOutput gRPC method.

Returns a lazy stream of results, suitable for:

  • Large result sets that don't fit in memory
  • Processing data as it arrives
  • Implementing pagination-like behavior

Options

Accepts the same options as query/2 (see query_opts()).

Returns

Returns a Stream that yields {:ok, Result.t()} or {:error, term()} tuples.

Examples

# Basic streaming
Huginn.Clickhouse.Client.stream_query("SELECT * FROM large_table")
|> Enum.each(fn
  {:ok, result} -> process_chunk(result)
  {:error, error} -> Logger.error("Error: #{inspect(error)}")
end)

# Collect all rows (error chunks are skipped)
rows =
  Huginn.Clickhouse.Client.stream_query("SELECT * FROM events")
  |> Enum.flat_map(fn
    {:ok, result} -> result.rows
    {:error, _} -> []
  end)

# Lazy pipeline with early termination (consumption stops once take_while fails)
Huginn.Clickhouse.Client.stream_query("SELECT * FROM logs")
|> Stream.flat_map(fn {:ok, r} -> r.rows; _ -> [] end)
|> Stream.take_while(fn [ts | _] -> ts > cutoff_time end)
|> Enum.to_list()

# Agent accumulator pattern
{:ok, agent} = Agent.start_link(fn -> %{rows: [], errors: 0} end)

Huginn.Clickhouse.Client.stream_query("SELECT * FROM metrics")
|> Enum.each(fn
  {:ok, result} ->
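    Agent.update(agent, fn state ->
      # Prepend each chunk (O(1)); order is restored when reading the state
      %{state | rows: [result.rows | state.rows]}
    end)
  {:error, _} ->
    Agent.update(agent, fn state ->
      %{state | errors: state.errors + 1}
    end)
end)

final_state =
  Agent.get(agent, fn state ->
    %{state | rows: state.rows |> Enum.reverse() |> Enum.concat()}
  end)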

stream_rows(sql, opts \\ [])

@spec stream_rows(String.t(), query_opts()) :: Enumerable.t(list())

Streams individual rows from a query result.

Convenience wrapper around stream_query/2 that flattens results into rows.

Examples

Huginn.Clickhouse.Client.stream_rows("SELECT id, name FROM users")
|> Stream.take(100)
|> Enum.each(fn [id, name] -> IO.puts("#{id}: #{name}") end)