Documentation for Adbc.Connection.
Connections are modelled as processes. They require
an Adbc.Database to be started.
Summary
Functions
Performs a bulk insert operation.
Same as bulk_insert/3 but raises an exception on error.
Returns a specification to start this module under a supervisor.
Runs the given query with params and statement_options, returning
only the number of affected rows.
Same as execute/4 but raises an exception on error.
Get a binary (bytes) type option of the connection.
Gets the underlying driver of a connection process.
Get a float type option of the connection.
Get metadata about the database/driver.
Get an integer type option of the connection.
Get a hierarchical view of all catalogs, database schemas, tables, and columns.
Get a string type option of the connection.
Get a list of table types in the database.
Ingests columns into a temporary table that is automatically dropped when the returned result is garbage collected.
Same as ingest/2 but raises an exception on error.
Prepares the given query.
Runs the given query with params and statement_options.
Runs the given query with params and statement_options.
Same as query/4 but raises an exception on error.
Runs the given query with params and
passes the Adbc.StreamResult to the given function.
Set option for the connection.
Starts a connection process.
Types
@type t() :: GenServer.server()
Functions
@spec bulk_insert( t(), [Adbc.Column.t()] | keyword(list() | Adbc.Column.t()) | Adbc.StreamResult.t() | Pythonx.Object.t(), Keyword.t() ) :: {:ok, non_neg_integer()} | {:error, Exception.t()}
Performs a bulk insert operation.
This function creates a table (or appends to an existing one) and inserts columns in supported databases. This should be more efficient than using SQL query in supported databases.
Columns can be given as:
- a keyword list of column name to data, where values are either plain lists (converted via Adbc.Column.new/2) or Adbc.Column.t() structs (the name from the keyword key overrides the column's name)
- a list of Adbc.Column.t() structs
- an Adbc.StreamResult.t() (obtained from query_pointer/4) to efficiently insert query results without materializing the data
- a Pythonx.Object representing a PyArrow RecordBatchReader (requires the pythonx package)
Arguments
- conn - The connection process
- columns_or_stream - Columns as a keyword list, a list of Adbc.Column.t(), an Adbc.StreamResult.t(), or a Pythonx.Object
- opts - Options for the bulk insert operation
Options
- :table (required) - The name of the target table for bulk insert
- :mode (optional) - The ingestion mode. When not specified, the default behavior is driver-dependent but typically behaves like :create. Available modes:
  - :create - Create the table and insert data; error if the table already exists
  - :append - Insert data into an existing table; error if the table does not exist or if the schema does not match
  - :replace - Drop the table if it exists, create it, and insert data
  - :create_append - Create the table if it does not exist, otherwise append; error if the table exists but the schema does not match
- :catalog (optional) - The catalog of the table. Support is driver-dependent. Not supported with :temporary.
- :schema (optional) - The database schema of the table. Support is driver-dependent. For example, SQLite does not support this option. Not supported with :temporary.
- :temporary (optional) - If true, create a temporary table. Default is false. Cannot be used with :catalog or :schema.
Examples
# Using a keyword list (types are inferred, use Adbc.Column for explicit types)
Adbc.Connection.bulk_insert(conn,
[id: [1, 2, 3], name: Adbc.Column.string(["Alice", "Bob", "Charlie"])],
table: "users"
)
#=> {:ok, 3}
# Using a list of columns
columns = [
Adbc.Column.s64([1, 2, 3], name: "id"),
Adbc.Column.string(["Alice", "Bob", "Charlie"], name: "name")
]
Adbc.Connection.bulk_insert(conn, columns, table: "users")
#=> {:ok, 3}
# Append to an existing table
Adbc.Connection.bulk_insert(conn, columns, table: "users", mode: :append)
#=> {:ok, 3}
# Create a temporary table
Adbc.Connection.bulk_insert(conn, columns, table: "temp_users", temporary: true)
#=> {:ok, 3}
# Replace an existing table
Adbc.Connection.bulk_insert(conn, columns, table: "users", mode: :replace)
#=> {:ok, 3}
# Efficiently insert from a query (within query_pointer callback)
# This is most useful for transferring across databases.
# Within the same database, you most likely have custom SQL commands,
# such as COPY, CREATE TEMPORARY TABLE, etc.
Adbc.Connection.query_pointer(source_conn, "SELECT * FROM source_table", fn stream ->
Adbc.Connection.bulk_insert(dest_conn, stream, table: "dest_table")
end)
@spec bulk_insert!( t(), [Adbc.Column.t()] | keyword(list() | Adbc.Column.t()) | Adbc.StreamResult.t() | Pythonx.Object.t(), Keyword.t() ) :: non_neg_integer()
Same as bulk_insert/3 but raises an exception on error.
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec execute(t(), binary() | reference(), [term()], Keyword.t()) :: {:ok, non_neg_integer() | nil} | {:error, Exception.t()}
Runs the given query with params and statement_options, returning
only the number of affected rows.
Unlike query/4, this function does not return any data. This is useful
for DDL statements (CREATE TABLE, DROP TABLE, etc.) and DML statements
(INSERT, UPDATE, DELETE) where you don't need the result set.
Returns {:ok, rows_affected} where rows_affected is a non-negative
integer or nil if the driver does not report it.
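For instance, a minimal sketch assuming a started connection conn and a hypothetical users table (the ? placeholder syntax shown is SQLite's; it varies by driver):
{:ok, _} = Adbc.Connection.execute(conn, "CREATE TABLE users (id INTEGER, name TEXT)", [], [])

{:ok, rows_affected} =
  Adbc.Connection.execute(conn, "INSERT INTO users (id, name) VALUES (?, ?)", [1, "Alice"], [])
#=> {:ok, 1}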
Same as execute/4 but raises an exception on error.
Get a binary (bytes) type option of the connection.
Gets the underlying driver of a connection process.
Examples
Adbc.Connection.get_driver(conn)
#=> {:ok, :sqlite}
Get a float type option of the connection.
@spec get_info(t(), [non_neg_integer()]) :: {:ok, Adbc.Result.t()} | {:error, Exception.t()}
Get metadata about the database/driver.
The result is an Arrow dataset with the following schema:
| Field Name | Field Type | Null Constraint |
|---|---|---|
| info_name | uint32 | not null |
| info_value | INFO_SCHEMA | |
INFO_SCHEMA is a dense union with members:
| Field Name | Type Code | Field Type |
|---|---|---|
| string_value | 0 | utf8 |
| bool_value | 1 | bool |
| int64_value | 2 | int64 |
| int32_bitmask | 3 | int32 |
| string_list | 4 | list<utf8> |
| int32_to_int32_list_map | 5 | map<int32, list<int32>> |
Each metadatum is identified by an integer code. The recognized codes are defined as constants. Codes [0, 10_000) are reserved for ADBC usage. Drivers/vendors will ignore requests for unrecognized codes (the row will be omitted from the result).
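As a sketch, requesting the vendor name and driver name (codes 0 and 100, per the ADBC standard constants; the exact result shape depends on the driver):
{:ok, result} = Adbc.Connection.get_info(conn, [0, 100])
Adbc.Result.to_map(result)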
Get an integer type option of the connection.
@spec get_objects( t(), non_neg_integer(), catalog: String.t(), db_schema: String.t(), table_name: String.t(), table_type: [String.t()], column_name: String.t() ) :: {:ok, Adbc.Result.t()} | {:error, Exception.t()}
Get a hierarchical view of all catalogs, database schemas, tables, and columns.
The result is an Arrow dataset with the following schema:
| Field Name | Field Type |
|---|---|
| catalog_name | utf8 |
| catalog_db_schemas | list<DB_SCHEMA_SCHEMA> |
DB_SCHEMA_SCHEMA is a Struct with fields:
| Field Name | Field Type |
|---|---|
| db_schema_name | utf8 |
| db_schema_tables | list<TABLE_SCHEMA> |
TABLE_SCHEMA is a Struct with fields:
| Field Name | Field Type | Null Constraint |
|---|---|---|
| table_name | utf8 | not null |
| table_type | utf8 | not null |
| table_columns | list<COLUMN_SCHEMA> | |
| table_constraints | list<CONSTRAINT_SCHEMA> | |
COLUMN_SCHEMA is a Struct with fields:
| Field Name | Field Type | Null Constraint | Comments |
|---|---|---|---|
| column_name | utf8 | not null | |
| ordinal_position | int32 | | (1) |
| remarks | utf8 | | (2) |
| xdbc_data_type | int16 | | (3) |
| xdbc_type_name | utf8 | | (3) |
| xdbc_column_size | int32 | | (3) |
| xdbc_decimal_digits | int16 | | (3) |
| xdbc_num_prec_radix | int16 | | (3) |
| xdbc_nullable | int16 | | (3) |
| xdbc_column_def | utf8 | | (3) |
| xdbc_sql_data_type | int16 | | (3) |
| xdbc_datetime_sub | int16 | | (3) |
| xdbc_char_octet_length | int32 | | (3) |
| xdbc_is_nullable | utf8 | | (3) |
| xdbc_scope_catalog | utf8 | | (3) |
| xdbc_scope_schema | utf8 | | (3) |
| xdbc_scope_table | utf8 | | (3) |
| xdbc_is_autoincrement | bool | | (3) |
| xdbc_is_generatedcolumn | bool | | (3) |
1. The column's ordinal position in the table (starting from 1).
2. Database-specific description of the column.
3. Optional value. Should be null if not supported by the driver. xdbc_ values are meant to provide JDBC/ODBC-compatible metadata in an agnostic manner.
CONSTRAINT_SCHEMA is a Struct with fields:
| Field Name | Field Type | Null Constraint | Comments |
|---|---|---|---|
| constraint_name | utf8 | | |
| constraint_type | utf8 | not null | (1) |
| constraint_column_names | list<utf8> | not null | (2) |
| constraint_column_usage | list<USAGE_SCHEMA> | | (3) |
1. One of 'CHECK', 'FOREIGN KEY', 'PRIMARY KEY', or 'UNIQUE'.
2. The columns on the current table that are constrained, in order.
3. For FOREIGN KEY only, the referenced table and columns.
USAGE_SCHEMA is a Struct with fields:
| Field Name | Field Type | Null Constraint |
|---|---|---|
| fk_catalog | utf8 | |
| fk_db_schema | utf8 | |
| fk_table | utf8 | not null |
| fk_column_name | utf8 | not null |
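For example, a minimal sketch fetching the full hierarchy for a hypothetical users table (a depth of 0 requests all levels, per the ADBC standard):
{:ok, result} = Adbc.Connection.get_objects(conn, 0, table_name: "users")
Adbc.Result.to_map(result)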
Get a string type option of the connection.
@spec get_table_types(t()) :: {:ok, Adbc.Result.t()} | {:error, Exception.t()}
Get a list of table types in the database.
The result is an Arrow dataset with the following schema:
| Field Name | Field Type | Null Constraint |
|---|---|---|
| table_type | utf8 | not null |
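For example (the output shown is illustrative, for a SQLite database):
{:ok, result} = Adbc.Connection.get_table_types(conn)
Adbc.Result.to_map(result)
#=> %{"table_type" => ["table", "view"]}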
@spec ingest( t(), [Adbc.Column.t()] | keyword(list() | Adbc.Column.t()) | Adbc.StreamResult.t() | Pythonx.Object.t() ) :: {:ok, Adbc.IngestResult.t()} | {:error, Exception.t()}
Ingests columns into a temporary table that is automatically dropped when the returned result is garbage collected.
Columns can be given as:
- a keyword list of column name to data, where values are either plain lists (converted via Adbc.Column.new/2) or Adbc.Column.t() structs (the name from the keyword key overrides the column's name)
- a list of Adbc.Column.t() structs
- an Adbc.StreamResult.t() (obtained from query_pointer/4) to efficiently insert query results without materializing the data
- a Pythonx.Object representing a PyArrow RecordBatchReader (requires the pythonx package)
Returns {:ok, %Adbc.IngestResult{}} on success.
Garbage collection
You must always hold a reference to the whole struct,
not just to its individual fields. For example, if you only
keep a reference to result.table, the struct itself may be
garbage collected, and the temporary table dropped with it.
Examples
# Using a keyword list
{:ok, result} = Adbc.Connection.ingest(conn,
id: [1, 2, 3],
name: Adbc.Column.string(["Alice", "Bob", "Charlie"])
)
result.table
#=> "adbc_ingest_0"
result.num_rows
#=> 3
# Using a list of columns
columns = [
Adbc.Column.s64([1, 2, 3], name: "id"),
Adbc.Column.string(["Alice", "Bob", "Charlie"], name: "name")
]
{:ok, result} = Adbc.Connection.ingest(conn, columns)
@spec ingest!( t(), [Adbc.Column.t()] | keyword(list() | Adbc.Column.t()) | Adbc.StreamResult.t() | Pythonx.Object.t() ) :: Adbc.IngestResult.t()
Same as ingest/2 but raises an exception on error.
@spec prepare(t(), binary()) :: {:ok, reference()} | {:error, Exception.t()}
Prepares the given query.
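The returned reference can be passed to query/4 (or execute/4) in place of the query string. A minimal sketch, assuming a hypothetical users table:
{:ok, stmt} = Adbc.Connection.prepare(conn, "SELECT * FROM users WHERE id = ?")
{:ok, result} = Adbc.Connection.query(conn, stmt, [1])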
Runs the given query with params and statement_options.
This function requires pythonx to be installed, with the pyarrow
package available in the installation.
The return value is an ok-tuple with a Pythonx.Object, an instance
of pyarrow.Table.
The table object can then be used to efficiently create a polars dataframe:
{:ok, py_table} = Adbc.Connection.py_query(conn, "SELECT * FROM ...", [])
Pythonx.eval(
"""
import polars
df = polars.from_arrow(py_table)
# ...
""",
%{"py_table" => py_table}
)

or a pandas dataframe:
{:ok, py_table} = Adbc.Connection.py_query(conn, "SELECT * FROM ...", [])
Pythonx.eval(
"""
df = py_table.to_pandas()
# ...
""",
%{"py_table" => py_table}
)
@spec query(t(), binary() | reference(), [term()], Keyword.t()) :: {:ok, Adbc.Result.t()} | {:error, Exception.t()}
Runs the given query with params and statement_options.
It returns an ok-tuple with Adbc.Result or an error-tuple.
You often want to call Adbc.Result.to_map/1 on the result
to consume it.
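For example:
{:ok, result} = Adbc.Connection.query(conn, "SELECT 123 AS num", [])
Adbc.Result.to_map(result)
#=> %{"num" => [123]}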
Same as query/4 but raises an exception on error.
It returns an Adbc.Result struct. You often want to call
Adbc.Result.to_map/1 on the result to consume it.
Runs the given query with params and
passes the Adbc.StreamResult to the given function.
The Adbc.StreamResult holds a pointer to a valid ArrowStream for
the duration of the function. An Adbc.StreamResult can only be consumed once.
The callback function should accept a single argument of type
Adbc.StreamResult.t(). For backwards compatibility, 2-arity
functions are still supported but deprecated (a warning will be emitted).
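A minimal sketch, streaming one connection's query results into another via bulk_insert/3 (conn, other_conn, and the table names are hypothetical):
Adbc.Connection.query_pointer(conn, "SELECT * FROM users", [], fn stream ->
  Adbc.Connection.bulk_insert(other_conn, stream, table: "users_copy")
end)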
@spec set_option( pid(), atom() | String.t(), atom() | {:binary, iodata()} | String.t() | number() ) :: :ok | {:error, String.t()}
Set option for the connection.
- If value is an atom or a string, the corresponding string option is set.
- If value is a {:binary, iodata()} tuple, the corresponding binary option is set.
- If value is an integer, the corresponding integer option is set.
- If value is a float, the corresponding float option is set.
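For example, a sketch using the standard ADBC autocommit option key (support is driver-dependent):
# String option: "adbc.connection.autocommit" is the ADBC standard key
:ok = Adbc.Connection.set_option(conn, "adbc.connection.autocommit", "false")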
Starts a connection process.
Options
- :database (required) - the database process to connect to
- :process_options - the options to be given to the underlying process. See GenServer.start_link/3 for all options
Examples
Adbc.Connection.start_link(
database: MyApp.DB,
process_options: [name: MyApp.Conn]
)

In your supervision tree, it would be started like this:
children = [
{Adbc.Connection,
database: MyApp.DB,
process_options: [name: MyApp.Conn]}
]