ExeQute.Subscriber
(exe_qute v0.1.1)
Copy Markdown
A long-lived KDB+ tickerplant connection that fans push messages out to multiple local subscribers over a single TCP connection.
In most cases you do not need to interact with this module directly —
ExeQute.subscribe/2 and ExeQute.unsubscribe/2 create and manage
subscriber processes for you.
Use this module directly when you want explicit lifecycle control, such as starting the subscriber in a supervision tree:
defmodule MyApp.Application do
use Application
def start(_type, _args) do
children = [
{ExeQute.Subscriber, host: "tp-host", port: 5010, name: :tp},
MyApp.TradeHandler
]
Supervisor.start_link(children, strategy: :one_for_one)
end
endOnce started, attach subscriptions via ExeQute.subscribe/2:
ExeQute.subscribe(:tp, "trade")
ExeQute.subscribe(:tp, "quote")Or use this module's own add/3 and add/4 for callback-based subscriptions:
{:ok, ref} = ExeQute.Subscriber.add(:tp, "trade", fn {table, data} ->
IO.inspect({table, data})
end)
ExeQute.Subscriber.remove(:tp, ref)
Summary
Functions
Adds a callback subscription to table on the given subscriber.
Adds a callback subscription to table filtered to the given symbols.
Returns a specification to start this module under a supervisor.
Callback implementation for GenServer.init/1.
Removes a callback subscription identified by ref.
Starts a subscriber process connected to the KDB+ tickerplant at host/port.
Finds an existing subscriber for host/port or starts a new one.
Stops the subscriber and closes its tickerplant connection.
Functions
Adds a callback subscription to table on the given subscriber.
The callback receives {table, data} on each update from the tickerplant.
Returns {:ok, ref} — pass ref to remove/2 to unsubscribe.
Examples
{:ok, ref} = ExeQute.Subscriber.add(:tp, "trade", fn {table, data} ->
IO.inspect({table, data})
end)
@spec add(pid() | atom(), String.t(), [String.t()], (tuple() -> any())) :: {:ok, reference()} | {:error, term()}
Adds a callback subscription to table filtered to the given symbols.
Only updates for symbols in syms are delivered to the callback.
Pass an empty list to receive all symbols.
Returns {:ok, ref} — pass ref to remove/2 to unsubscribe.
Examples
{:ok, ref} = ExeQute.Subscriber.add(:tp, "quote", ["AAPL", "MSFT"], fn {table, data} ->
IO.inspect(data)
end)
Returns a specification to start this module under a supervisor.
See Supervisor.
Callback implementation for GenServer.init/1.
Removes a callback subscription identified by ref.
ref is the value returned by add/3 or add/4. Always returns :ok,
even if the subscriber has already stopped.
Examples
{:ok, ref} = ExeQute.Subscriber.add(:tp, "trade", fn {_t, data} -> ... end)
ExeQute.Subscriber.remove(:tp, ref)
@spec start_link(keyword()) :: GenServer.on_start()
Starts a subscriber process connected to the KDB+ tickerplant at host/port.
Accepts the same connection options as ExeQute.connect/1. Pass name: to
register the process under an atom for use throughout your application.
Options
| Option | Default | Description |
|---|---|---|
:host | "localhost" | Tickerplant hostname or IP |
:port | 5001 | Tickerplant port |
:username | nil | Username |
:password | nil | Password |
:name | nil | Register under this atom |
Examples
{:ok, sub} = ExeQute.Subscriber.start_link(host: "tp-host", port: 5010)
ExeQute.Subscriber.start_link(host: "tp-host", port: 5010, name: :tp)
Finds an existing subscriber for host/port or starts a new one.
Multiple calls with the same host and port return the same pid without opening a second TCP connection. This is the recommended way to obtain a subscriber in Livebook notebooks because re-evaluating a cell reuses the existing connection rather than tearing it down.
Examples
{:ok, tp} = ExeQute.Subscriber.start_or_find(host: "tp-host", port: 9600)
Stops the subscriber and closes its tickerplant connection.
Always returns :ok. Prefer ExeQute.unsubscribe/2 for removing individual
subscriptions without tearing down the connection.
Examples
ExeQute.Subscriber.stop(:tp)