ExeQute (exe_qute v0.1.1)

Copy Markdown

Elixir client for KDB+, the high-performance time-series database.

Handles the KDB+ IPC wire protocol, full type system encoding/decoding, and tickerplant pub/sub. Functions that can fail return {:ok, result} (or a bare :ok when there is no result to return) or {:error, reason} — no exceptions escape to the caller.

Querying

One-shot

{:ok, result} = ExeQute.query("select from trade", host: "kdb-host", port: 5010)

Persistent connection

{:ok, conn} = ExeQute.connect(host: "kdb-host", port: 5010)
{:ok, result} = ExeQute.query(conn, "select from trade")
ExeQute.disconnect(conn)

Named connection

ExeQute.connect(host: "kdb-host", port: 5010, name: :trades)
{:ok, result} = ExeQute.query(:trades, "select from trade")

Parameterized queries

Pass arguments as a separate list and the library serialises them for KDB+ — no manual string interpolation needed.

{:ok, result} = ExeQute.query(conn, "{x + y}", [1, 2])
{:ok, result} = ExeQute.query(conn, ".myns.getquotes", ["USD/JPY", ~D[2024-01-01]])

Publishing (fire-and-forget)

:ok = ExeQute.publish(conn, ".feed.upd", ["trade", rows])

Pub/Sub

Subscribe to KDB+ tickerplant push messages. One TCP connection to the tickerplant serves any number of local subscribers.

Process-based — receive messages in handle_info

defmodule MyApp.TradeHandler do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  def subscribe do
    ExeQute.subscribe("trade", host: "tp-host", port: 5010)
    ExeQute.subscribe("quote", host: "tp-host", port: 5010)
  end

  def unsubscribe do
    ExeQute.unsubscribe("trade", host: "tp-host", port: 5010)
    ExeQute.unsubscribe("quote", host: "tp-host", port: 5010)
  end

  @impl true
  def init(_opts), do: {:ok, %{}}

  @impl true
  def handle_info({:exe_qute, table, data}, state) do
    IO.inspect({table, data})
    {:noreply, state}
  end
end

{:ok, _pid} = MyApp.TradeHandler.start_link([])
MyApp.TradeHandler.subscribe()
Process.sleep(30_000)
MyApp.TradeHandler.unsubscribe()

Named subscriber

ExeQute.Subscriber.start_link(host: "tp-host", port: 5010, name: :tp)

defmodule MyApp.TradeHandler do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  def subscribe do
    ExeQute.subscribe(:tp, "trade")
    ExeQute.subscribe(:tp, "quote")
  end

  def unsubscribe do
    ExeQute.unsubscribe(:tp, "trade")
    ExeQute.unsubscribe(:tp, "quote")
  end

  @impl true
  def init(_opts), do: {:ok, %{}}

  @impl true
  def handle_info({:exe_qute, table, data}, state) do
    IO.inspect({table, data})
    {:noreply, state}
  end
end

{:ok, _pid} = MyApp.TradeHandler.start_link([])
MyApp.TradeHandler.subscribe()
Process.sleep(30_000)
MyApp.TradeHandler.unsubscribe()

Callback-based

ExeQute.Subscriber.start_link(host: "tp-host", port: 5010, name: :tp)

{:ok, trade_ref} = ExeQute.subscribe(:tp, "trade", fn {table, data} ->
  IO.inspect({table, data})
end)

{:ok, quote_ref} = ExeQute.subscribe(:tp, "quote", ["AAPL", "MSFT"], fn {_table, data} ->
  IO.inspect(data)
end)

Process.sleep(30_000)

ExeQute.unsubscribe(:tp, trade_ref)
ExeQute.unsubscribe(:tp, quote_ref)

See ExeQute.Subscriber for starting a subscriber in a supervision tree.

Summary

Functions

Connects to a KDB+ instance.

Returns whether a connection process is alive.

Closes a connection opened with connect/1.

Renders a query result as a tabbed Kino widget.

Lists functions in a namespace with their parameter names and source bodies.

Lists all namespaces defined on a KDB+ instance.

Sends a fire-and-forget message to a KDB+ function without waiting for a response.

Executes a query against a KDB+ instance.

Clears the introspection cache on a connection.

Subscribes to push messages from a KDB+ tickerplant.

Lists table names in a namespace.

Converts raw KDB+ push data to a list of row maps.

Removes a subscription.

Lists variable names defined in a namespace.

Types

connect_opts()

@type connect_opts() :: [
  host: String.t(),
  port: pos_integer(),
  username: String.t() | nil,
  password: String.t() | nil,
  timeout: pos_integer(),
  encoding: String.t(),
  name: atom() | nil
]

table_format()

@type table_format() :: :maps | :columnar

Functions

connect(opts \\ [])

@spec connect(connect_opts()) :: {:ok, pid()} | {:error, term()}

Connects to a KDB+ instance.

Options

  • :host - hostname or IP address of the KDB+ server (default: "localhost")
  • :port - TCP port the KDB+ server is listening on (default: 5001)
  • :username - username for authentication (optional; omit for unauthenticated servers)
  • :password - password for authentication (required if :username is set)
  • :timeout - connection and query timeout in milliseconds (default: 5000)
  • :encoding - character encoding for string data (default: "utf8")
  • :name - register the connection under a local atom name so it can be referenced without holding the pid (optional)

Examples

{:ok, conn} = ExeQute.connect(host: "kdb-host", port: 5010)

{:ok, _} = ExeQute.connect(host: "kdb-host", port: 5010, name: :trades)
ExeQute.query(:trades, "select from trade")

{:ok, conn} = ExeQute.connect(host: "kdb-host", port: 5010, username: "user", password: "pass")

connected?(conn)

@spec connected?(pid() | atom()) :: boolean()

Returns whether a connection process is alive.

Note: a true result means the connection GenServer is running — it does not guarantee the underlying TCP socket is still healthy.

Examples

ExeQute.connected?(conn)   #=> true
ExeQute.connected?(:trades) #=> false

disconnect(conn)

@spec disconnect(pid() | atom()) :: :ok

Closes a connection opened with connect/1.

Safe to call on already-dead connections — always returns :ok.

Examples

ExeQute.disconnect(conn)
ExeQute.disconnect(:trades)

display(result, label \\ "")

@spec display(term(), String.t()) :: Kino.Layout.t() | {:error, :kino_not_available}

Renders a query result as a tabbed Kino widget.

Tabs shown depend on the shape of the result:

  • Table — shown when result is a non-empty list of maps; uses Kino.DataTable
  • Tree — shown when result is a non-empty list or map; uses Kino.Tree
  • Raw — always shown; displays inspect/1 output as a code block

Examples

{:ok, result} = ExeQute.query(conn, "select from trade")
ExeQute.display(result, "select from trade")

functions(conn, namespace \\ nil)

@spec functions(pid() | atom(), String.t() | nil) :: {:ok, [map()]} | {:error, term()}

Lists functions in a namespace with their parameter names and source bodies.

Pass a namespace string such as ".myns" to scope the listing, or omit the argument (or pass nil) to list functions in the root namespace.

Each entry is a map with three string keys:

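| Key      | Description                                                  |
| -------- | ------------------------------------------------------------ |
| "name"   | Fully qualified function name, e.g. ".myns.getquotes"        |
| "params" | List of parameter name strings; [] for zero-arity functions  |
| "body"   | Verbatim q source of the function body                       |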

This is useful for building lightweight documentation around a live KDB+ instance, or for powering autocomplete and signature-help tooling without leaving Elixir.

Results are cached per-connection. Call refresh_introspection/1 after deploying new functions to pick up changes.

See also namespaces/1, tables/2, variables/2.

Examples

{:ok, fns} = ExeQute.functions(conn)

{:ok, fns} = ExeQute.functions(conn, ".util")
#=> {:ok, [
#=>   %{
#=>     "name"   => ".util.getquotes",
#=>     "params" => ["sym", "start", "end"],
#=>     "body"   => "{[sym;start;end] select from quote where sym=sym, date within (start;end)}"
#=>   },
#=>   %{
#=>     "name"   => ".util.lasttrade",
#=>     "params" => ["sym"],
#=>     "body"   => "{[sym] last select from trade where sym=sym}"
#=>   },
#=>   %{
#=>     "name"   => ".util.init",
#=>     "params" => [],
#=>     "body"   => "{[] ...}"
#=>   }
#=> ]}

Calling a discovered function with its parameter list:

{:ok, [fn_info | _]} = ExeQute.functions(conn, ".util")
name   = fn_info["name"]    #=> ".util.getquotes"
params = fn_info["params"]  #=> ["sym", "start", "end"]

{:ok, result} = ExeQute.query(conn, name, ["EUR/USD", ~D[2024-01-01], ~D[2024-12-31]])

namespaces(conn)

@spec namespaces(pid() | atom()) :: {:ok, [String.t()]} | {:error, term()}

Lists all namespaces defined on a KDB+ instance.

Returns names prefixed with . — for example ".myns", ".q", ".Q", ".h". The root namespace itself is not included; use tables/2, functions/2, and variables/2 without a namespace argument to inspect the root.

Results are cached per-connection after the first call. Call refresh_introspection/1 to force a fresh fetch after deploying new code.

See also functions/2, tables/2, variables/2, and ExeQute.Introspect for the full introspection API.

Examples

{:ok, namespaces} = ExeQute.namespaces(conn)
#=> {:ok, [".myns", ".feed", ".util", ".q", ".Q", ".h"]}

{:ok, namespaces} = ExeQute.namespaces(:rdb)

publish(conn, func, args)

@spec publish(pid() | atom(), String.t(), list()) :: :ok | {:error, term()}

Sends a fire-and-forget message to a KDB+ function without waiting for a response.

Uses the KDB+ async message type — the server receives and processes the call but sends no reply. Useful for publishing data into KDB+ feeds or triggering side-effects.

Examples

ExeQute.publish(conn, ".feed.upd", ["trade", table_data])
ExeQute.publish(:feed, ".u.pub", ["quote", rows])

query(conn, query)

@spec query(pid() | atom(), String.t()) :: {:ok, term()} | {:error, term()}
@spec query(String.t(), connect_opts()) :: {:ok, term()} | {:error, term()}

Executes a query against a KDB+ instance.

Accepts either an existing connection (pid or registered name) or connection options for a one-shot query.

One-shot query

Pass the q expression as the first argument and connection options as the second. Opens a connection, runs the query, then closes it. Accepts the same options as connect/1, plus :table_format (:maps or :columnar, default :maps).

{:ok, result} = ExeQute.query("select from trade", host: "kdb-host", port: 5010)
{:ok, result} = ExeQute.query("select from trade", host: "kdb-host", port: 5010, table_format: :columnar)

Persistent connection

Pass an already-open connection pid or registered name as the first argument.

{:ok, conn} = ExeQute.connect(host: "kdb-host", port: 5010)
{:ok, result} = ExeQute.query(conn, "select from trade")

ExeQute.connect(host: "kdb-host", port: 5010, name: :trades)
{:ok, result} = ExeQute.query(:trades, "select from trade")

Parameterized queries

Pass a function name (or a lambda such as "{x + y}") and an argument list. Arguments are serialised to q literal syntax and sent as a single q expression string — the same form GUI tools like QStudio use.

{:ok, result} = ExeQute.query(conn, "{x + y}", [1, 2])
{:ok, result} = ExeQute.query(conn, ".myns.getquotes", ["USD/JPY", ~D[2024-01-01]])
{:ok, result} = ExeQute.query(conn, ".myns.fn", [])

query(conn, func, args)

@spec query(pid() | atom(), String.t(), list()) :: {:ok, term()} | {:error, term()}

refresh_introspection(conn)

@spec refresh_introspection(pid() | atom()) :: :ok

Clears the introspection cache on a connection.

Forces the next call to namespaces/1, functions/2, variables/2, or tables/2 to re-query KDB+. Useful after deploying new functions or tables.

Examples

ExeQute.refresh_introspection(conn)

subscribe(table, opts)

@spec subscribe(
  String.t(),
  keyword()
) :: :ok | {:error, term()}
@spec subscribe(pid() | atom(), String.t()) :: :ok | {:error, term()}

Subscribes to push messages from a KDB+ tickerplant.

Auto-connecting (no prior subscriber needed)

Pass connection opts as the second argument. The subscriber connection is created automatically on first use and reused for subsequent subscriptions to the same host/port. Multiple processes can subscribe to the same table — only one TCP connection is opened.

If you already have an ExeQute.connect/1 connection open, pass connection: to reuse its host and port rather than supplying them again:

ExeQute.subscribe("bbo", connection: :rdb)
ExeQute.subscribe("trade", fn {_t, data} -> ... end, connection: :rdb)
ExeQute.unsubscribe("bbo", connection: :rdb)

Process-based (messages delivered via handle_info):

defmodule MyApp.TradeHandler do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  def subscribe do
    ExeQute.subscribe("trade", host: "tp-host", port: 5010)
    ExeQute.subscribe("quote", host: "tp-host", port: 5010)
  end

  def unsubscribe do
    ExeQute.unsubscribe("trade", host: "tp-host", port: 5010)
    ExeQute.unsubscribe("quote", host: "tp-host", port: 5010)
  end

  @impl true
  def init(_opts), do: {:ok, %{}}

  @impl true
  def handle_info({:exe_qute, table, data}, state) do
    IO.inspect({table, data})
    {:noreply, state}
  end
end

{:ok, _pid} = MyApp.TradeHandler.start_link([])
MyApp.TradeHandler.subscribe()
Process.sleep(30_000)
MyApp.TradeHandler.unsubscribe()

Existing subscriber

Pass an already-running subscriber pid or registered name as the first argument.

Process-based:

ExeQute.Subscriber.start_link(host: "tp-host", port: 5010, name: :tp)

defmodule MyApp.TradeHandler do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  def subscribe do
    ExeQute.subscribe(:tp, "trade")
    ExeQute.subscribe(:tp, "quote")
  end

  def unsubscribe do
    ExeQute.unsubscribe(:tp, "trade")
    ExeQute.unsubscribe(:tp, "quote")
  end

  @impl true
  def init(_opts), do: {:ok, %{}}

  @impl true
  def handle_info({:exe_qute, table, data}, state) do
    IO.inspect({table, data})
    {:noreply, state}
  end
end

{:ok, _pid} = MyApp.TradeHandler.start_link([])
MyApp.TradeHandler.subscribe()
Process.sleep(30_000)
MyApp.TradeHandler.unsubscribe()

Callback-based:

ExeQute.Subscriber.start_link(host: "tp-host", port: 5010, name: :tp)

{:ok, trade_ref} = ExeQute.subscribe(:tp, "trade", fn {table, data} ->
  IO.inspect({table, data})
end)

{:ok, quote_ref} = ExeQute.subscribe(:tp, "quote", ["AAPL", "MSFT"], fn {_table, data} ->
  IO.inspect(data)
end)

Process.sleep(30_000)

ExeQute.unsubscribe(:tp, trade_ref)
ExeQute.unsubscribe(:tp, quote_ref)

subscribe(table, callback, opts)

@spec subscribe(String.t(), (tuple() -> any()), keyword()) ::
  {:ok, reference()} | {:error, term()}
@spec subscribe(pid() | atom(), String.t(), (tuple() -> any())) ::
  {:ok, reference()} | {:error, term()}

subscribe(sub, table, syms, callback)

@spec subscribe(pid() | atom(), String.t(), [String.t()], (tuple() -> any())) ::
  {:ok, reference()} | {:error, term()}

tables(conn, namespace \\ nil)

@spec tables(pid() | atom(), String.t() | nil) ::
  {:ok, [String.t()]} | {:error, term()}

Lists table names in a namespace.

Pass a namespace string such as ".myns" or omit the argument (or pass nil) to list tables in the root namespace. Returns simple names without the namespace prefix.

Results are cached per-connection. Call refresh_introspection/1 to force a fresh fetch after schema changes.

See also namespaces/1, functions/2, variables/2.

Examples

{:ok, tables} = ExeQute.tables(conn)
#=> {:ok, ["trade", "quote", "bbo"]}

{:ok, tables} = ExeQute.tables(conn, ".myns")
#=> {:ok, ["positions", "orders"]}

{:ok, tables} = ExeQute.tables(:rdb)

Querying all rows from a discovered table:

{:ok, [table | _]} = ExeQute.tables(conn)
{:ok, rows} = ExeQute.query(conn, "select from #{table}")

to_rows(raw)

@spec to_rows(term()) :: [map()]

Converts raw KDB+ push data to a list of row maps.

Tickerplant callbacks receive raw decoded data in columnar format. Pass the value through this function before iterating over rows.

Examples

ExeQute.subscribe(tp, "trade", fn {_table, raw} ->
  rows = ExeQute.to_rows(raw)
  Enum.each(rows, &IO.inspect/1)
end)

unsubscribe(sub, table)

@spec unsubscribe(pid() | atom(), String.t() | reference()) :: :ok
@spec unsubscribe(
  String.t(),
  keyword()
) :: :ok

Removes a subscription.

For process-based subscriptions, pass the table name — unregisters the calling process and notifies the subscriber. If no other process or callback is subscribed to that table, .u.unsub is sent to the tickerplant.

For callback subscriptions, pass the reference returned by subscribe/3 or subscribe/4.

Examples

ExeQute.unsubscribe(:tp, "trade")
ExeQute.unsubscribe(:tp, ref)

ExeQute.unsubscribe("trade", host: "tp", port: 5010)

variables(conn, namespace \\ nil)

@spec variables(pid() | atom(), String.t() | nil) ::
  {:ok, [String.t()]} | {:error, term()}

Lists variable names defined in a namespace.

Pass a namespace string such as ".myns" or omit the argument (or pass nil) to list variables in the root namespace. Returns simple names without the namespace prefix.

Variables are q global values that are not functions and not tables. Use tables/2 to list tables and functions/2 to list functions.

Results are cached per-connection. Call refresh_introspection/1 to force a fresh fetch.

See also namespaces/1, tables/2, functions/2.

Examples

{:ok, vars} = ExeQute.variables(conn)
#=> {:ok, ["version", "startTime", "bidSize", "askSize"]}

{:ok, vars} = ExeQute.variables(conn, ".myns")
#=> {:ok, ["config", "state"]}