ExeQute.Subscriber (exe_qute v0.1.1)

A long-lived process that holds a single TCP connection to a KDB+ tickerplant and fans pushed messages out to multiple local subscribers.

In most cases you do not need to interact with this module directly — ExeQute.subscribe/2 and ExeQute.unsubscribe/2 create and manage subscriber processes for you.

Use this module directly when you want explicit lifecycle control, such as starting the subscriber in a supervision tree:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {ExeQute.Subscriber, host: "tp-host", port: 5010, name: :tp},
      MyApp.TradeHandler
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end
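
The child list above references MyApp.TradeHandler without showing it. One possible shape, offered as a sketch rather than the library's prescribed pattern, is a GenServer that registers a callback with add/3 during init and forwards each update to its own mailbox. The {:tp_update, table, data} message shape and the count/0 helper are assumptions made for illustration. The sketch also assumes the :tp subscriber is listed before the handler, so it is already running when the handler starts.

```elixir
defmodule MyApp.TradeHandler do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Hypothetical helper: number of updates seen so far.
  def count, do: GenServer.call(__MODULE__, :count)

  @impl true
  def init(_opts) do
    handler = self()

    # Keep the callback tiny: it only forwards the update to this process.
    # The message shape {:tp_update, table, data} is an assumption of this sketch.
    callback = fn {table, data} -> send(handler, {:tp_update, table, data}) end

    case ExeQute.Subscriber.add(:tp, "trade", callback) do
      {:ok, ref} -> {:ok, %{ref: ref, count: 0}}
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl true
  def handle_call(:count, _from, state), do: {:reply, state.count, state}

  @impl true
  def handle_info({:tp_update, _table, _data}, state) do
    {:noreply, %{state | count: state.count + 1}}
  end

  # Ignore anything else so unrelated messages do not crash the handler.
  def handle_info(_other, state), do: {:noreply, state}

  @impl true
  def terminate(_reason, %{ref: ref}) do
    # Best-effort cleanup; terminate/2 is not invoked on every kind of exit.
    ExeQute.Subscriber.remove(:tp, ref)
  end
end
```

Returning {:stop, reason} from init makes a failed subscription visible to the supervisor, which can then apply its restart strategy.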

Once started, attach subscriptions via ExeQute.subscribe/2:

ExeQute.subscribe(:tp, "trade")
ExeQute.subscribe(:tp, "quote")

Or use this module's own add/3 and add/4 for callback-based subscriptions:

{:ok, ref} = ExeQute.Subscriber.add(:tp, "trade", fn {table, data} ->
  IO.inspect({table, data})
end)

ExeQute.Subscriber.remove(:tp, ref)

Summary

Functions

add(sub, table, callback)
Adds a callback subscription to table on the given subscriber.

add(sub, table, syms, callback)
Adds a callback subscription to table filtered to the given symbols.

child_spec(init_arg)
Returns a specification to start this module under a supervisor.

init(opts)
Callback implementation for GenServer.init/1.

remove(sub, ref)
Removes a callback subscription identified by ref.

start_link(opts)
Starts a subscriber process connected to the KDB+ tickerplant at host/port.

start_or_find(opts)
Finds an existing subscriber for host/port or starts a new one.

stop(sub)
Stops the subscriber and closes its tickerplant connection.

Functions

add(sub, table, callback)

@spec add(pid() | atom(), String.t(), (tuple() -> any())) ::
  {:ok, reference()} | {:error, term()}

Adds a callback subscription to table on the given subscriber.

The callback receives {table, data} on each update from the tickerplant.

Returns {:ok, ref} — pass ref to remove/2 to unsubscribe.

Examples

{:ok, ref} = ExeQute.Subscriber.add(:tp, "trade", fn {table, data} ->
  IO.inspect({table, data})
end)
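
Because the callback is an ordinary one-argument function that receives {table, data}, it can be built by a helper and hand its work off to another process. The sketch below keeps only the most recent update per table in an Agent. MyApp.LatestCache and its functions are hypothetical names, and the source does not say which process runs the callback, so the callback is kept deliberately cheap.

```elixir
defmodule MyApp.LatestCache do
  use Agent

  # Starts an Agent holding a map of table => most recent data.
  def start_link(_opts \\ []), do: Agent.start_link(fn -> %{} end)

  # Builds a one-argument function suitable as the `callback` argument of add/3.
  def callback(cache) do
    fn {table, data} -> Agent.update(cache, &Map.put(&1, table, data)) end
  end

  # Returns the last data seen for `table`, or nil if nothing has arrived yet.
  def latest(cache, table), do: Agent.get(cache, &Map.get(&1, table))
end

# Intended use (requires a running subscriber registered as :tp):
#
#   {:ok, cache} = MyApp.LatestCache.start_link()
#   {:ok, ref} = ExeQute.Subscriber.add(:tp, "trade", MyApp.LatestCache.callback(cache))
```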

add(sub, table, syms, callback)

@spec add(pid() | atom(), String.t(), [String.t()], (tuple() -> any())) ::
  {:ok, reference()} | {:error, term()}

Adds a callback subscription to table filtered to the given symbols.

Only updates for symbols in syms are delivered to the callback. Pass an empty list to receive all symbols.

Returns {:ok, ref} — pass ref to remove/2 to unsubscribe.

Examples

{:ok, ref} = ExeQute.Subscriber.add(:tp, "quote", ["AAPL", "MSFT"], fn {table, data} ->
  IO.inspect(data)
end)
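
When several tables each need their own symbol filter, the add/4 calls can be driven from a map. The following is a minimal sketch, assuming a hypothetical MyApp.Subscriptions.add_all/3 helper; it relies only on the documented return values of add/4 and remove/2, and it undoes earlier subscriptions if a later one fails.

```elixir
defmodule MyApp.Subscriptions do
  # `filters` maps table name => list of symbols ([] means all symbols, per add/4).
  # Returns {:ok, refs_by_table}, or the first {:error, reason} after removing
  # any subscriptions that had already been added.
  def add_all(sub, filters, callback) do
    Enum.reduce_while(filters, {:ok, %{}}, fn {table, syms}, {:ok, refs} ->
      case ExeQute.Subscriber.add(sub, table, syms, callback) do
        {:ok, ref} ->
          {:cont, {:ok, Map.put(refs, table, ref)}}

        {:error, reason} ->
          Enum.each(refs, fn {_table, ref} -> ExeQute.Subscriber.remove(sub, ref) end)
          {:halt, {:error, reason}}
      end
    end)
  end
end

# Intended use (requires a running subscriber registered as :tp):
#
#   filters = %{"trade" => ["AAPL", "MSFT"], "quote" => []}
#   {:ok, refs} = MyApp.Subscriptions.add_all(:tp, filters, &IO.inspect/1)
```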

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

init(opts)

Callback implementation for GenServer.init/1.

remove(sub, ref)

@spec remove(pid() | atom(), reference()) :: :ok

Removes a callback subscription identified by ref.

ref is the value returned by add/3 or add/4. Always returns :ok, even if the subscriber has already stopped.

Examples

{:ok, ref} = ExeQute.Subscriber.add(:tp, "trade", fn {_t, data} -> IO.inspect(data) end)
ExeQute.Subscriber.remove(:tp, ref)
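
Since remove/2 returns :ok even when the subscriber has already stopped, it is safe to call from cleanup code. A sketch of a scoped subscription, using a hypothetical MyApp.Scoped.with_subscription/4 helper that removes the callback once the given function returns or raises:

```elixir
defmodule MyApp.Scoped do
  # Runs `fun` while a callback subscription is active, then removes it.
  # Returns {:ok, result_of_fun} or {:error, reason} if add/3 fails.
  def with_subscription(sub, table, callback, fun) when is_function(fun, 0) do
    case ExeQute.Subscriber.add(sub, table, callback) do
      {:ok, ref} ->
        try do
          {:ok, fun.()}
        after
          # remove/2 is documented to return :ok even if the subscriber has stopped.
          ExeQute.Subscriber.remove(sub, ref)
        end

      {:error, reason} ->
        {:error, reason}
    end
  end
end

# Intended use (requires a running subscriber registered as :tp):
#
#   MyApp.Scoped.with_subscription(:tp, "trade", &IO.inspect/1, fn ->
#     Process.sleep(5_000)
#   end)
```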

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a subscriber process connected to the KDB+ tickerplant at host/port.

Accepts the same connection options as ExeQute.connect/1. Pass name: to register the process under an atom for use throughout your application.

Options

| Option | Default | Description |
|---|---|---|
| :host | "localhost" | Tickerplant hostname or IP |
| :port | 5001 | Tickerplant port |
| :username | nil | Username |
| :password | nil | Password |
| :name | nil | Register under this atom |

Examples

{:ok, sub} = ExeQute.Subscriber.start_link(host: "tp-host", port: 5010)

ExeQute.Subscriber.start_link(host: "tp-host", port: 5010, name: :tp)
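
The options listed above can be assembled from the environment so that host and credentials stay out of source code. This is a sketch only: MyApp.TpConfig and the TP_HOST, TP_PORT, TP_USER and TP_PASSWORD variable names are hypothetical, and the fallbacks mirror the documented defaults. It assumes that passing nil for :username or :password behaves the same as omitting them.

```elixir
defmodule MyApp.TpConfig do
  # Builds the keyword list for ExeQute.Subscriber.start_link/1.
  # `env` defaults to the real environment and can be replaced in tests.
  def subscriber_opts(env \\ System.get_env()) do
    [
      host: Map.get(env, "TP_HOST", "localhost"),
      # String.to_integer/1 raises ArgumentError if TP_PORT is not a number.
      port: env |> Map.get("TP_PORT", "5001") |> String.to_integer(),
      username: Map.get(env, "TP_USER"),
      password: Map.get(env, "TP_PASSWORD"),
      name: :tp
    ]
  end
end

# Intended use in a supervision tree:
#
#   children = [{ExeQute.Subscriber, MyApp.TpConfig.subscriber_opts()}]
```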

start_or_find(opts)

@spec start_or_find(keyword()) :: {:ok, pid()} | {:error, term()}

Finds an existing subscriber for host/port or starts a new one.

Multiple calls with the same host and port return the same pid without opening a second TCP connection. This is the recommended way to obtain a subscriber in Livebook notebooks because re-evaluating a cell reuses the existing connection rather than tearing it down.

Examples

{:ok, tp} = ExeQute.Subscriber.start_or_find(host: "tp-host", port: 9600)
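
In a notebook it can be convenient to turn the {:error, reason} case into a readable exception so that a failed cell stops immediately. A small sketch, assuming a hypothetical MyApp.Notebook.subscriber!/1 wrapper around start_or_find/1:

```elixir
defmodule MyApp.Notebook do
  # Returns the subscriber pid, raising with a readable message on failure.
  def subscriber!(opts) do
    case ExeQute.Subscriber.start_or_find(opts) do
      {:ok, pid} -> pid
      {:error, reason} -> raise "could not obtain subscriber: #{inspect(reason)}"
    end
  end
end

# Intended use in a Livebook cell (requires a reachable tickerplant):
#
#   tp = MyApp.Notebook.subscriber!(host: "tp-host", port: 9600)
```

Per the description above, re-evaluating such a cell with the same host and port should yield the same pid.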

stop(sub)

@spec stop(pid() | atom()) :: :ok

Stops the subscriber and closes its tickerplant connection.

Always returns :ok. Prefer ExeQute.unsubscribe/2 for removing individual subscriptions without tearing down the connection.

Examples

ExeQute.Subscriber.stop(:tp)