Change Data Capture for SQL Server in Elixir.

TdsCdc captures row-level changes (INSERT, UPDATE, DELETE) from SQL Server tables with CDC enabled. It periodically polls CDC change tables and publishes events to subscribed processes.

Requirements

  • Elixir ~> 1.18
  • SQL Server 2016+ with CDC enabled
  • SQL Server Agent running (CDC's capture and cleanup jobs run as Agent jobs)

Installation

Add to mix.exs:

def deps do
  [
    {:tds_cdc, "~> 0.1.0"}
  ]
end

For Ecto integration, also add:

def deps do
  [
    {:tds_cdc, "~> 0.1.0"},
    {:ecto_sql, "~> 3.0"},
    {:tds_ecto, "~> 2.3"}
  ]
end

SQL Server Setup

Enable CDC on the database

USE my_database;
GO
EXEC sys.sp_cdc_enable_db;
GO

Enable CDC on a table

EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'users',
    @role_name     = NULL;  -- NULL = unrestricted access
GO

This creates a capture instance named dbo_users and a table cdc.dbo_users_CT where SQL Server stores the changes.

The capture instance name follows the pattern <schema>_<table> — so dbo.users becomes dbo_users. You use this name everywhere in TdsCdc: capture_instances: ["dbo_users"], TdsCdc.subscribe("dbo_users"), etc. You can also specify a custom name via @capture_instance when enabling CDC.
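The default naming convention can be written as a few string helpers. This is a hypothetical sketch (CdcNames is not part of TdsCdc). It only applies when no custom @capture_instance is given. It also derives the change-table name and the per-instance query function cdc.fn_cdc_get_all_changes_<capture_instance> that SQL Server generates.

```elixir
defmodule CdcNames do
  # Hypothetical helpers, not part of TdsCdc.

  # Default capture instance name when @capture_instance is omitted: <schema>_<table>
  def capture_instance(schema, table), do: "#{schema}_#{table}"

  # Change table SQL Server creates for a capture instance: cdc.<capture_instance>_CT
  def change_table(capture_instance), do: "cdc.#{capture_instance}_CT"

  # Table-valued function SQL Server generates to read that instance's changes
  def all_changes_function(capture_instance),
    do: "cdc.fn_cdc_get_all_changes_#{capture_instance}"
end
```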

Warning — Schema changes are not propagated to CDC tables. If you add or remove columns from the source table (dbo.users), the _CT table will not reflect the new schema. You must either:

  1. Disable and re-enable CDC — all historical change data is lost:

       EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'users', @capture_instance = N'dbo_users';
       EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'users', @role_name = NULL;
  2. Create a second capture instance (SQL Server allows at most two per source table); this keeps the old one alive during the transition:

       EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'users', @capture_instance = N'dbo_users_v2', @role_name = NULL;

    Then switch TdsCdc to capture_instances: ["dbo_users_v2"] and, when ready, disable the old instance with sys.sp_cdc_disable_table and @capture_instance = N'dbo_users'.

Connection adapters

TdsCdc supports two connection modes:

Option A: Direct TDS connection (default)

Manages its own connection pool. No external dependencies beyond tds.

{:ok, pid} = TdsCdc.start_link(
  conn: [
    hostname: "localhost",
    port: 1433,
    username: "sa",
    password: "YourStrong!Passw0rd",
    database: "my_database"
  ],
  capture_instances: ["dbo_users"],
  poll_interval: 1_000
)

The :conn options are forwarded to Tds.start_link/1. Additional pool options:

Option               Default  Description
:pool_size           5        Number of connections in the pool
:ownership_timeout   30000    Maximum time (ms) a connection can be checked out
:timeout             30000    Query timeout (ms)

Option B: Ecto.Repo

Uses an existing Ecto.Repo for all queries. Shares the Repo's connection pool — no separate TDS connection needed.

# In your application.ex supervision tree:
children = [
  MyApp.Repo,
  {TdsCdc.Client, repo: MyApp.Repo, capture_instances: ["dbo_users"]}
]

# Or at runtime:
{:ok, pid} = TdsCdc.start_link(
  repo: MyApp.Repo,
  capture_instances: ["dbo_users"]
)

Requirements: the Repo must use the TDS adapter (tds_ecto) and must be started before TdsCdc, so list it first in the supervision tree.

Usage

Subscribe to changes

:ok = TdsCdc.subscribe("dbo_users")

receive do
  {:tds_cdc_change, "dbo_users", %TdsCdc.Change{operation: :insert} = change} ->
    IO.puts("New user: #{change.data.name}")

  {:tds_cdc_change, "dbo_users", %TdsCdc.Change{operation: :update} = change} ->
    IO.puts("User updated: #{inspect(change.data)}")

  {:tds_cdc_change, "dbo_users", %TdsCdc.Change{operation: :delete} = change} ->
    IO.puts("User deleted: #{inspect(change.data)}")
end
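A single receive handles one message; a long-running consumer usually recurses so it keeps draining its mailbox. The sketch below is hypothetical (CdcInbox is not a TdsCdc module) and assumes only the {:tds_cdc_change, capture_instance, change} message shape used above. It matches on a plain map rather than %TdsCdc.Change{} so it runs without the library loaded; this works because structs are maps.

```elixir
defmodule CdcInbox do
  # Hypothetical helper, not part of TdsCdc.
  # Folds every {:tds_cdc_change, capture_instance, change} message that is
  # already in the mailbox, or arrives within `timeout` ms, into `acc`.
  def drain(acc, fun, timeout \\ 0) do
    receive do
      {:tds_cdc_change, capture_instance, %{operation: _} = change} ->
        drain(fun.(capture_instance, change, acc), fun, timeout)
    after
      timeout -> acc
    end
  end
end
```

With the default timeout of 0 it returns as soon as no change messages are left; in a dedicated process you could pass :infinity to block, or use TdsCdc.Listener for a supervised alternative.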

Unsubscribe

:ok = TdsCdc.unsubscribe("dbo_users")

Query current LSN position

The LSN (Log Sequence Number) position indicates how far the client has processed changes in the SQL Server transaction log. Use this to check which changes have already been delivered.

{:ok, lsn} = TdsCdc.current_lsn("dbo_users")
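To compare this value with what SQL Server shows (for example the output of sys.fn_cdc_get_max_lsn()), it helps to render it as hex. This sketch assumes current_lsn/1 returns the raw 10-byte binary, like the lsn field of %TdsCdc.Change{}. TdsCdc.Lsn may already offer similar helpers, but its API is not documented here, so the hypothetical LsnFormat module uses only Base from the standard library.

```elixir
defmodule LsnFormat do
  # Hypothetical helpers, not part of TdsCdc. SQL Server LSNs are binary(10).

  # Render as SQL Server displays binary values, e.g. 0x0000002A00000B800052
  def to_hex(<<_::binary-size(10)>> = lsn), do: "0x" <> Base.encode16(lsn)

  # Parse a 0x-prefixed hex string in either letter case
  def from_hex("0x" <> hex), do: Base.decode16!(hex, case: :mixed)

  # Equal-length binaries compare byte by byte in Erlang term order,
  # which matches the ordering of binary(10) LSNs.
  def compare(a, b) when byte_size(a) == 10 and byte_size(b) == 10 do
    cond do
      a < b -> :lt
      a > b -> :gt
      true -> :eq
    end
  end
end
```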

Stop the client

Gracefully stops the CDC client process, cancels its polling timer, and (if using :conn) closes the database connection. Subscribers will no longer receive change events.

:ok = TdsCdc.stop()

Check CDC status (utility functions)

These functions accept either a TDS connection pid or an Ecto.Repo module:

# With TDS connection
{:ok, conn} = Tds.start_link(conn_opts)
{:ok, true} = TdsCdc.cdc_enabled?(conn)
{:ok, ["dbo_users", "dbo_orders"]} = TdsCdc.list_capture_instances(conn)
GenServer.stop(conn)

# With Ecto.Repo
{:ok, true} = TdsCdc.cdc_enabled?(MyApp.Repo)
{:ok, ["dbo_users"]} = TdsCdc.list_capture_instances(MyApp.Repo)

Wait for client to be ready

{:ok, pid} = TdsCdc.start_link(conn: conn_opts, capture_instances: ["dbo_users"])
:ok = TdsCdc.wait_for_ready(timeout: 10_000, capture_instance: "dbo_users")
TdsCdc.subscribe("dbo_users")

LSN persistence

By default, TdsCdc persists LSN positions to disk so they survive application restarts. When the client starts, it loads saved positions and resumes from where it left off (provided the positions are still within the CDC retention window).

Default: file-based persistence

Positions are saved as JSON files in <system_tmp>/tds_cdc/<client_name>.json:

# Default (automatic)
TdsCdc.start_link(conn: [...], capture_instances: ["dbo_users"])

# Custom path
TdsCdc.start_link(
  conn: [...],
  capture_instances: ["dbo_users"],
  persistence: {TdsCdc.Persistence.File, path: "/var/lib/myapp/lsn"}
)

Custom persistence module

Implement the TdsCdc.Persistence behaviour to store positions in a database, Redis, or any other backend:

defmodule MyApp.DbPersistence do
  @behaviour TdsCdc.Persistence

  @impl true
  def save_positions(_name, _positions) do
    # Write positions to your database
    :ok
  end

  @impl true
  def load_positions(_name) do
    # Read positions from your database
    {:ok, %{}}  # or {:error, :not_found}
  end
end

TdsCdc.start_link(
  conn: [...],
  capture_instances: ["dbo_users"],
  persistence: {MyApp.DbPersistence, []}
)
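As a fuller, hypothetical sketch (the module name MyApp.HexLsnPersistence and its storage directory are assumptions), the callbacks below store positions as JSON with each LSN hex-encoded, since raw LSN bytes are not necessarily valid UTF-8 and should not be written into JSON strings as-is. It assumes positions is a map of capture-instance names (strings) to 10-byte LSN binaries. It omits @behaviour TdsCdc.Persistence only so it compiles without the library; add it, with @impl true, in real code. It uses the JSON module that ships with Elixir 1.18.

```elixir
defmodule MyApp.HexLsnPersistence do
  # Hypothetical sketch. Real code would declare `@behaviour TdsCdc.Persistence`.

  def save_positions(name, positions) do
    # Hex-encode each LSN binary so the map is plain JSON-safe strings
    encoded = Map.new(positions, fn {ci, lsn} -> {ci, Base.encode16(lsn)} end)
    file = path(name)
    File.mkdir_p!(Path.dirname(file))
    File.write!(file, JSON.encode!(encoded))
    :ok
  end

  def load_positions(name) do
    case File.read(path(name)) do
      {:ok, body} ->
        decoded =
          body
          |> JSON.decode!()
          |> Map.new(fn {ci, hex} -> {ci, Base.decode16!(hex)} end)

        {:ok, decoded}

      {:error, :enoent} ->
        {:error, :not_found}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Assumed storage location for this sketch only
  defp path(name), do: Path.join([System.tmp_dir!(), "my_app_cdc", "#{name}.json"])
end
```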

Disable persistence

If you don't need positions to survive restarts:

TdsCdc.start_link(
  conn: [...],
  capture_instances: ["dbo_users"],
  persistence: nil
)

Multiple instances

You can run multiple clients with different configurations:

{:ok, pid_fast} = TdsCdc.start_link(
  name: TdsCdc.Fast,
  conn: [hostname: "localhost", ...],
  capture_instances: ["dbo_users"],
  poll_interval: 100
)

{:ok, pid_slow} = TdsCdc.start_link(
  name: TdsCdc.Slow,
  repo: MyApp.Repo,
  capture_instances: ["dbo_orders"],
  poll_interval: 5_000
)

TdsCdc.subscribe(TdsCdc.Fast, "dbo_users")
TdsCdc.subscribe(TdsCdc.Slow, "dbo_orders")

Each instance tracks its own LSN position independently.

Gap detection

SQL Server purges old CDC data based on the configured retention period (default: 3 days). If a client falls behind the oldest available change data, TdsCdc detects the gap and:

  1. Sends {:tds_cdc_gap_detected, capture_instance, old_lsn, min_lsn} to all subscribers
  2. Logs a warning
  3. Resets the position to the current min_lsn and continues from there

require Logger

receive do
  {:tds_cdc_gap_detected, ci, old_lsn, min_lsn} ->
    Logger.warning("Data lost in #{ci} between #{inspect(old_lsn)} and #{inspect(min_lsn)}")
end
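The check behind step 1 can be sketched as a comparison against the capture instance's minimum LSN (what sys.fn_cdc_get_min_lsn('dbo_users') returns in T-SQL). This is a hypothetical helper (GapCheck), not TdsCdc's implementation. It assumes the stored position is the last processed 10-byte LSN and that the next poll would start at the smallest LSN greater than it.

```elixir
defmodule GapCheck do
  # Hypothetical sketch, not TdsCdc's implementation.
  # LSNs are binary(10); equal-length binaries compare byte by byte.

  # If the next starting LSN is already below the instance's minimum LSN,
  # the cleanup job may have purged rows the client never received.
  def check(last_lsn, min_lsn) do
    if increment(last_lsn) < min_lsn do
      {:gap, last_lsn, min_lsn}
    else
      :ok
    end
  end

  # Smallest binary(10) value greater than the given LSN
  defp increment(<<n::unsigned-big-integer-size(80)>>) do
    <<(n + 1)::unsigned-big-integer-size(80)>>
  end
end
```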

Change struct

%TdsCdc.Change{
  capture_instance: "dbo_users",
  operation: :insert,          # :insert | :update | :delete
  data: %{id: 1, name: "Alice", email: "alice@example.com"},
  lsn: <<0, 0, 0, 42, 0, 0, 11, 128, 0, 82>>,
  lsn_prev: nil,
  seqval: <<0, 0, 0, 42, 0, 0, 11, 128, 0, 83>>,
  commit_lsn: nil,
  transaction_order: nil
}

Note on UPDATE operations: CDC records operation=3 (before image) and operation=4 (after image). Both are mapped to :update.
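To make the mapping concrete, here is a hypothetical helper (CdcOperation is not a TdsCdc module) that turns SQL Server's __$operation codes into the atoms used in %TdsCdc.Change{}. Because codes 3 and 4 collapse into the same atom, a subscriber cannot tell a before image from an after image by operation alone.

```elixir
defmodule CdcOperation do
  # Hypothetical illustration of the note above, not TdsCdc's code.
  # __$operation: 1 = delete, 2 = insert, 3 = update (before), 4 = update (after)
  def from_code(1), do: :delete
  def from_code(2), do: :insert
  def from_code(code) when code in [3, 4], do: :update
end
```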

Architecture

When you enable CDC on a table (e.g. dbo.users), SQL Server creates a change table named cdc.<capture_instance>_CT (by default cdc.<schema>_<table>_CT, e.g. cdc.dbo_users_CT). The CDC capture job, run by SQL Server Agent, reads the transaction log asynchronously and populates these _CT tables with every committed INSERT, UPDATE, and DELETE shortly after it happens. Each _CT row includes metadata columns (__$operation, __$start_lsn, __$seqval) plus the captured columns of the source table.

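TdsCdc periodically queries these _CT tables through the table-valued function SQL Server generates for each capture instance, cdc.fn_cdc_get_all_changes_<capture_instance> (e.g. cdc.fn_cdc_get_all_changes_dbo_users), starting just after the last LSN it processed. It advances the LSN position after each poll so no change is delivered twice.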
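The LSN bookkeeping behind "no change is delivered twice" can be sketched as a pure function. This is an assumption about the approach, not TdsCdc's code: the next window starts at the smallest LSN greater than the last processed one and ends at the current maximum LSN (sys.fn_cdc_get_max_lsn()), and both bounds are inclusive when passed to cdc.fn_cdc_get_all_changes_<capture_instance>. PollWindow is a hypothetical name.

```elixir
defmodule PollWindow do
  # Hypothetical sketch of one polling step's LSN arithmetic.

  # Treat binary(10) as an 80-bit big-endian integer and add 1
  def increment(<<n::unsigned-big-integer-size(80)>>) do
    <<(n + 1)::unsigned-big-integer-size(80)>>
  end

  # Inclusive range for the next query, or :idle if nothing new was captured
  def next(last_lsn, max_lsn) do
    from_lsn = increment(last_lsn)

    if from_lsn > max_lsn do
      :idle
    else
      {:query, from_lsn, max_lsn}
    end
  end
end
```

After delivering the rows for {:query, from_lsn, to_lsn}, the client would record to_lsn as its new position, so the following window begins strictly after it.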

SQL Server                              Elixir (TdsCdc)

dbo.users, dbo.orders                   TdsCdc.Client (GenServer)
        |                                 - lsn_positions tracker
        | writes                          - subscribers registry
        v                                 - connection adapter:
Transaction log                               TdsCdc.Connection.Tds
        |                                     -or- TdsCdc.Connection.Ecto
        | read by CDC Agent (sqlagent)            |
        v                                         | :poll -> fetch_changes
cdc.dbo_users_CT, cdc.dbo_orders_CT  <------------+
                                                  |
                                                  | %Change{} -> send/2 to each subscriber
                                                  v
                                             App consumer

Structured listener with use TdsCdc.Listener

For a more structured approach, use the TdsCdc.Listener behaviour. It auto-starts a CDC client, subscribes, and dispatches changes to your callbacks:

defmodule MyApp.CdcListener do
  use TdsCdc.Listener
  require Logger

  @impl true
  def on_init(_opts) do
    {:ok, %{inserts: 0, updates: 0, deletes: 0}}
  end

  @impl true
  def on_insert(change, state) do
    IO.puts("New record: #{inspect(change.data)}")
    {:ok, %{state | inserts: state.inserts + 1}}
  end

  @impl true
  def on_update(change, state) do
    IO.puts("Updated: #{inspect(change.data)}")
    {:ok, %{state | updates: state.updates + 1}}
  end

  @impl true
  def on_delete(change, state) do
    IO.puts("Deleted: #{inspect(change.data)}")
    {:ok, %{state | deletes: state.deletes + 1}}
  end

  @impl true
  def on_gap(ci, old_lsn, min_lsn, state) do
    Logger.warning("Gap detected in #{ci} between #{inspect(old_lsn)} and #{inspect(min_lsn)}")
    {:ok, state}
  end
end

Add to your supervision tree:

children = [
  {MyApp.CdcListener, conn: [hostname: "localhost", ...], capture_instances: ["dbo_users"]}
]

All callbacks are optional and have default implementations. Return {:ok, state} to continue or {:stop, reason} to stop the listener.
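To illustrate how a listener might route changes to these callbacks and honor {:stop, reason}, here is a hypothetical dispatcher (ListenerDispatchSketch is not part of TdsCdc, and the real TdsCdc.Listener may work differently). It assumes an omitted callback behaves like a default that returns {:ok, state} unchanged.

```elixir
defmodule ListenerDispatchSketch do
  # Hypothetical illustration of callback routing, not TdsCdc.Listener's code.
  @callbacks %{insert: :on_insert, update: :on_update, delete: :on_delete}

  def dispatch(module, %{operation: op} = change, state) do
    callback = Map.fetch!(@callbacks, op)
    Code.ensure_loaded(module)

    result =
      if function_exported?(module, callback, 2) do
        apply(module, callback, [change, state])
      else
        # Assumed default for an omitted callback: keep the state unchanged
        {:ok, state}
      end

    case result do
      {:ok, new_state} -> {:cont, new_state}
      {:stop, reason} -> {:halt, reason}
    end
  end
end
```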

Docker example

cd example
docker compose up --build

Spins up SQL Server 2022 with CDC enabled, a web app with CRUD for users on port 4000, and a listener that prints CDC changes in real time.

  • Web app: http://localhost:4000 — create, edit, and delete users
  • Listener — prints INSERT/UPDATE/DELETE events captured via CDC

Environment variables

Variable       Default              Description
TDS_HOST       localhost            SQL Server host
TDS_PORT       1433                 SQL Server port
TDS_USERNAME   sa                   Username
TDS_PASSWORD   YourStrong!Passw0rd  Password
TDS_DATABASE   cdc_example          Database name
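An application could build the :conn keyword list from these variables, falling back to the defaults above. This is a hypothetical sketch (ExampleConfig is not necessarily how the example app reads its configuration). The getter is injectable so the defaults can be checked without touching the real environment; it defaults to System.get_env/2.

```elixir
defmodule ExampleConfig do
  # Hypothetical: builds the :conn options from the example's env vars.
  def conn_opts(getenv \\ &System.get_env/2) do
    [
      hostname: getenv.("TDS_HOST", "localhost"),
      port: String.to_integer(getenv.("TDS_PORT", "1433")),
      username: getenv.("TDS_USERNAME", "sa"),
      password: getenv.("TDS_PASSWORD", "YourStrong!Passw0rd"),
      database: getenv.("TDS_DATABASE", "cdc_example")
    ]
  end
end
```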

Modules

Module                    Description
TdsCdc                    Public API (start_link, subscribe, unsubscribe, current_lsn, wait_for_ready, stop, cdc_enabled?, list_capture_instances)
TdsCdc.Client             GenServer that manages the connection and polling
TdsCdc.Listener           Behaviour for structured CDC event listeners
TdsCdc.Connection         Behaviour for database connection adapters
TdsCdc.Connection.Tds     Direct TDS connection adapter (default)
TdsCdc.Connection.Ecto    Ecto.Repo connection adapter
TdsCdc.Change             Struct representing a change event
TdsCdc.Lsn                Utilities for Log Sequence Numbers
TdsCdc.Persistence        Behaviour for LSN position persistence
TdsCdc.Persistence.File   Default file-based LSN persistence

License

MIT