Production-grade Elixir client for the Codat API — financial data connectivity for fintechs, lenders, and embedded finance platforms.
Codat normalises accounting, banking, and commerce data from 50+ integrations (QuickBooks, Xero, Sage, NetSuite, and more) into a single, consistent API.
Features
- ✅ Full API coverage — Platform, Accounting, Bank Feeds, Lending, Expenses, Bill Pay
- ✅ All endpoints — list, get, create, update, delete for every resource
- ✅ Lazy streaming —
stream/2returns a lazyStream, fetches pages on demand - ✅ Concurrent paging —
fetch_all/2fetches all pages in parallel - ✅ Composable query builder —
Codat.QueryBuilderfor typed filter expressions - ✅ Webhook handling — signature verification, Plug, typed event dispatcher
- ✅ Retry + backoff — exponential backoff with jitter, 429-aware retry
- ✅ Telemetry —
[:codat, :request, :start/stop/exception]events on every call - ✅ Structured errors —
%Codat.Error{}with type, message,correlation_id,retry_after - ✅ Multi-tenant — create one
%Codat.Client{}per tenant API key - ✅ Fully typed —
@type,@spec, and@callbackthroughout
Installation
Add codat to your mix.exs dependencies:
def deps do
[
{:codat, "~> 1.0"}
]
endConfiguration
Global (recommended)
# config/runtime.exs
import Config
config :codat,
api_key: System.fetch_env!("CODAT_API_KEY"),
http_timeout: 30_000,
max_retries: 3Per-client (multi-tenant)
client = Codat.Client.new(api_key: tenant_api_key)
Codat.Platform.Companies.list(client)Environment variable
export CODAT_API_KEY="your-api-key"
All config options:
| Option | Type | Default | Description |
|---|---|---|---|
api_key | string | nil | Your Codat API key |
base_url | string | "https://api.codat.io" | API base URL |
http_timeout | ms | 30_000 | Response timeout |
http_connect_timeout | ms | 5_000 | TCP connect timeout |
max_retries | integer | 3 | Max retry attempts for transient errors |
retry_base_delay | ms | 500 | Base delay for exponential backoff |
retry_max_delay | ms | 10_000 | Cap on backoff delay |
rate_limit_retry_delay | ms | 60_000 | Wait time on 429 before retrying |
pool_size | integer | 10 | Finch connection pool size |
pool_count | integer | 1 | Number of Finch connection pools |
Quick Start
# Create a client (or use global config)
client = Codat.client()
# Create a company when a customer onboards
{:ok, company} = Codat.Platform.Companies.create(client, %{
name: "Acme Corp",
tags: %{"tenant_id" => "t_123", "region" => "us-east-1"}
})
# Redirect the user to authorize their accounting software
redirect_to(company["redirect"])
# Once linked, list their invoices
{:ok, page} = Codat.Accounting.Invoices.list(client, company["id"])
page.results # => [%{"id" => "...", "status" => "Open", ...}, ...]
page.total # => 342
page.has_more? # => truePlatform API
Companies
# List with pagination
{:ok, page} = Codat.Platform.Companies.list(client, page: 1, page_size: 50)
# Filter by tag
{:ok, page} = Codat.Platform.Companies.list(client, tags: "region:us-east-1")
# Get a single company
{:ok, company} = Codat.Platform.Companies.get(client, "company-id")
# Create
{:ok, company} = Codat.Platform.Companies.create(client, %{
name: "Acme Corp",
tags: %{"tenant_id" => "t_123"}
})
# Update (always pass all tags you want to keep)
{:ok, company} = Codat.Platform.Companies.update(client, "company-id", %{
name: "Acme Corp v2",
tags: %{"tenant_id" => "t_123", "plan" => "enterprise"}
})
# Delete permanently
{:ok, nil} = Codat.Platform.Companies.delete(client, "company-id")
# Get access token for Link embed
{:ok, %{"accessToken" => token}} =
Codat.Platform.Companies.get_access_token(client, "company-id")Connections
# List connections for a company
{:ok, page} = Codat.Platform.Connections.list(client, "company-id")
# Filter by status
{:ok, page} = Codat.Platform.Connections.list(client, "company-id", query: "status=Linked")
# Create a new connection (QBO = "gbol")
{:ok, conn} = Codat.Platform.Connections.create(client, "company-id", "gbol")
redirect_to(conn["linkUrl"])
# Unlink (keep data, stop syncs)
{:ok, conn} = Codat.Platform.Connections.unlink(client, "company-id", "conn-id")
# Re-authorization URL for deauthorized connections
{:ok, %{"url" => url}} =
Codat.Platform.Connections.get_authorization_url(client, "company-id", "conn-id")Accounting API
Querying
Use Codat.QueryBuilder for typed, composable filter expressions:
import Codat.QueryBuilder
query =
where("status", :eq, "Open")
|> and_where("amountDue", :gt, 0)
|> and_where("currency", :eq, "GBP")
|> to_query_string()
# => "status=Open&&amountDue>0&¤cy=GBP"
{:ok, page} = Codat.Accounting.Invoices.list(client, company_id,
query: query,
order_by: "-issueDate",
page_size: 50
)Reading data
# Single resource
{:ok, invoice} = Codat.Accounting.Invoices.get(client, company_id, invoice_id)
# All supported data types
Codat.Accounting.Bills.list(client, company_id)
Codat.Accounting.Customers.list(client, company_id)
Codat.Accounting.Suppliers.list(client, company_id)
Codat.Accounting.Accounts.list(client, company_id)
Codat.Accounting.Payments.list(client, company_id)
Codat.Accounting.BankAccounts.list(client, company_id)
Codat.Accounting.JournalEntries.list(client, company_id)
# ... and many moreFinancial statements
# Balance sheet for the last 12 months (monthly periods)
{:ok, bs} = Codat.Accounting.BalanceSheet.get(client, company_id,
period_length: 1,
periods_to_compare: 12,
start_month: "2024-01"
)
# Profit & loss
{:ok, pl} = Codat.Accounting.ProfitAndLoss.get(client, company_id,
period_length: 3,
periods_to_compare: 4
)
# Cash flow statement
{:ok, cf} = Codat.Accounting.CashFlowStatement.get(client, company_id)Lazy streaming (memory-efficient)
# Stream all open invoices without loading everything into memory
Codat.Accounting.Invoices.stream(client, company_id,
query: "status=Open",
page_size: 100
)
|> Stream.filter(&(&1["amountDue"] > 1000))
|> Stream.map(&enrich_invoice/1)
|> Stream.each(&MyApp.Repo.insert!/1)
|> Stream.run()Concurrent fetch all pages
# Fetch all pages concurrently and return a flat list
{:ok, all_invoices} = Codat.Accounting.Invoices.fetch_all(client, company_id,
max_concurrency: 5,
page_size: 100
)Write operations (async)
All write operations are async and return a push operation:
# 1. Check the model to understand required fields
{:ok, model} = Codat.Accounting.Invoices.get_create_model(client, company_id,
connection_id: conn_id
)
# 2. Create (returns immediately with Pending status)
{:ok, push_op} = Codat.Accounting.Invoices.create(client, company_id, conn_id, %{
issueDate: "2024-01-15",
dueDate: "2024-02-15",
customerRef: %{id: "customer-id"},
lineItems: [
%{
description: "Consulting",
quantity: 1,
unitAmount: 1000.00,
accountRef: %{id: "account-id"}
}
]
})
key = push_op["pushOperationKey"]
# 3a. Poll for completion
{:ok, done} = Codat.Platform.PushOperations.poll_until_done(client, company_id, key,
poll_interval: 2_000,
timeout: 60_000
)
IO.puts(done["status"]) # => "Success"
# 3b. Or subscribe to webhook (recommended for production)
# See Webhook section belowWebhooks
Receiving webhooks in Phoenix
Step 1: Add the body reader to your endpoint (preserves raw body for signature verification):
# lib/my_app_web/endpoint.ex
plug Plug.Parsers,
parsers: [:urlencoded, :multipart, :json],
pass: ["*/*"],
body_reader: {Codat.Webhooks.BodyReader, :read_body, []},
json_decoder: JasonStep 2: Mount the webhook plug in your router:
# lib/my_app_web/router.ex
forward "/webhooks/codat", Codat.Webhooks.Plug,
secret: System.get_env("CODAT_WEBHOOK_SECRET"),
handler: MyApp.CodatWebhookHandlerStep 3: Implement your handler:
defmodule MyApp.CodatWebhookHandler do
use Codat.Webhooks.Handler
require Codat.Webhooks.EventTypes
import Codat.Webhooks.EventTypes
@impl true
def handle_event(event_type, payload, _meta)
when event_type == invoices_write_successful() do
company_id = payload["companyId"]
push_op_key = payload["data"]["pushOperationKey"]
MyApp.InvoiceSync.handle_success(company_id, push_op_key)
:ok
end
@impl true
def handle_event(event_type, payload, _meta)
when event_type == invoices_write_unsuccessful() do
company_id = payload["companyId"]
error_msg = payload["data"]["errorMessage"]
MyApp.InvoiceSync.handle_failure(company_id, error_msg)
:ok
end
@impl true
def handle_event(_event_type, _payload, _meta), do: :ok
endDispatching to multiple handlers
defmodule MyApp.CodatWebhooks do
use Codat.Webhooks.Dispatcher
on ["invoices.write.successful", "invoices.write.unsuccessful"],
to: MyApp.InvoiceSyncHandler
on ~r/^bills\.write\./,
to: MyApp.BillSyncHandler
on ~r/^client\.rateLimit\./,
to: MyApp.OpsAlertsHandler
on :all, to: MyApp.AuditLogHandler
endStandalone signature verification
case Codat.Webhooks.Verifier.verify(secret, raw_body, conn.req_headers) do
:ok ->
payload = Jason.decode!(raw_body)
process_event(payload)
{:error, :invalid_signature} ->
conn |> send_resp(401, "Invalid signature")
{:error, :expired} ->
conn |> send_resp(401, "Webhook expired")
endBank Feeds
# Create source accounts
{:ok, account} = Codat.BankFeeds.SourceAccounts.create(
client, company_id, conn_id,
%{
accountName: "Business Checking",
accountNumber: "12345678",
accountType: "Checking",
currency: "USD",
balance: 10_000.00
}
)
# Get account mapping options (available GL accounts)
{:ok, options} = Codat.BankFeeds.AccountMapping.get_options(
client, company_id, conn_id, account["id"]
)
# Map the source account to a GL account
{:ok, _} = Codat.BankFeeds.AccountMapping.set(
client, company_id, conn_id, account["id"],
%{targetAccountId: "gl-account-id", feedStartDate: "2024-01-01"}
)
# Push transactions
{:ok, push_op} = Codat.BankFeeds.Transactions.create(
client, company_id, conn_id,
%{
accountId: account["id"],
transactions: [
%{
id: "txn-001",
date: "2024-01-15",
description: "Stripe payout",
amount: 5_000.00,
balance: 15_000.00
}
]
}
)Lending
# Enhanced accounts receivable
{:ok, invoices} = Codat.Lending.AccountsReceivable.invoices(client, company_id)
{:ok, customers} = Codat.Lending.AccountsReceivable.customers(client, company_id)
# Enhanced financial statements
{:ok, bs} = Codat.Lending.FinancialStatements.balance_sheet(client, company_id,
period_length: 1,
periods_to_compare: 12
)
# Data integrity cross-referencing
{:ok, status} = Codat.Lending.DataIntegrity.status(client, company_id)
{:ok, details} = Codat.Lending.DataIntegrity.details(client, company_id, "invoices",
query: "status=Unmatched"
)
# Generate and download Excel audit report
{:ok, _} = Codat.Lending.ExcelReports.generate(client, company_id, "audit")
{:ok, status} = Codat.Lending.ExcelReports.status(client, company_id, "audit")
{:ok, bytes} = Codat.Lending.ExcelReports.download(client, company_id, "audit")
File.write!("audit-report.xlsx", bytes)Expenses
# 1. Initialize a sync session
{:ok, sync} = Codat.Expenses.Sync.initialize(client, company_id, %{dataType: "expense"})
sync_id = sync["id"]
# 2. Push expense transactions
{:ok, _} = Codat.Expenses.Transactions.create(client, company_id, sync_id, [
%{
id: "txn-001",
type: "payment",
issueDate: "2024-01-15T00:00:00Z",
currency: "USD",
currencyRate: 1.0,
contactRef: %{id: "supplier-id", type: "Supplier"},
lineItems: [
%{
accountRef: %{id: "account-id"},
description: "Office supplies",
netAmount: 99.99,
taxAmount: 8.00,
taxRateRef: %{id: "tax-rate-id"}
}
]
}
])
# 3. Complete the sync (writes to accounting platform)
{:ok, _} = Codat.Expenses.Sync.complete(client, company_id, sync_id)
# Subscribe to expenses.sync.successful / expenses.sync.failed webhooksError Handling
case Codat.Accounting.Invoices.list(client, company_id) do
{:ok, page} ->
page.results
{:error, %Codat.Error{type: :unauthorized}} ->
# Check API key configuration
{:error, :invalid_api_key}
{:error, %Codat.Error{type: :not_found}} ->
{:error, :company_not_found}
{:error, %Codat.Error{type: :rate_limited, retry_after: ms}} ->
Process.sleep(ms)
retry()
{:error, %Codat.Error{type: :server_error, correlation_id: id}} ->
Logger.error("Codat error, report this correlation_id to support: #{id}")
{:error, :codat_unavailable}
{:error, %Codat.Error{type: :payment_required}} ->
# Free tier limit — upgrade plan or reduce company count
{:error, :plan_limit_exceeded}
endTelemetry
# Attach the built-in logger (optional)
Codat.Telemetry.attach_default_logger(:debug)
# Or attach your own handler
:telemetry.attach(
"my-codat-handler",
[:codat, :request, :stop],
fn _event, %{duration: duration, status_code: code}, %{operation: op}, _config ->
ms = System.convert_time_unit(duration, :native, :millisecond)
MyApp.Metrics.histogram("codat.request.duration_ms", ms, tags: [operation: op])
end,
nil
)
# With telemetry_metrics
[
Telemetry.Metrics.counter("codat.request.stop",
tags: [:operation, :result]
),
Telemetry.Metrics.summary("codat.request.stop.duration",
unit: {:native, :millisecond},
tags: [:api_module, :operation]
),
Telemetry.Metrics.counter("codat.rate_limit.hit",
tags: []
)
]Testing
The package includes full test support:
# Run tests
mix test
# With coverage
mix test.coverage
# Linting
mix check
In your own application tests, use Bypass to mock the Codat API:
setup do
bypass = Bypass.open()
client = Codat.Client.new(
api_key: "test-key",
base_url: "http://localhost:#{bypass.port}"
)
%{bypass: bypass, client: client}
end
test "processes new invoices", %{bypass: bypass, client: client} do
Bypass.expect_once(bypass, "GET", "/companies/co-1/data/invoices", fn conn ->
conn
|> Plug.Conn.put_resp_content_type("application/json")
|> Plug.Conn.send_resp(200, Jason.encode!(%{
"results" => [%{"id" => "inv-1", "status" => "Open"}],
"pageNumber" => 1,
"pageSize" => 100,
"totalResults" => 1
}))
end)
assert {:ok, page} = Codat.Accounting.Invoices.list(client, "co-1")
assert page.total == 1
endLicense
MIT — see LICENSE for details.