TdsCdc.Listener behaviour
(tds_cdc v0.1.0)
Behaviour for structured CDC event listeners.
Use this module to create a listener that follows a defined workflow, with a dedicated callback for each change operation type.
Usage
    defmodule MyApp.CdcListener do
      use TdsCdc.Listener

      # Logger macros must be required before use.
      require Logger

      @impl true
      def on_init(_opts) do
        {:ok, %{inserts: 0, updates: 0, deletes: 0}}
      end

      @impl true
      def on_insert(change, state) do
        IO.puts("New record: #{inspect(change.data)}")
        {:ok, %{state | inserts: state.inserts + 1}}
      end

      @impl true
      def on_update(change, state) do
        IO.puts("Updated: #{inspect(change.data)}")
        {:ok, %{state | updates: state.updates + 1}}
      end

      @impl true
      def on_delete(change, state) do
        IO.puts("Deleted: #{inspect(change.data)}")
        {:ok, %{state | deletes: state.deletes + 1}}
      end

      @impl true
      def on_gap(ci, _old_lsn, _min_lsn, state) do
        Logger.warning("Gap detected in #{ci}")
        {:ok, state}
      end
    end

Then add to your supervision tree:
    children = [
      {MyApp.CdcListener, conn: [hostname: "localhost", ...], capture_instances: ["dbo_users"]}
    ]

Or start manually:
    {:ok, pid} = MyApp.CdcListener.start_link(
      conn: [hostname: "localhost", username: "sa", password: "pass", database: "mydb"],
      capture_instances: ["dbo_users"]
    )

Connection options
Same as TdsCdc.Client.start_link/1:
:conn - Direct TDS connection options
:repo - An existing Ecto.Repo module
:capture_instances - List of CDC capture instance names (required)
:poll_interval - Polling interval in ms (default: 1000)
:name - GenServer name registration (default: module name)
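As a rough illustration of how these options combine, the sketch below builds two option lists from a shared base. It assumes that :conn and :repo are alternative ways to supply the database connection, which the list above suggests but does not state. MyApp.Repo and MyApp.CdcListener are placeholder names, and the 5-second interval is an arbitrary choice.

```elixir
# Options shared by both variants.
base = [capture_instances: ["dbo_users"], poll_interval: 5_000]

# Variant A: the listener opens its own TDS connection.
with_conn =
  Keyword.put(base, :conn,
    hostname: "localhost",
    username: "sa",
    password: "pass",
    database: "mydb"
  )

# Variant B (assumption: used instead of :conn): reuse an existing Ecto repo.
# MyApp.Repo is a placeholder module name.
with_repo = Keyword.put(base, :repo, MyApp.Repo)

# Child spec in the {module, opts} form from the usage section.
children = [{MyApp.CdcListener, with_repo}]

IO.inspect(with_conn, label: "direct connection options")
IO.inspect(children, label: "children")
```

Only :capture_instances is documented as required; the other keys fall back to the defaults listed above when omitted.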
Callbacks
All callbacks are optional and have default implementations.
Returning {:ok, state} continues the listener.
Returning {:stop, reason} stops the listener process.
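The return contract can be sketched with a callback that stops after a fixed number of inserts. To keep the example runnable without the library, the module below uses plain functions with the same shape as on_insert/2; in a real listener the clauses would sit under use TdsCdc.Listener with @impl true. The run/2 function is a hypothetical stand-in for the listener loop and is not how tds_cdc dispatches changes.

```elixir
defmodule StopAfterLimitSketch do
  # Arbitrary limit chosen for this sketch.
  @limit 3

  # Stop once the current insert would reach the limit.
  def on_insert(_change, %{inserts: n}) when n + 1 >= @limit do
    {:stop, :insert_limit_reached}
  end

  # Otherwise count the insert and keep going.
  def on_insert(_change, state) do
    {:ok, %{state | inserts: state.inserts + 1}}
  end

  # Hypothetical driver, for illustration only: feeds changes one by one
  # and halts on the first {:stop, reason}.
  def run(changes, state) do
    Enum.reduce_while(changes, {:ok, state}, fn change, {:ok, acc} ->
      case on_insert(change, acc) do
        {:ok, new_state} -> {:cont, {:ok, new_state}}
        {:stop, reason} -> {:halt, {:stop, reason}}
      end
    end)
  end
end
```

With an initial state of %{inserts: 0}, the first two changes return {:ok, state} and the third returns {:stop, :insert_limit_reached}.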
Callbacks
@callback on_delete(change :: TdsCdc.Change.t(), state :: state()) :: {:ok, state()} | {:stop, reason()}
Called when a DELETE change is received.
The change argument is a %TdsCdc.Change{} struct with operation: :delete.
@callback on_gap(capture_instance :: String.t(), old_lsn :: binary(), min_lsn :: binary(), state :: state()) :: {:ok, state()} | {:stop, reason()}
Called when a CDC gap is detected.
This happens when the stored LSN position falls behind the minimum available LSN in CDC tables, meaning some changes were lost due to retention cleanup.
capture_instance - The capture instance where the gap was detected.
old_lsn - The LSN that was stored (now too old).
min_lsn - The new minimum LSN (position will be reset to this).
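One possible gap policy is sketched below: log both LSNs in hex, then either record the gap and continue or stop the listener. The module uses a plain function with the same shape as on_gap/4 so it runs without the library. The :strict flag and :gaps list in the state are hypothetical names chosen for this sketch, not part of tds_cdc.

```elixir
defmodule GapPolicySketch do
  require Logger

  # In a real listener this would be an `@impl true` callback
  # under `use TdsCdc.Listener`.
  def on_gap(capture_instance, old_lsn, min_lsn, state) do
    # LSNs are binaries, so hex-encode them for readable logs.
    Logger.warning(
      "CDC gap in #{capture_instance}: stored LSN 0x#{Base.encode16(old_lsn)} " <>
        "is older than minimum 0x#{Base.encode16(min_lsn)}"
    )

    if state.strict do
      # Hypothetical strict mode: treat lost changes as fatal.
      {:stop, {:cdc_gap, capture_instance}}
    else
      # Lenient mode: remember the gap and carry on from min_lsn.
      gap = %{capture_instance: capture_instance, old_lsn: old_lsn, min_lsn: min_lsn}
      {:ok, %{state | gaps: [gap | state.gaps]}}
    end
  end
end
```

Stopping on a gap suits consumers that must not silently miss changes, for example ones that need a full resync; recording and continuing suits best-effort consumers.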
Called when the listener starts, after CDC subscription is established.
Use on_init/1 to initialize your listener state. It receives the full options keyword list passed to start_link/1.
Return {:ok, state} to continue, or {:stop, reason} to stop.
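Because the callback receives the start options, the initial state can be derived from them. The sketch below builds a per-capture-instance counter map from :capture_instances. It is a plain function with the same shape as on_init/1 so that it runs without the library, and the :counters key is a hypothetical name. The empty-list check is purely defensive, since :capture_instances is documented as required.

```elixir
defmodule InitSketch do
  # In a real listener this would be an `@impl true` callback
  # under `use TdsCdc.Listener`.
  def on_init(opts) do
    case Keyword.get(opts, :capture_instances, []) do
      [] ->
        # Nothing to listen to: refuse to start.
        {:stop, :no_capture_instances}

      instances ->
        # One counter per capture instance, all starting at zero.
        counters = Map.new(instances, fn ci -> {ci, 0} end)
        {:ok, %{counters: counters}}
    end
  end
end
```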
@callback on_insert(change :: TdsCdc.Change.t(), state :: state()) :: {:ok, state()} | {:stop, reason()}
Called when an INSERT change is received.
The change argument is a %TdsCdc.Change{} struct with operation: :insert.
Called when the listener process is about to terminate.
Use this for cleanup. The return value is ignored.
@callback on_update(change :: TdsCdc.Change.t(), state :: state()) :: {:ok, state()} | {:stop, reason()}
Called when an UPDATE change is received.
The change argument is a %TdsCdc.Change{} struct with operation: :update.
Note: CDC produces two rows per update (before-image with operation=3, after-image with operation=4). Both are mapped to :update and delivered separately, so on_update/2 is called twice for each updated row.
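Since both images arrive as separate :update changes, a listener that wants a column-level diff has to pair them itself. The sketch below buffers one change and pairs it with the next. It rests on two assumptions that the documentation does not confirm: the before-image is delivered immediately before its after-image, and change.data is a map of column names to values. Plain maps stand in for %TdsCdc.Change{} structs so the example runs without the library; the :pending and :diffs state keys are hypothetical.

```elixir
defmodule UpdatePairSketch do
  # Nothing buffered yet: treat this change as the before-image.
  def on_update(change, %{pending: nil} = state) do
    {:ok, %{state | pending: change}}
  end

  # A before-image is buffered: treat this change as the after-image
  # and keep only the columns whose values differ.
  def on_update(change, %{pending: before} = state) do
    changed =
      for {column, new} <- change.data,
          Map.get(before.data, column) != new,
          into: %{} do
        {column, {Map.get(before.data, column), new}}
      end

    {:ok, %{state | pending: nil, diffs: [changed | state.diffs]}}
  end
end
```

Starting from %{pending: nil, diffs: []}, two consecutive calls for the same row leave one entry in diffs that maps each changed column to an {old, new} tuple. If the delivery-order assumption does not hold, the pairing would be wrong, so verify it against the library before relying on this pattern.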