ExeQute (exe_qute v0.1.2)
Elixir client for KDB+, the high-performance time-series database.
Handles the KDB+ IPC wire protocol, full type system encoding/decoding, and
tickerplant pub/sub. All functions return {:ok, result} or {:error, reason} —
no exceptions escape to the caller.
Querying
One-shot
{:ok, result} = ExeQute.query("select from trade", host: "kdb-host", port: 5010)
Persistent connection
{:ok, conn} = ExeQute.connect(host: "kdb-host", port: 5010)
{:ok, result} = ExeQute.query(conn, "select from trade")
ExeQute.disconnect(conn)
Named connection
ExeQute.connect(host: "kdb-host", port: 5010, name: :trades)
{:ok, result} = ExeQute.query(:trades, "select from trade")
Parameterized queries
Arguments are encoded as KDB+ types on the wire — no string interpolation needed.
{:ok, result} = ExeQute.query(conn, "{x + y}", [1, 2])
{:ok, result} = ExeQute.query(conn, ".myns.getquotes", ["USD/JPY", ~D[2024-01-01]])
Publishing (fire-and-forget)
:ok = ExeQute.publish(conn, ".feed.upd", ["trade", rows])
Pub/Sub
Subscribe to KDB+ tickerplant push messages. One TCP connection to the tickerplant serves any number of local subscribers.
Process-based — receive messages in handle_info
defmodule MyApp.TradeHandler do
use GenServer
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
def subscribe do
ExeQute.subscribe("trade", host: "tp-host", port: 5010)
ExeQute.subscribe("quote", host: "tp-host", port: 5010)
end
def unsubscribe do
ExeQute.unsubscribe("trade", host: "tp-host", port: 5010)
ExeQute.unsubscribe("quote", host: "tp-host", port: 5010)
end
@impl true
def init(_opts), do: {:ok, %{}}
@impl true
def handle_info({:exe_qute, table, data}, state) do
IO.inspect({table, data})
{:noreply, state}
end
end
{:ok, _pid} = MyApp.TradeHandler.start_link([])
MyApp.TradeHandler.subscribe()
Process.sleep(30_000)
MyApp.TradeHandler.unsubscribe()
Named subscriber
ExeQute.Subscriber.start_link(host: "tp-host", port: 5010, name: :tp)
defmodule MyApp.TradeHandler do
use GenServer
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
def subscribe do
ExeQute.subscribe(:tp, "trade")
ExeQute.subscribe(:tp, "quote")
end
def unsubscribe do
ExeQute.unsubscribe(:tp, "trade")
ExeQute.unsubscribe(:tp, "quote")
end
@impl true
def init(_opts), do: {:ok, %{}}
@impl true
def handle_info({:exe_qute, table, data}, state) do
IO.inspect({table, data})
{:noreply, state}
end
end
{:ok, _pid} = MyApp.TradeHandler.start_link([])
MyApp.TradeHandler.subscribe()
Process.sleep(30_000)
MyApp.TradeHandler.unsubscribe()
Callback-based
ExeQute.Subscriber.start_link(host: "tp-host", port: 5010, name: :tp)
{:ok, trade_ref} = ExeQute.subscribe(:tp, "trade", fn {table, data} ->
IO.inspect({table, data})
end)
{:ok, quote_ref} = ExeQute.subscribe(:tp, "quote", ["AAPL", "MSFT"], fn {_table, data} ->
IO.inspect(data)
end)
Process.sleep(30_000)
ExeQute.unsubscribe(:tp, trade_ref)
ExeQute.unsubscribe(:tp, quote_ref)
See ExeQute.Subscriber for starting a subscriber in a supervision tree.
Summary
Functions
Connects to a KDB+ instance.
Returns whether a connection process is alive.
Closes a connection opened with connect/1.
Renders a query result as a tabbed Kino widget.
Lists functions in a namespace with their parameter names and source bodies.
Lists all namespaces defined on a KDB+ instance.
Sends a fire-and-forget message to a KDB+ function without waiting for a response.
Executes a query against a KDB+ instance.
Clears the introspection cache on a connection.
Subscribes to push messages from a KDB+ tickerplant.
Lists table names in a namespace.
Converts raw KDB+ push data to a list of row maps.
Removes a subscription.
Lists variable names defined in a namespace.
Types
@type connect_opts() :: [ host: String.t(), port: pos_integer(), username: String.t() | nil, password: String.t() | nil, timeout: pos_integer(), encoding: String.t(), name: atom() | nil ]
@type table_format() :: :maps | :columnar
Functions
@spec connect(connect_opts()) :: {:ok, pid()} | {:error, term()}
Connects to a KDB+ instance.
Options
- :host - hostname or IP address of the KDB+ server (default: "localhost")
- :port - TCP port the KDB+ server is listening on (default: 5001)
- :username - username for authentication (optional; omit for unauthenticated servers)
- :password - password for authentication (required if :username is set)
- :timeout - connection and query timeout in milliseconds (default: 5000)
- :encoding - character encoding for string data (default: "utf8")
- :name - register the connection under a local atom name so it can be referenced without holding the pid (optional)
Examples
{:ok, conn} = ExeQute.connect(host: "kdb-host", port: 5010)
{:ok, _} = ExeQute.connect(host: "kdb-host", port: 5010, name: :trades)
ExeQute.query(:trades, "select from trade")
{:ok, conn} = ExeQute.connect(host: "kdb-host", port: 5010, username: "user", password: "pass")
Returns whether a connection process is alive.
Note: a true result means the connection GenServer is running — it does not
guarantee the underlying TCP socket is still healthy.
Examples
ExeQute.connected?(conn) #=> true
ExeQute.connected?(:trades) #=> false
Closes a connection opened with connect/1.
Safe to call on already-dead connections — always returns :ok.
Examples
ExeQute.disconnect(conn)
ExeQute.disconnect(:trades)
@spec display(term(), String.t()) :: Kino.Layout.t() | {:error, :kino_not_available}
Renders a query result as a tabbed Kino widget.
Tabs shown depend on the shape of the result:
- Table — shown when result is a non-empty list of maps; uses Kino.DataTable
- Tree — shown when result is a non-empty list or map; uses Kino.Tree
- Raw — always shown; displays inspect/1 output as a code block
Examples
{:ok, result} = ExeQute.query(conn, "select from trade")
ExeQute.display(result, "select from trade")
Lists functions in a namespace with their parameter names and source bodies.
Pass a namespace string such as ".myns" to scope the listing, or omit the
argument (or pass nil) to list functions in the root namespace.
Each entry is a map with three string keys:
| Key | Description |
|---|---|
| "name" | Fully qualified function name, e.g. ".myns.getquotes" |
| "params" | List of parameter name strings; [] for zero-arity functions |
| "body" | Verbatim q source of the function body |
This is useful for building lightweight documentation around a live KDB+ instance, or for powering autocomplete and signature-help tooling without leaving Elixir.
Results are cached per-connection. Call refresh_introspection/1 after
deploying new functions to pick up changes.
See also namespaces/1, tables/2, variables/2.
Examples
{:ok, fns} = ExeQute.functions(conn)
{:ok, fns} = ExeQute.functions(conn, ".util")
#=> {:ok, [
#=> %{
#=> "name" => ".util.getquotes",
#=> "params" => ["sym", "start", "end"],
#=> "body" => "{[sym;start;end] select from quote where sym=sym, date within (start;end)}"
#=> },
#=> %{
#=> "name" => ".util.lasttrade",
#=> "params" => ["sym"],
#=> "body" => "{[sym] last select from trade where sym=sym}"
#=> },
#=> %{
#=> "name" => ".util.init",
#=> "params" => [],
#=> "body" => "{[] ...}"
#=> }
#=> ]}
Calling a discovered function with its parameter list:
{:ok, [fn_info | _]} = ExeQute.functions(conn, ".util")
name = fn_info["name"] #=> ".util.getquotes"
params = fn_info["params"] #=> ["sym", "start", "end"]
{:ok, result} = ExeQute.query(conn, name, ["EUR/USD", ~D[2024-01-01], ~D[2024-12-31]])
Lists all namespaces defined on a KDB+ instance.
Returns names prefixed with . — for example ".myns", ".q", ".Q", ".h".
The root namespace itself is not included; use tables/2, functions/2, and
variables/2 without a namespace argument to inspect the root.
Results are cached per-connection after the first call. Call
refresh_introspection/1 to force a fresh fetch after deploying new code.
See also functions/2, tables/2, variables/2,
and ExeQute.Introspect for the full introspection API.
Examples
{:ok, namespaces} = ExeQute.namespaces(conn)
#=> {:ok, [".myns", ".feed", ".util", ".q", ".Q", ".h"]}
{:ok, namespaces} = ExeQute.namespaces(:rdb)
Sends a fire-and-forget message to a KDB+ function without waiting for a response.
Uses the KDB+ async message type — the server receives and processes the call but sends no reply. Useful for publishing data into KDB+ feeds or triggering side-effects.
Examples
ExeQute.publish(conn, ".feed.upd", ["trade", table_data])
ExeQute.publish(:feed, ".u.pub", ["quote", rows])
@spec query(pid() | atom(), String.t()) :: {:ok, term()} | {:error, term()}
@spec query(String.t(), connect_opts()) :: {:ok, term()} | {:error, term()}
Executes a query against a KDB+ instance.
Accepts either an existing connection (pid or registered name) or connection options for a one-shot query.
One-shot query
Pass the q expression as the first argument and connection options as the second.
Opens a connection, runs the query, then closes it. Accepts the same options as
connect/1, plus:
- :table_format — :maps (default) or :columnar.
- :trap — when true, wraps the query in a server-side error trap so q errors come back as {:error, message} rather than as backtrace responses. See ExeQute.QError for the kdb+ gateway shapes this handles and for QError.trap/1 / QError.untrap/1 helpers usable on persistent connections.
{:ok, result} = ExeQute.query("select from trade", host: "kdb-host", port: 5010)
{:ok, result} = ExeQute.query("select from trade", host: "kdb-host", port: 5010, table_format: :columnar)
{:error, "type"} = ExeQute.query("1+`a", host: "kdb-host", port: 5010, trap: true)
Persistent connection
Pass an already-open connection pid or registered name as the first argument.
{:ok, conn} = ExeQute.connect(host: "kdb-host", port: 5010)
{:ok, result} = ExeQute.query(conn, "select from trade")
ExeQute.connect(host: "kdb-host", port: 5010, name: :trades)
{:ok, result} = ExeQute.query(:trades, "select from trade")
Parameterized queries
Pass a function name and argument list. Arguments are serialised to q literal syntax and sent as a single q expression string — the same form GUI tools like QStudio use.
{:ok, result} = ExeQute.query(conn, "{x + y}", [1, 2])
{:ok, result} = ExeQute.query(conn, ".myns.getquotes", ["USD/JPY", ~D[2024-01-01]])
{:ok, result} = ExeQute.query(conn, ".myns.fn", [])
Clears the introspection cache on a connection.
Forces the next call to namespaces/1, functions/2, variables/2, or
tables/2 to re-query KDB+. Useful after deploying new functions or tables.
Examples
ExeQute.refresh_introspection(conn)
@spec subscribe(String.t(), keyword()) :: :ok | {:error, term()}
@spec subscribe(pid() | atom(), String.t()) :: :ok | {:error, term()}
Subscribes to push messages from a KDB+ tickerplant.
Auto-connecting (no prior subscriber needed)
Pass connection opts as the second argument. The subscriber connection is created
automatically on first use and reused for subsequent subscriptions to the same
host/port. Multiple processes can subscribe to the same table — only one TCP
connection is opened.
If you already have an ExeQute.connect/1 connection open, pass connection: to
reuse its host and port rather than supplying them again:
ExeQute.subscribe("bbo", connection: :rdb)
ExeQute.subscribe("trade", fn {_t, data} -> ... end, connection: :rdb)
ExeQute.unsubscribe("bbo", connection: :rdb)
Process-based (messages delivered via handle_info):
defmodule MyApp.TradeHandler do
use GenServer
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
def subscribe do
ExeQute.subscribe("trade", host: "tp-host", port: 5010)
ExeQute.subscribe("quote", host: "tp-host", port: 5010)
end
def unsubscribe do
ExeQute.unsubscribe("trade", host: "tp-host", port: 5010)
ExeQute.unsubscribe("quote", host: "tp-host", port: 5010)
end
@impl true
def init(_opts), do: {:ok, %{}}
@impl true
def handle_info({:exe_qute, table, data}, state) do
IO.inspect({table, data})
{:noreply, state}
end
end
{:ok, _pid} = MyApp.TradeHandler.start_link([])
MyApp.TradeHandler.subscribe()
Process.sleep(30_000)
MyApp.TradeHandler.unsubscribe()
Existing subscriber
Pass an already-running subscriber pid or registered name as the first argument.
Process-based:
ExeQute.Subscriber.start_link(host: "tp-host", port: 5010, name: :tp)
defmodule MyApp.TradeHandler do
use GenServer
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
def subscribe do
ExeQute.subscribe(:tp, "trade")
ExeQute.subscribe(:tp, "quote")
end
def unsubscribe do
ExeQute.unsubscribe(:tp, "trade")
ExeQute.unsubscribe(:tp, "quote")
end
@impl true
def init(_opts), do: {:ok, %{}}
@impl true
def handle_info({:exe_qute, table, data}, state) do
IO.inspect({table, data})
{:noreply, state}
end
end
{:ok, _pid} = MyApp.TradeHandler.start_link([])
MyApp.TradeHandler.subscribe()
Process.sleep(30_000)
MyApp.TradeHandler.unsubscribe()
Callback-based:
ExeQute.Subscriber.start_link(host: "tp-host", port: 5010, name: :tp)
{:ok, trade_ref} = ExeQute.subscribe(:tp, "trade", fn {table, data} ->
IO.inspect({table, data})
end)
{:ok, quote_ref} = ExeQute.subscribe(:tp, "quote", ["AAPL", "MSFT"], fn {_table, data} ->
IO.inspect(data)
end)
Process.sleep(30_000)
ExeQute.unsubscribe(:tp, trade_ref)
ExeQute.unsubscribe(:tp, quote_ref)
Lists table names in a namespace.
Pass a namespace string such as ".myns" or omit the argument (or pass nil)
to list tables in the root namespace. Returns simple names without the
namespace prefix.
Results are cached per-connection. Call refresh_introspection/1 to force
a fresh fetch after schema changes.
See also namespaces/1, functions/2, variables/2.
Examples
{:ok, tables} = ExeQute.tables(conn)
#=> {:ok, ["trade", "quote", "bbo"]}
{:ok, tables} = ExeQute.tables(conn, ".myns")
#=> {:ok, ["positions", "orders"]}
{:ok, tables} = ExeQute.tables(:rdb)
Querying all rows from a discovered table:
{:ok, [table | _]} = ExeQute.tables(conn)
{:ok, rows} = ExeQute.query(conn, "select from #{table}")
Converts raw KDB+ push data to a list of row maps.
Tickerplant callbacks receive raw decoded data in columnar format. Pass the value through this function before iterating over rows.
Examples
ExeQute.subscribe(tp, "trade", fn {_table, raw} ->
rows = ExeQute.to_rows(raw)
Enum.each(rows, &IO.inspect/1)
end)
@spec unsubscribe(pid() | atom(), String.t() | reference()) :: :ok
@spec unsubscribe(String.t(), keyword()) :: :ok
Removes a subscription.
For process-based subscriptions, pass the table name — unregisters the calling
process and notifies the subscriber. If no other process or callback is subscribed
to that table, .u.unsub is sent to the tickerplant.
For callback subscriptions, pass the reference returned by subscribe/3 or /4.
Examples
ExeQute.unsubscribe(:tp, "trade")
ExeQute.unsubscribe(:tp, ref)
ExeQute.unsubscribe("trade", host: "tp", port: 5010)
Lists variable names defined in a namespace.
Pass a namespace string such as ".myns" or omit the argument (or pass nil)
to list variables in the root namespace. Returns simple names without the
namespace prefix.
Variables are q global values that are not functions and not tables. Use
tables/2 to list tables and functions/2 to list functions.
Results are cached per-connection. Call refresh_introspection/1 to force
a fresh fetch.
See also namespaces/1, tables/2, functions/2.
Examples
{:ok, vars} = ExeQute.variables(conn)
#=> {:ok, ["version", "startTime", "bidSize", "askSize"]}
{:ok, vars} = ExeQute.variables(conn, ".myns")
#=> {:ok, ["config", "state"]}