Arrow Flight SQL
Arrow Flight SQL layers SQL query semantics on top of Arrow Flight (gRPC + Arrow IPC).
Queries are dispatched to the server, which executes them and streams results back as
columnar RecordBatch data — the same Arrow format used everywhere in ExArrow.
When to use Flight SQL vs ADBC
| Scenario | Recommended |
|---|---|
| Remote query server (DuckDB over the network, DataFusion, Dremio, InfluxDB v3) | Flight SQL |
| In-process database (DuckDB local, SQLite, PostgreSQL via driver) | ADBC |
| Receiving Arrow data from an existing Flight server (non-SQL) | ExArrow.Flight.Client |
Module overview
| Module | Purpose |
|---|---|
| ExArrow.FlightSQL.Client | Connect, query, and execute DML |
| ExArrow.FlightSQL.Statement | Reusable server-side prepared statement handle |
| ExArrow.FlightSQL.Result | Materialised result (schema + batches + row count) |
| ExArrow.FlightSQL.Error | Structured error type for all Flight SQL failures |
Every workflow starts from a client handle returned by ExArrow.FlightSQL.Client.connect/1 or connect/2.
Quick start
```elixir
{:ok, client} = ExArrow.FlightSQL.Client.connect("localhost:32010")

# Materialised query: collects all batches before returning
{:ok, result} = ExArrow.FlightSQL.Client.query(client, "SELECT id, name FROM users")
result.num_rows #=> 42
result.schema   #=> %ExArrow.Schema{...}

# Lazy query: streams batches one at a time
{:ok, stream} = ExArrow.FlightSQL.Client.stream_query(client, "SELECT * FROM big_table")

# DDL: :unknown when the server reports no row count
{:ok, :unknown} = ExArrow.FlightSQL.Client.execute(client, "CREATE TABLE t (id INT)")

# DML: returns the affected row count
{:ok, 5} = ExArrow.FlightSQL.Client.execute(client, "INSERT INTO t VALUES (1), (2), (3), (4), (5)")
{:ok, 3} = ExArrow.FlightSQL.Client.execute(client, "DELETE FROM t WHERE id < 4")
```
Connection
connect/1 accepts a "host:port" string. connect/2 accepts the same string plus
a keyword options list.
```elixir
{:ok, client} = ExArrow.FlightSQL.Client.connect("localhost:32010")
{:ok, client} = ExArrow.FlightSQL.Client.connect("dremio.example.com:32010", tls: true)
```
IPv6 addresses must use the bracketed URI form:
```elixir
{:ok, client} = ExArrow.FlightSQL.Client.connect("[::1]:32010")
```
TLS
| `:tls` value | Behaviour |
|---|---|
| not set, loopback host (`localhost`, `127.0.0.1`, `::1`) | plaintext (auto) |
| not set, remote host | TLS with native OS certificate store (auto) |
| `false` | plaintext regardless of host |
| `true` | TLS with native OS certificate store |
| `[ca_cert_pem: pem]` | TLS with a custom PEM-encoded CA certificate |
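The rules in the table can be restated as a small decision function. The sketch below is hypothetical and is not part of ExArrow. `TlsModeSketch`, its result atoms (`:plaintext`, `:native_roots`, `{:custom_ca, pem}`) and the host-parsing helper are illustrative names. The loopback check simply compares the host against the three names listed in the table.

```elixir
defmodule TlsModeSketch do
  # Hypothetical helper (not part of ExArrow) restating the :tls table.
  @loopback ["localhost", "127.0.0.1", "::1"]

  # Returns the transport implied by a host and the connect options.
  def mode(host, opts) do
    case Keyword.fetch(opts, :tls) do
      # :tls not set: plaintext for loopback hosts, TLS otherwise
      :error -> if host in @loopback, do: :plaintext, else: :native_roots
      {:ok, false} -> :plaintext
      {:ok, true} -> :native_roots
      {:ok, [ca_cert_pem: pem]} when is_binary(pem) -> {:custom_ca, pem}
    end
  end

  # Extracts the host from "host:port" or the bracketed "[ipv6]:port" form.
  def host(address) do
    case Regex.run(~r/^\[([^\]]+)\]:\d+$/, address) do
      [_, ipv6] -> ipv6
      nil -> address |> String.split(":") |> hd()
    end
  end
end
```

With the addresses used on this page, `TlsModeSketch.mode(TlsModeSketch.host("localhost:32010"), [])` returns `:plaintext`, and `TlsModeSketch.mode(TlsModeSketch.host("dremio.example.com:32010"), [])` returns `:native_roots`.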
```elixir
# Custom CA (private PKI or self-signed server certificate)
pem = File.read!("priv/ca.pem")
{:ok, client} = ExArrow.FlightSQL.Client.connect("secure.server:32010", tls: [ca_cert_pem: pem])
```
Authentication
Pass credentials as gRPC metadata via the :headers option. Bearer tokens are the
most common pattern:
```elixir
{:ok, client} = ExArrow.FlightSQL.Client.connect("dremio.example.com:32010",
  tls: true,
  headers: [{"authorization", "Bearer my-pat-token"}]
)
```
Any number of `{name, value}` string tuples are accepted. They are sent as gRPC
metadata on every request.
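Tokens are usually read from configuration rather than hard-coded. The hypothetical helper below builds the `:headers` list from a token plus any extra `{name, value}` pairs. The module name is illustrative and is not an ExArrow API. It also lowercases header names as a precaution, because HTTP/2, and therefore gRPC, transmits header field names in lowercase.

```elixir
defmodule AuthHeadersSketch do
  # Hypothetical helper: builds the :headers option for connect/2.
  # Names are lowercased as a precaution; HTTP/2 (and so gRPC) sends
  # header field names in lowercase.
  def bearer(token, extra \\ []) when is_binary(token) do
    [{"authorization", "Bearer " <> token} | extra]
    |> Enum.map(fn {name, value} -> {String.downcase(name), value} end)
  end
end
```

Here is a possible call, assuming the token lives in a `FLIGHT_SQL_TOKEN` environment variable (an illustrative name): `ExArrow.FlightSQL.Client.connect("dremio.example.com:32010", tls: true, headers: AuthHeadersSketch.bearer(System.fetch_env!("FLIGHT_SQL_TOKEN")))`.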
Queries
Materialised — query/2
Collects all record batches from the server before returning. Returns an
ExArrow.FlightSQL.Result struct.
```elixir
{:ok, result} = ExArrow.FlightSQL.Client.query(client, "SELECT * FROM orders LIMIT 1000")
result.num_rows #=> 1000
result.schema   #=> %ExArrow.Schema{...}
result.batches  #=> [%ExArrow.RecordBatch{...}, ...]
```
Use `query!/2` to raise `ExArrow.FlightSQL.Error` on failure instead of returning
`{:error, ...}`:
```elixir
result = ExArrow.FlightSQL.Client.query!(client, "SELECT count(*) FROM orders")
```
Lazy — stream_query/2
Returns an ExArrow.Stream that is consumed one batch at a time. The gRPC
connection stays open until the stream is exhausted or the resource is
garbage-collected.
Prefer this over query/2 for large result sets that should not be fully buffered
in memory.
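When only an aggregate is needed, `Enum.reduce/3` keeps the caller's memory bounded. The caller holds only the running total and the current batch, not a list of every batch. The sketch below assumes the `ExArrow.RecordBatch.num_rows/1` function used elsewhere on this page. `RowCountSketch` is a hypothetical name, and the row-count function is a parameter only so the code can run without a server.

```elixir
defmodule RowCountSketch do
  # Hypothetical helper: totals rows across batches without collecting them.
  # `num_rows` defaults to ExArrow.RecordBatch.num_rows/1 and is a parameter
  # only so the function can run without a server.
  def count(stream, num_rows \\ &ExArrow.RecordBatch.num_rows/1) do
    Enum.reduce(stream, 0, fn batch, total -> total + num_rows.(batch) end)
  end
end
```

With a live stream, call `{:ok, stream} = ExArrow.FlightSQL.Client.stream_query(client, "SELECT * FROM big_table")` followed by `RowCountSketch.count(stream)`.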
Using Enum functions (recommended)
ExArrow.Stream implements the Enumerable protocol, so all Enum and Stream
functions work directly on the stream handle. Each element is an
ExArrow.RecordBatch.t().
```elixir
{:ok, stream} = ExArrow.FlightSQL.Client.stream_query(client, "SELECT * FROM events")

# The examples below are alternatives: reading a stream consumes it,
# so each one assumes a freshly opened stream.

# Collect all batches into a list
batches = Enum.to_list(stream)

# Map over every batch
row_counts = Enum.map(stream, &ExArrow.RecordBatch.num_rows/1)

# Take only the first N batches; the rest are never fetched
first_two = Enum.take(stream, 2)

# Comprehension syntax
for batch <- stream, do: process_batch(batch)
```
Enumeration raises `RuntimeError` on a transport or server error. For
recoverable error handling, iterate manually with `ExArrow.Stream.next/1`:
```elixir
case ExArrow.Stream.next(stream) do
  nil -> :done
  {:error, msg} -> {:error, msg}
  batch -> process(batch)
end
```
Manual iteration
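The `case` expression above inspects a single result. To consume a whole stream with the same error handling, repeat the call until it returns `nil`. The sketch below is a hypothetical helper that relies only on the three documented return shapes of `ExArrow.Stream.next/1`: a batch, `nil`, or `{:error, msg}`. The `next` parameter defaults to that function and exists only so the loop can be exercised without a server.

```elixir
defmodule DrainSketch do
  # Hypothetical helper: consumes a stream via repeated next/1 calls and
  # stops at the first error instead of raising. Relies only on the documented
  # results of ExArrow.Stream.next/1: a batch, nil, or {:error, msg}.
  def drain(stream, handle_batch, next \\ &ExArrow.Stream.next/1) do
    loop(stream, handle_batch, next, 0)
  end

  defp loop(stream, handle_batch, next, handled) do
    case next.(stream) do
      # Stream exhausted: report how many batches were processed
      nil ->
        {:ok, handled}

      # Transport or server error: stop and report progress so far
      {:error, msg} ->
        {:error, msg, handled}

      batch ->
        handle_batch.(batch)
        loop(stream, handle_batch, next, handled + 1)
    end
  end
end
```

The helper returns the number of batches already handled alongside the error. This lets a caller decide whether partial output is usable. Drop that count if all-or-nothing semantics fit better.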
```elixir
{:ok, stream} = ExArrow.FlightSQL.Client.stream_query(client, "SELECT * FROM events")
{:ok, schema} = ExArrow.Stream.schema(stream)

# Collect everything (raises on error)
batches = ExArrow.Stream.to_list(stream)
```
Resource lifecycle
The underlying gRPC channel and batch buffer are held in a native (Rust) resource.
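The resource is released when the stream handle is garbage-collected. Stopping
enumeration early (e.g. with Enum.take/2) is safe: the resource is released once
no process references the stream handle and it has been garbage-collected.
Garbage collection on the BEAM runs per process and is not immediate, so running
a query inside a short-lived process (for example a Task) bounds how long the
connection stays open.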
Concurrency
Concurrent calls on the same client handle serialise internally — the gRPC
client requires exclusive access per call. Create separate handles with
connect/2 for parallel query workloads.
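One way to follow that advice is to give each concurrent task its own handle. The hypothetical sketch below uses `Task.async_stream/3` and opens a client per query. The `:connect` and `:query` options default to the documented `connect/1` and `query/2` and are injectable only for testing. The concurrency limit of 4 is an arbitrary assumption.

```elixir
defmodule ParallelQuerySketch do
  # Hypothetical helper: runs each SQL string on its own client handle,
  # since calls on one handle are serialised. :connect and :query default to
  # the documented ExArrow functions and are injectable only for testing.
  def run(address, sqls, opts \\ []) do
    connect = Keyword.get(opts, :connect, &ExArrow.FlightSQL.Client.connect/1)
    query = Keyword.get(opts, :query, &ExArrow.FlightSQL.Client.query/2)

    sqls
    |> Task.async_stream(
      fn sql ->
        # One handle per task; a failed connect is returned as-is
        with {:ok, client} <- connect.(address), do: query.(client, sql)
      end,
      max_concurrency: Keyword.get(opts, :max_concurrency, 4),
      timeout: Keyword.get(opts, :timeout, 60_000)
    )
    # With default options a task crash or timeout exits the caller (unless it
    # traps exits), so only {:ok, value} tuples arrive here
    |> Enum.map(fn {:ok, result} -> result end)
  end
end
```

Opening a connection per query keeps the sketch short, but it pays the connection (and TLS handshake) cost every time. A long-running service would more likely keep a small set of handles, one per worker process. Results come back in input order because `Task.async_stream/3` is ordered by default.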
DML and DDL — execute/2
Runs INSERT, UPDATE, DELETE, CREATE TABLE, and similar statements. Returns the
affected row count or :unknown when the server does not report one.
```elixir
{:ok, 5} = ExArrow.FlightSQL.Client.execute(client, "DELETE FROM logs WHERE ts < now() - interval '7 days'")
{:ok, :unknown} = ExArrow.FlightSQL.Client.execute(client, "CREATE TABLE staging AS SELECT * FROM raw")
```
Error handling
All non-bang functions return {:ok, value} or {:error, %ExArrow.FlightSQL.Error{}}.
```elixir
require Logger

# handle/1 and reauthenticate/0 stand for application code
case ExArrow.FlightSQL.Client.query(client, sql) do
  {:ok, result} -> handle(result)
  {:error, %ExArrow.FlightSQL.Error{code: :unauthenticated}} -> reauthenticate()
  {:error, err} -> Logger.error(ExArrow.FlightSQL.Error.message(err))
end
```
Error codes
| Code | Meaning |
|---|---|
| `:transport_error` | TCP/TLS channel failure; also gRPC `CANCELLED`, `UNAVAILABLE`, `DEADLINE_EXCEEDED` |
| `:server_error` | gRPC `INTERNAL`, `RESOURCE_EXHAUSTED`, `ABORTED`, `DATA_LOSS` |
| `:unimplemented` | Server does not support the operation |
| `:unauthenticated` | Missing or rejected credentials |
| `:permission_denied` | Insufficient privileges |
| `:not_found` | Table or object does not exist |
| `:invalid_argument` | Bad SQL syntax, wrong parameter types, or gRPC `OUT_OF_RANGE` |
| `:protocol_error` | Malformed or unexpected Flight SQL response |
| `:multi_endpoint` | FlightInfo returned more than one endpoint (not supported in v0.5.0) |
| `:invalid_option` | Invalid connect or query option at the Elixir layer |
| `:conversion_error` | Arrow → Explorer or Arrow → Nx conversion failure |
The :grpc_status field on the error struct holds the raw gRPC integer code when the
failure came from the server; it is nil for local (transport or option) errors.
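`:transport_error` groups the conditions most likely to be transient: an unavailable channel, an exceeded deadline, or a cancellation. That makes it a natural trigger for retries. The sketch below is a hypothetical helper, not an ExArrow API. It retries a zero-arity function with exponential backoff while the error code is `:transport_error` and returns any other result unchanged. Matching on `%{code: ...}` works because `%ExArrow.FlightSQL.Error{}` is a struct with a `code` field.

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not an ExArrow API. Retries `fun` with exponential
  # backoff while it returns {:error, %{code: :transport_error}}; any other
  # result, including other error codes, is returned immediately.
  def with_retry(fun, attempts \\ 3, backoff_ms \\ 200) when attempts >= 1 do
    case fun.() do
      {:error, %{code: :transport_error}} when attempts > 1 ->
        Process.sleep(backoff_ms)
        with_retry(fun, attempts - 1, backoff_ms * 2)

      other ->
        other
    end
  end
end
```

A typical call is `RetrySketch.with_retry(fn -> ExArrow.FlightSQL.Client.query(client, sql) end)`. Wrap only idempotent calls. After `DEADLINE_EXCEEDED` or `CANCELLED`, the server may already have applied a statement sent through `execute/2`.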
Explorer and Nx integration
Convert to an Explorer DataFrame
Requires the optional :explorer dependency ({:explorer, "~> 0.11"}):
```elixir
{:ok, result} = ExArrow.FlightSQL.Client.query(client, "SELECT * FROM sales")
{:ok, df} = ExArrow.FlightSQL.Result.to_dataframe(result)
```
For large result sets, convert the stream directly:
```elixir
{:ok, stream} = ExArrow.FlightSQL.Client.stream_query(client, "SELECT * FROM big_table")
{:ok, df} = ExArrow.Explorer.from_stream(stream)
```
Convert a column to an Nx tensor
Requires the optional :nx dependency ({:nx, "~> 0.9"}). Result.to_tensor/2 converts
only the first batch of a result. For multi-batch results, run the query with
stream_query/2 and convert batch by batch.
```elixir
{:ok, result} = ExArrow.FlightSQL.Client.query(client, "SELECT price FROM quotes")
{:ok, tensor} = ExArrow.FlightSQL.Result.to_tensor(result, "price")
```
Testing with Mox
Swap the real implementation for a mock in tests by setting the
:flight_sql_client_impl application environment key:
```elixir
# test/test_helper.exs
Mox.defmock(MyApp.FlightSQLMock, for: ExArrow.FlightSQL.ClientBehaviour)

# In your test (the application environment is global, so use async: false)
Application.put_env(:ex_arrow, :flight_sql_client_impl, MyApp.FlightSQLMock)

Mox.expect(MyApp.FlightSQLMock, :query, fn _client, "SELECT 1", [] ->
  {:ok, %ExArrow.Stream{resource: make_ref(), backend: :flight_sql}}
end)
```
Prepared statements
Prepared statements allow the server to parse and plan a query once, then execute it one or more times. This is useful for repeated queries and for catching syntax errors before execution.
```elixir
# Prepare the query (server parses and plans it)
{:ok, stmt} = ExArrow.FlightSQL.Client.prepare(client, "SELECT * FROM events WHERE ts > '2024-01-01'")

# Execute as a streaming query
{:ok, stream} = ExArrow.FlightSQL.Statement.execute(stmt)
batches = Enum.to_list(stream)

# Re-execute the same statement (reuses the server plan)
{:ok, stream2} = ExArrow.FlightSQL.Statement.execute(stmt)

# Or execute as DML
{:ok, dml_stmt} = ExArrow.FlightSQL.Client.prepare(client, "DELETE FROM logs WHERE ts < '2020-01-01'")
{:ok, 1042} = ExArrow.FlightSQL.Statement.execute_update(dml_stmt)
```
Lifecycle
The server-side handle is released when the Statement struct is
garbage-collected. There is no explicit close/1 in v0.5.0.
Parameter binding
Parameter binding (passing ? placeholders with Arrow record batch values)
is not supported in v0.5.0. Parameterized queries can be prepared and
executed, but parameter values cannot be set from Elixir in this release.
Server compatibility
Prepared statement support is optional in the Arrow Flight SQL specification.
Servers that do not implement CreatePreparedStatement return
{:error, %ExArrow.FlightSQL.Error{code: :unimplemented}}.
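Code that must run against several servers can treat prepared statements as an optimisation and fall back to an ad-hoc query when they are unavailable. The hypothetical sketch below does this for streaming reads. Its three function options default to the documented `prepare/2`, `Statement.execute/1`, and `stream_query/2`, and are injectable only so the fallback path can be tested without a server.

```elixir
defmodule PreparedFallbackSketch do
  # Hypothetical helper: streams `sql` through a prepared statement when the
  # server supports CreatePreparedStatement, otherwise via stream_query/2.
  # The function options are injectable only for testing.
  def stream(client, sql, opts \\ []) do
    prepare = Keyword.get(opts, :prepare, &ExArrow.FlightSQL.Client.prepare/2)
    execute = Keyword.get(opts, :execute, &ExArrow.FlightSQL.Statement.execute/1)
    ad_hoc = Keyword.get(opts, :stream_query, &ExArrow.FlightSQL.Client.stream_query/2)

    case prepare.(client, sql) do
      {:ok, stmt} -> execute.(stmt)
      # Prepared statements are optional in the spec: fall back
      {:error, %{code: :unimplemented}} -> ad_hoc.(client, sql)
      # Any other failure (e.g. :invalid_argument) is returned unchanged
      {:error, _} = error -> error
    end
  end
end
```

Errors other than `:unimplemented`, such as `:invalid_argument` for malformed SQL, are returned as-is. A syntax error is therefore not silently re-sent as an ad-hoc query.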
Metadata discovery
List tables — get_tables/2
Returns a lazy stream of record batches describing the tables visible to the connected user.
```elixir
{:ok, stream} = ExArrow.FlightSQL.Client.get_tables(client)
batches = Enum.to_list(stream)

# With filters
{:ok, stream} = ExArrow.FlightSQL.Client.get_tables(client,
  db_schema_filter: "public",
  table_types: ["TABLE", "VIEW"]
)
```
Result columns (per the Arrow Flight SQL specification):
- `catalog_name`: utf8 (nullable)
- `db_schema_name`: utf8 (nullable)
- `table_name`: utf8
- `table_type`: utf8
Options:
- `:catalog`: exact catalog name filter
- `:db_schema_filter`: SQL `LIKE` pattern for schema names
- `:table_name_filter`: SQL `LIKE` pattern for table names
- `:table_types`: list of type strings, e.g. `["TABLE", "VIEW"]`
- `:include_schema`: `true` adds an IPC-encoded schema column per table (default: `false`)
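In the Flight SQL specification, `%` in these patterns matches any sequence of zero or more characters and `_` matches exactly one character. The server does the actual filtering. The hypothetical local matcher below only illustrates which names a pattern selects, and it leaves out escape handling.

```elixir
defmodule LikePatternSketch do
  # Hypothetical local matcher illustrating SQL LIKE patterns as used by
  # :db_schema_filter and :table_name_filter. The server does the real
  # filtering; escape characters are not handled here.
  def matches?(pattern, name) do
    regex_source =
      pattern
      |> String.graphemes()
      |> Enum.map_join(fn
        # "%" matches zero or more characters
        "%" -> ".*"
        # "_" matches exactly one character
        "_" -> "."
        # Everything else matches literally
        char -> Regex.escape(char)
      end)

    Regex.match?(Regex.compile!("\\A" <> regex_source <> "\\z", "su"), name)
  end
end
```

For example, `"pub%"` matches `public` but not `schema_pub`, and `"t_"` matches `t1` but not `t12`.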
List schemas — get_db_schemas/2
Returns a lazy stream of record batches describing the database schemas.
```elixir
{:ok, stream} = ExArrow.FlightSQL.Client.get_db_schemas(client)
batches = Enum.to_list(stream)

# Restrict to one catalog
{:ok, stream} = ExArrow.FlightSQL.Client.get_db_schemas(client, catalog: "main")
```
Result columns: `catalog_name` (nullable), `db_schema_name`.
Server capabilities — get_sql_info/1
Returns a lazy stream of record batches encoding server capability flags and SQL dialect information as defined by the Flight SQL specification.
```elixir
{:ok, stream} = ExArrow.FlightSQL.Client.get_sql_info(client)
batches = Enum.to_list(stream)
```
Each row has two columns:
- `info_name`: uint32 (the numeric `SqlInfo` code)
- `value`: dense_union (the value; its type depends on the code)
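The numeric codes come from the `SqlInfo` enum in the Flight SQL protocol definition. This page does not document column accessors, so the sketch assumes the `{info_name, value}` pairs have already been read out of the batches. From there, a lookup table can make them readable. The sketch is hypothetical and names only the first four codes, which describe the server itself. Any other code passes through unchanged.

```elixir
defmodule SqlInfoSketch do
  # Hypothetical lookup for a few SqlInfo codes from the Flight SQL protocol
  # definition. The real enum is much longer; unknown codes pass through.
  @names %{
    0 => :flight_sql_server_name,
    1 => :flight_sql_server_version,
    2 => :flight_sql_server_arrow_version,
    3 => :flight_sql_server_read_only
  }

  def label(code) when is_integer(code), do: Map.get(@names, code, code)

  # Expects {info_name, value} pairs already extracted from the batches.
  def to_map(pairs), do: Map.new(pairs, fn {code, value} -> {label(code), value} end)
end
```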
Server compatibility note
Metadata support is optional in the Flight SQL specification. Servers that
do not implement a particular metadata command return
{:error, %ExArrow.FlightSQL.Error{code: :unimplemented}}. Always pattern-match on
the error code rather than assuming all metadata APIs are available:
```elixir
case ExArrow.FlightSQL.Client.get_tables(client) do
  {:ok, stream} -> Enum.to_list(stream)
  # The server does not support this metadata command
  {:error, %ExArrow.FlightSQL.Error{code: :unimplemented}} -> []
  {:error, err} -> raise err
end
```
v0.5.0 scope
Supported:
- Ad-hoc SQL query execution (`query/2`, `query!/2`, `stream_query/2`)
- DML execution with affected-row count (`execute/2`)
- Lazy streaming of large result sets with `Enumerable` support
- Prepared statements (`prepare/2`, `Statement.execute/1`, `Statement.execute_update/1`)
- Metadata discovery: `get_tables/2`, `get_db_schemas/2`, `get_sql_info/1`
- TLS connections: plaintext, OS trust store, or custom CA certificate
- Bearer-token and arbitrary gRPC header injection
- Explorer integration: `Result.to_dataframe/1`, `ExArrow.Explorer.from_stream/1`
- Nx integration: `Result.to_tensor/2`
- Mox-compatible `ClientBehaviour` for unit testing without a server

Not supported in v0.5.0 (deferred):
- Bulk ingestion (`DoPut`)
- Transactions (`BEGIN`, `COMMIT`, `ROLLBACK`)
- Multi-endpoint distributed `FlightInfo` responses
- Parameter binding for prepared statements (v0.6.0)
- Filtering `get_sql_info` by specific info code (returns all codes)
Integration tests
Integration tests require a running Flight SQL server and are excluded from
`mix test` by default. The suite lives in
`test/ex_arrow/flight_sql_integration_test.exs` and is tagged
`:flight_sql_integration`. To run it:
```shell
mix test test/ex_arrow/flight_sql_integration_test.exs --include flight_sql_integration
```
DuckDB can be started as a Flight SQL server using its
flight_sql extension.