Huginn.Clickhouse.Client (Huginn v0.4.0)
View Source
High-level ClickHouse gRPC client.
Provides a simple interface for executing queries against ClickHouse using gRPC with connection pooling. Supports all 4 ClickHouse gRPC methods:
gRPC Methods
| Method | Function | Use Case |
|---|---|---|
| ExecuteQuery | query/2, insert/3 | Simple queries and small inserts |
| ExecuteQueryWithStreamInput | insert_stream/3 | Large data inserts |
| ExecuteQueryWithStreamOutput | stream_query/2 | Large result sets |
| ExecuteQueryWithStreamIO | stream_io/1 | Bidirectional streaming |
Examples
Simple Query (ExecuteQuery)
{:ok, result} = Huginn.Clickhouse.Client.query("SELECT 1")
Streaming Insert (ExecuteQueryWithStreamInput)
data_chunks = ["row1\tval1", "row2\tval2", "row3\tval3"]
{:ok, result} = Huginn.Clickhouse.Client.insert_stream(
"INSERT INTO t FORMAT TabSeparated",
data_chunks
)
Streaming Results (ExecuteQueryWithStreamOutput)
Huginn.Clickhouse.Client.stream_query("SELECT * FROM large_table")
|> Enum.each(fn {:ok, result} -> process(result) end)
Using Agent to Accumulate Results
{:ok, agent} = Agent.start_link(fn -> [] end)
Huginn.Clickhouse.Client.stream_query("SELECT * FROM events")
|> Enum.each(fn
{:ok, result} ->
Agent.update(agent, fn acc -> acc ++ result.rows end)
{:error, _} ->
:skip
end)
all_rows = Agent.get(agent, & &1)
Agent.stop(agent)
Stream.resource Pattern
defmodule MyApp.ClickHouseStream do
def stream_events(query) do
Stream.resource(
fn -> start_query(query) end,
&next_chunk/1,
&cleanup/1
)
end
defp start_query(query) do
case Huginn.Clickhouse.Client.query(query) do
{:ok, result} -> {:ok, result.rows}
{:error, _} = err -> err
end
end
defp next_chunk({:error, _} = err), do: {:halt, err}
defp next_chunk({:ok, []}), do: {:halt, :done}
defp next_chunk({:ok, [row | rest]}), do: {[row], {:ok, rest}}
defp cleanup(_), do: :ok
end
Summary
Functions
Cancels a running query by its query ID.
Cancels all queries matching a pattern.
Inserts data using the ExecuteQuery gRPC method.
Inserts data using the ExecuteQueryWithStreamInput gRPC method.
Pings the ClickHouse server to check connectivity.
Executes a query using the ExecuteQuery gRPC method.
Executes a query and raises on error.
Lists currently running queries.
Opens a bidirectional streaming connection using ExecuteQueryWithStreamIO.
Streams rows as maps from a query result.
Executes a query using the ExecuteQueryWithStreamOutput gRPC method.
Streams individual rows from a query result.
Types
@type query_opts() :: [ pool: atom(), database: String.t(), format: String.t(), settings: map(), timeout: non_neg_integer(), query_id: String.t(), retries: non_neg_integer(), retry_backoff: non_neg_integer() ]
Functions
Cancels a running query by its query ID.
Uses ClickHouse's KILL QUERY command to stop execution.
Examples
# Start a long query with custom ID
query_id = "my-query-123"
Task.async(fn ->
Huginn.Clickhouse.Client.query(
"SELECT sleep(60)",
query_id: query_id
)
end)
# Cancel it
:ok = Huginn.Clickhouse.Client.cancel(query_id)
Cancels all queries matching a pattern.
Trusted input only
condition is interpolated verbatim into a KILL QUERY WHERE statement.
Never pass untrusted/user-supplied input — build the condition from trusted
values only. To cancel a single known query id safely, use cancel/2.
Examples
# Cancel all queries from a specific user
Huginn.Clickhouse.Client.cancel_where("user = 'admin'")
# Cancel queries running longer than 60 seconds
Huginn.Clickhouse.Client.cancel_where("elapsed > 60")
@spec insert(String.t(), binary(), keyword()) :: {:ok, Huginn.Clickhouse.Result.t()} | {:error, term()}
Inserts data using the ExecuteQuery gRPC method.
Suitable for small to medium inserts where data fits in a single request.
For large inserts, use insert_stream/3.
Options
- :format - Input data format (default: "TabSeparated")
- All options from query/2
Examples
# TabSeparated format
data = "john\t25\njane\t30"
{:ok, _} = Huginn.Clickhouse.Client.insert(
"INSERT INTO users (name, age) FORMAT TabSeparated",
data
)
# JSONEachRow format
data = ~s({"name":"john","age":25}\n{"name":"jane","age":30})
{:ok, _} = Huginn.Clickhouse.Client.insert(
"INSERT INTO users FORMAT JSONEachRow",
data
)
@spec insert_stream(String.t(), Enumerable.t(), keyword()) :: {:ok, Huginn.Clickhouse.Result.t()} | {:error, term()}
Inserts data using the ExecuteQueryWithStreamInput gRPC method.
This method streams data to ClickHouse in chunks, suitable for:
- Large file imports
- Continuous data ingestion
- Memory-efficient bulk inserts
The data stream is sent as multiple QueryInfo messages where:
- First message contains the SQL query and initial data
- Subsequent messages contain only data chunks
Options
- :format - Input data format (default: "TabSeparated")
- :chunk_size - Bytes per chunk
- All options from query/2
Examples
# Stream from a file
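# insert_stream/3 takes the SQL statement first and the data enumerable second,
# so the stream cannot be piped in as the first argument.
file_stream = File.stream!("large_data.csv", [], 65_536)
Huginn.Clickhouse.Client.insert_stream("INSERT INTO logs FORMAT CSV", file_stream)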
# Stream from enumerable
data_chunks = ["row1\tval1\n", "row2\tval2\n", "row3\tval3\n"]
{:ok, _} = Huginn.Clickhouse.Client.insert_stream(
"INSERT INTO t FORMAT TabSeparated",
data_chunks
)
# With Agent to track progress
{:ok, counter} = Agent.start_link(fn -> 0 end)
large_data
|> Stream.chunk_every(1000)
|> Stream.map(fn chunk ->
Agent.update(counter, &(&1 + length(chunk)))
Enum.join(chunk, "\n")
end)
|> then(&Huginn.Clickhouse.Client.insert_stream("INSERT INTO t FORMAT TabSeparated", &1))
IO.puts("Inserted #{Agent.get(counter, & &1)} rows")
Pings the ClickHouse server to check connectivity.
Examples
case Huginn.Clickhouse.Client.ping() do
:ok -> IO.puts("Connected!")
{:error, reason} -> IO.puts("Failed: #{inspect(reason)}")
end
@spec query(String.t(), query_opts()) :: {:ok, Huginn.Clickhouse.Result.t()} | {:error, term()}
Executes a query using the ExecuteQuery gRPC method.
This is a simple request/response pattern suitable for:
- SELECT queries with reasonable result sizes
- DDL statements (CREATE, ALTER, DROP)
- Small INSERT statements
Options
- :pool - Pool name (default: configured pool)
- :database - Database to use
- :format - Output format (default: "TabSeparated")
- :settings - ClickHouse settings map
- :timeout - Query timeout in milliseconds (default: 60000)
- :query_id - Custom query ID for tracking/cancellation
Examples
# Simple query
{:ok, result} = Huginn.Clickhouse.Client.query("SELECT 1")
# With options
{:ok, result} = Huginn.Clickhouse.Client.query(
"SELECT * FROM users",
database: "mydb",
format: "JSONEachRow",
timeout: 30_000
)
# Get results as maps
maps = Huginn.Clickhouse.Result.to_maps(result)
@spec query!(String.t(), query_opts()) :: Huginn.Clickhouse.Result.t()
Executes a query and raises on error.
See query/2 for options.
@spec running_queries(keyword()) :: {:ok, Huginn.Clickhouse.Result.t()} | {:error, term()}
Lists currently running queries.
Examples
{:ok, result} = Huginn.Clickhouse.Client.running_queries()
queries = Huginn.Clickhouse.Result.to_maps(result)
Enum.each(queries, fn q ->
IO.puts("#{q["query_id"]}: #{q["query"]}")
end)
@spec stream_io(query_opts()) :: {Enumerable.t(), (struct() -> :ok)} | {:error, term()}
Opens a bidirectional streaming connection using ExecuteQueryWithStreamIO.
This is the most flexible but complex method, suitable for:
- Interactive query sessions
- Real-time data processing pipelines
- Custom streaming protocols
Returns a tuple of {output_stream, send_function} where:
- output_stream is an enumerable of results
- send_function sends QueryInfo messages to the server
Examples
# Basic bidirectional streaming
{output, send} = Huginn.Clickhouse.Client.stream_io()
# Send a query
query_info = Huginn.Clickhouse.Query.build("SELECT 1")
send.(query_info)
# Read response
Enum.take(output, 1)
# Send cancellation
send.(Huginn.Clickhouse.Query.build_cancel())
Agent-based Bidirectional Example
defmodule MyApp.StreamProcessor do
def process do
{output, send} = Huginn.Clickhouse.Client.stream_io()
# Start result collector
{:ok, results} = Agent.start_link(fn -> [] end)
# Spawn output reader
reader = Task.async(fn ->
Enum.each(output, fn
{:ok, result} ->
Agent.update(results, &[result | &1])
{:error, _} ->
:skip
end)
end)
# Send queries
send.(Huginn.Clickhouse.Query.build("SELECT 1"))
send.(Huginn.Clickhouse.Query.build("SELECT 2"))
# Wait and get results
Task.await(reader)
Agent.get(results, &Enum.reverse/1)
end
end
@spec stream_maps(String.t(), query_opts()) :: Enumerable.t(map())
Streams rows as maps from a query result.
Convenience wrapper that converts each row to a map with column names as keys.
Examples
Huginn.Clickhouse.Client.stream_maps("SELECT id, name, age FROM users")
|> Stream.filter(fn %{"age" => age} -> age > 18 end)
|> Enum.to_list()
@spec stream_query(String.t(), query_opts()) :: Enumerable.t()
Executes a query using the ExecuteQueryWithStreamOutput gRPC method.
Returns a lazy stream of results, suitable for:
- Large result sets that don't fit in memory
- Processing data as it arrives
- Implementing pagination-like behavior
Options
- All options from query/2
Returns
Returns a Stream that yields {:ok, Result.t()} or {:error, term()} tuples.
Examples
# Basic streaming
Huginn.Clickhouse.Client.stream_query("SELECT * FROM large_table")
|> Enum.each(fn
{:ok, result} -> process_chunk(result)
{:error, error} -> Logger.error("Error: #{inspect(error)}")
end)
# Collect all results
results =
Huginn.Clickhouse.Client.stream_query("SELECT * FROM events")
|> Enum.reduce([], fn
{:ok, result}, acc -> acc ++ result.rows
{:error, _}, acc -> acc
end)
# Lazy Stream pipeline with early termination
Huginn.Clickhouse.Client.stream_query("SELECT * FROM logs")
|> Stream.flat_map(fn {:ok, r} -> r.rows; _ -> [] end)
|> Stream.take_while(fn [ts | _] -> ts > cutoff_time end)
|> Enum.to_list()
# Agent accumulator pattern
{:ok, agent} = Agent.start_link(fn -> %{rows: [], errors: 0} end)
Huginn.Clickhouse.Client.stream_query("SELECT * FROM metrics")
|> Enum.each(fn
{:ok, result} ->
Agent.update(agent, fn state ->
%{state | rows: state.rows ++ result.rows}
end)
{:error, _} ->
Agent.update(agent, fn state ->
%{state | errors: state.errors + 1}
end)
end)
final_state = Agent.get(agent, & &1)
@spec stream_rows(String.t(), query_opts()) :: Enumerable.t(list())
Streams individual rows from a query result.
Convenience wrapper around stream_query/2 that flattens results into rows.
Examples
Huginn.Clickhouse.Client.stream_rows("SELECT id, name FROM users")
|> Stream.take(100)
|> Enum.each(fn [id, name] -> IO.puts("#{id}: #{name}") end)