ClickHouse client for Elixir using gRPC with connection pooling.

Features

  • gRPC protocol for efficient binary communication
  • Connection pooling with health monitoring
  • Support for all 4 ClickHouse gRPC methods:
    • ExecuteQuery - simple request/response
    • ExecuteQueryWithStreamInput - streaming inserts
    • ExecuteQueryWithStreamOutput - streaming large results
    • ExecuteQueryWithStreamIO - bidirectional streaming
  • Both password and JWT authentication
  • Query cancellation support
  • Automatic result parsing
  • Telemetry instrumentation and an opt-in default logger
  • Opt-in retries for transient transport failures

Installation

Add huginn to your list of dependencies in mix.exs:

def deps do
  [
    {:huginn, "~> 0.3.0"}
  ]
end

Configuration

Configure the ClickHouse connection in your config/config.exs:

config :huginn, :clickhouse,
  host: "localhost",
  port: 9100,
  database: "default",
  auth: {:password, "default", ""},
  pool_size: 5

Configuration Options

| Option | Default | Description |
|--------|---------|-------------|
| :host | (required) | ClickHouse server hostname |
| :port | 9100 | gRPC port |
| :database | "default" | Default database |
| :auth | nil | {:password, user, pass} or {:jwt, token} |
| :pool_size | 5 | Number of connections |
| :ssl | false | Enable SSL/TLS |
| :pool_name | :clickhouse_pool | Pool name for multiple pools |

Production Configuration

# config/prod.exs
config :huginn, :clickhouse,
  host: System.get_env("CLICKHOUSE_HOST"),
  port: String.to_integer(System.get_env("CLICKHOUSE_PORT", "9100")),
  database: System.get_env("CLICKHOUSE_DATABASE", "default"),
  auth: {:password, System.get_env("CLICKHOUSE_USER"), System.get_env("CLICKHOUSE_PASSWORD")},
  pool_size: 10,
  ssl: true

Usage

gRPC Methods Overview

| gRPC Method | Function | Use Case |
|-------------|----------|----------|
| ExecuteQuery | query/2, insert/3 | Simple queries, small inserts |
| ExecuteQueryWithStreamInput | insert_stream/3 | Large file imports |
| ExecuteQueryWithStreamOutput | stream_query/2 | Large result sets |
| ExecuteQueryWithStreamIO | stream_io/1 | Bidirectional streaming |

Simple Queries (ExecuteQuery)

# Execute a query
{:ok, result} = Huginn.query("SELECT * FROM system.tables LIMIT 10")

# Get results as maps
maps = Huginn.Clickhouse.Result.to_maps(result)

# Query with options
{:ok, result} = Huginn.query(
  "SELECT * FROM users WHERE status = 'active'",
  database: "mydb",
  format: "JSONEachRow",
  timeout: 30_000
)

# Raising version
result = Huginn.query!("SELECT 1")

Inserts (ExecuteQuery)

# Simple insert with TabSeparated data
data = "john\t25\njane\t30"
{:ok, _} = Huginn.insert(
  "INSERT INTO users (name, age) FORMAT TabSeparated",
  data
)

# JSONEachRow format
data = ~s({"name":"john","age":25}\n{"name":"jane","age":30})
{:ok, _} = Huginn.insert("INSERT INTO users FORMAT JSONEachRow", data)

Streaming Inserts (ExecuteQueryWithStreamInput)

# Stream from a file
File.stream!("large_data.csv", [], 65_536)
|> Huginn.insert_stream("INSERT INTO logs FORMAT CSV")

# Stream with progress tracking using Agent
{:ok, counter} = Agent.start_link(fn -> 0 end)

large_data
|> Stream.chunk_every(1000)
|> Stream.map(fn chunk ->
  Agent.update(counter, &(&1 + length(chunk)))
  Enum.join(chunk, "\n")
end)
|> Huginn.insert_stream("INSERT INTO events FORMAT TabSeparated")

IO.puts("Inserted #{Agent.get(counter, & &1)} rows")
Agent.stop(counter)

Streaming Results (ExecuteQueryWithStreamOutput)

# Basic streaming
Huginn.stream_query("SELECT * FROM large_table")
|> Enum.each(fn
  {:ok, result} -> process_chunk(result)
  {:error, error} -> Logger.error("Error: #{inspect(error)}")
end)

# Stream rows directly
Huginn.stream_rows("SELECT * FROM events")
|> Stream.take(1000)
|> Enum.to_list()

# Stream as maps
Huginn.stream_maps("SELECT name, age FROM users")
|> Stream.filter(fn %{"age" => age} -> String.to_integer(age) > 18 end)
|> Enum.to_list()

Using Agent to Accumulate Results

# Accumulate all rows with error counting
{:ok, agent} = Agent.start_link(fn -> %{rows: [], errors: 0} end)

Huginn.stream_query("SELECT * FROM metrics")
|> Enum.each(fn
  {:ok, result} ->
    Agent.update(agent, fn state ->
      %{state | rows: state.rows ++ result.rows}
    end)
  {:error, _} ->
    Agent.update(agent, fn state ->
      %{state | errors: state.errors + 1}
    end)
end)

final_state = Agent.get(agent, & &1)
IO.puts("Got #{length(final_state.rows)} rows, #{final_state.errors} errors")
Agent.stop(agent)

Custom Stream.resource Pattern

defmodule MyApp.ClickHouseStream do
  @moduledoc "Custom streaming with backpressure control"

  @doc """
  Returns a lazy stream that yields the rows of `query` in batches.

  Each emitted element is a list of at most `batch_size` rows, in the order the
  rows arrive; only the final batch may be shorter. A query that returns no rows
  yields no batches. Rows are read from `Huginn.stream_rows/1` one batch at a
  time, only when the consumer asks for the next batch.

  `batch_size` must be a positive integer.
  """
  def stream_with_backpressure(query, batch_size \\ 100)
      when is_integer(batch_size) and batch_size > 0 do
    Stream.resource(
      fn -> init_query(query) end,
      fn cursor -> next_batch(cursor, batch_size) end,
      &close/1
    )
  end

  # Builds the cursor: a function that drives the row stream through
  # Enumerable.reduce/3. Enumerating a stream always starts from its first
  # element, so calling Stream.take/2 on it once per batch would return the
  # same rows every time. Suspending the reduction instead hands back a
  # continuation that resumes at the next unread row.
  defp init_query(query) do
    rows = Huginn.stream_rows(query)
    fn command -> Enumerable.reduce(rows, command, &collect/2) end
  end

  # Reducer: prepend the row (O(1)) and suspend once the batch is full.
  defp collect(row, {rows, 1}), do: {:suspend, {[row | rows], 0}}
  defp collect(row, {rows, remaining}), do: {:cont, {[row | rows], remaining - 1}}

  # The source was exhausted by an earlier call; end the stream.
  defp next_batch(:done, _batch_size), do: {:halt, :done}

  defp next_batch(cursor, batch_size) do
    case cursor.({:cont, {[], batch_size}}) do
      # Batch is full; `resume` picks up at the next unread row.
      {:suspended, {rows, _remaining}, resume} ->
        {[Enum.reverse(rows)], resume}

      # Source finished on a batch boundary (or was empty): nothing to emit.
      {_finished, {[], _remaining}} ->
        {:halt, :done}

      # Source finished part-way through a batch: emit the short final batch.
      {_finished, {rows, _remaining}} ->
        {[Enum.reverse(rows)], :done}
    end
  end

  # Cleanup. If the consumer stopped early, halt the suspended enumeration so
  # the underlying stream gets a chance to run its own cleanup.
  defp close(:done), do: :ok

  defp close(cursor) do
    cursor.({:halt, nil})
    :ok
  end
end

# Usage
MyApp.ClickHouseStream.stream_with_backpressure("SELECT * FROM events")
|> Enum.each(fn batch ->
  process_batch(batch)
  Process.sleep(100)  # Rate limiting
end)

Bidirectional Streaming (ExecuteQueryWithStreamIO)

# Interactive query session
{output, send} = Huginn.stream_io()

# Send queries
send.(Huginn.Clickhouse.Query.build("SELECT 1"))
send.(Huginn.Clickhouse.Query.build("SELECT 2"))

# Process results
Task.async(fn ->
  Enum.each(output, fn
    {:ok, result} -> IO.inspect(result.rows)
    {:error, err} -> IO.puts("Error: #{inspect(err)}")
  end)
end)

Query Cancellation

# Start a long-running query with custom ID
query_id = "my-long-query-#{System.unique_integer()}"

task = Task.async(fn ->
  Huginn.query("SELECT sleep(300)", query_id: query_id)
end)

# Cancel after some time
Process.sleep(5_000)
:ok = Huginn.cancel(query_id)

# Cancel queries by condition
Huginn.cancel_where("elapsed > 60")
Huginn.cancel_where("user = 'admin'")

# List running queries
{:ok, result} = Huginn.running_queries()
Huginn.Clickhouse.Result.to_maps(result)
|> Enum.each(fn q -> IO.puts("#{q["query_id"]}: #{q["query"]}") end)

Health Check

case Huginn.ping() do
  :ok -> IO.puts("Connected!")
  {:error, reason} -> IO.puts("Failed: #{inspect(reason)}")
end

Telemetry

Each request issued through query/2, insert/3, and insert_stream/3 is wrapped in a :telemetry span:

| Event | Measurements | Metadata |
|-------|--------------|----------|
| [:huginn, :query, :start] | :system_time, :monotonic_time | :method, :sql, :query_id, :pool |
| [:huginn, :query, :stop] | :duration, :monotonic_time | above + :rows, :stats (or :error) |
| [:huginn, :query, :exception] | :duration, :monotonic_time | above + :kind, :reason, :stacktrace |

Attach the built-in logger (off by default), or your own handler:

# Logs each completed query with its duration; failures log at :error.
Huginn.attach_default_logger(:info)

# ...or attach a custom handler
:telemetry.attach(
  "my-handler",
  [:huginn, :query, :stop],
  fn _event, %{duration: d}, meta, _ ->
    ms = System.convert_time_unit(d, :native, :millisecond)
    MyMetrics.histogram("clickhouse.query.duration", ms, tags: [meta.method])
  end,
  nil
)

See Huginn.Clickhouse.Telemetry for the full reference.

Retries

query/2 and insert/3 can retry transient transport failures (connection errors and gRPC UNAVAILABLE/DEADLINE_EXCEEDED) with exponential backoff. Retries are off by default, and ClickHouse query errors are never retried.

# Up to 3 extra attempts, backing off 100ms, 200ms, 400ms.
Huginn.query("SELECT 1", retries: 3, retry_backoff: 100)

See Huginn.Clickhouse.Retry for details.

Development

Prerequisites

  • Elixir 1.14+
  • Docker and Docker Compose
  • protoc compiler (brew install protobuf on macOS)

Start ClickHouse

docker-compose up -d

This starts ClickHouse with gRPC enabled on port 9100.

Run Tests

# Unit tests (no ClickHouse required)
mix test

# Include the end-to-end suite (requires ClickHouse via docker-compose up -d)
mix test --include integration

Generate Documentation

mix docs

Regenerate Proto Files

If you need to regenerate the proto files after updating the .proto file:

# Install protoc-gen-elixir
mix escript.install hex protobuf

# Generate Elixir code
protoc --elixir_out=plugins=grpc:./lib/huginn/proto \
  --proto_path=./priv/protos \
  clickhouse_grpc.proto

Architecture

lib/huginn/
├── clickhouse/
│   ├── client.ex      # High-level API (all 4 gRPC methods)
│   ├── config.ex      # Configuration management
│   ├── query.ex       # QueryInfo message builders
│   ├── result.ex      # Result parsing utilities
│   └── stream.ex      # Streaming helpers
└── proto/
    └── clickhouse_grpc.pb.ex  # Generated protobuf code

License

MIT