In-memory InfluxDB client for fast, isolated testing.
Stores data in ETS tables, enabling safe async: true tests with full
isolation between test instances. Each call to start/1 creates an
independent ETS table.
Parses real line protocol on write, stores points as maps, and responds with realistic InfluxDB response formats on query.
Profiles
LocalClient enforces an InfluxDB version profile that determines which operations are available. This prevents tests from accidentally using operations that the real InfluxDB backend doesn't support.
| Profile | Write | SQL | InfluxQL | Flux | DB CRUD | Bucket CRUD | Tokens |
|---|---|---|---|---|---|---|---|
:v3_core | yes | yes | yes | no | yes | no | no |
:v3_enterprise | yes | yes | yes | no | yes | no | yes |
:v2 | yes | no | no | yes | no | yes | no |
Operations outside the configured profile return
{:error, :unsupported_operation}.
Usage
# Match your production InfluxDB version
setup do
{:ok, conn} = InfluxElixir.Client.Local.start(
databases: ["test_db"],
profile: :v3_core
)
on_exit(fn -> InfluxElixir.Client.Local.stop(conn) end)
{:ok, conn: conn}
endChecking Profile Support
Use supports?/2 to check if an operation is available:
if Local.supports?(conn, :query_sql) do
Local.query_sql(conn, "SELECT * FROM cpu", database: "test_db")
endETS Key Layout
:databases=>MapSet.t(binary())— set of created database names:buckets=>MapSet.t(binary())— set of created bucket names:tokens=>[map()]— list of token maps{:points, database, measurement}=>[point_map()]— stored points
SQL Query Support
query_sql/3 understands a subset of SQL:
SELECT * FROM measurementSELECT col1, col2 [, ...] FROM measurementwith optionalAS alias(projects fields and tags;timeis selectable)WHERE tag = 'value'orWHERE field > N(supports AND)ORDER BY time ASC|DESCLIMIT N$paramplaceholders viaparams: %{"$name" => value}in optsDATE_BIN(INTERVAL 'N unit', time)time bucketing- Aggregate functions:
AVG,SUM,COUNT,MIN,MAX - Ordered aggregates:
first(field, time),last(field, time) GROUP BY DATE_BIN(INTERVAL 'N unit', time)— optional. When omitted, aggregate queries return a single scalar row (COUNTover an empty result set is0; other aggregates returnnil).- Interval units:
seconds,minutes,hours,days
Gzip Decompression
If a write payload begins with gzip magic bytes (0x1F 0x8B) it is automatically decompressed before line protocol parsing.
Timestamp Precision
Pass precision: :nanosecond | :microsecond | :millisecond | :second
in opts to normalise stored timestamps to nanoseconds.
Summary
Functions
Creates a named bucket in this local instance.
Creates a named database in this local instance.
Creates a synthetic API token and stores it in ETS.
Deletes a bucket from this local instance.
Deletes a database from this local instance.
Deletes a token by its id field. Returns :ok even if the token was
not found, matching real InfluxDB delete semantics.
Executes a SQL statement and returns a summary map.
Returns a passing health status map with string keys, matching the JSON-decoded shape returned by the HTTP client.
Returns all buckets in this local instance as a list of maps with a
single :name key.
Returns all databases created in this local instance as a list of maps
with a single :name key.
Executes a Flux query with support for common predicates.
Executes an InfluxQL query.
Executes a SQL-like query against stored ETS points and returns rows.
Executes a SQL query and returns results as a lazy Stream.
Starts a new LocalClient instance with isolated ETS storage.
Stops a LocalClient instance and cleans up its ETS table.
Returns true if the given operation is supported by the connection's profile.
Parses line protocol binary and stores the resulting points in ETS.
Types
@type conn() :: %{ table: :ets.table(), databases: MapSet.t(binary()), database: binary() | nil, profile: profile() }
@type parsed_query() :: %{ measurement: binary(), where: [{:eq | :gt | :lt | :gte | :lte | :ne, binary(), term()}], order_by: {:time, :asc | :desc} | nil, limit: pos_integer() | nil, group_by_interval: pos_integer() | nil, select_columns: [select_column()] | nil, distinct_column: binary() | nil, projection_columns: [{binary(), binary()}] | nil }
@type profile() :: :v3_core | :v3_enterprise | :v2
Functions
@spec create_bucket( InfluxElixir.Client.connection(), binary(), keyword() ) :: :ok | {:error, term()}
Creates a named bucket in this local instance.
Creating an already-existing bucket is idempotent.
@spec create_database( InfluxElixir.Client.connection(), binary(), keyword() ) :: :ok | {:error, term()}
Creates a named database in this local instance.
Always succeeds — creating an already-existing database is idempotent.
@spec create_token( InfluxElixir.Client.connection(), binary(), keyword() ) :: {:ok, map()} | {:error, term()}
Creates a synthetic API token and stores it in ETS.
Returns {:ok, %{id: id, token: token_string, description: desc}}.
@spec delete_bucket(InfluxElixir.Client.connection(), binary()) :: :ok | {:error, term()}
Deletes a bucket from this local instance.
Returns :ok whether or not the bucket exists, matching the idempotent
delete semantics of the v2 API.
@spec delete_database(InfluxElixir.Client.connection(), binary()) :: :ok | {:error, term()}
Deletes a database from this local instance.
Returns {:error, %{status: 404, body: "database not found: name"}} if
the database does not exist.
@spec delete_token(InfluxElixir.Client.connection(), binary()) :: :ok | {:error, term()}
Deletes a token by its id field. Returns :ok even if the token was
not found, matching real InfluxDB delete semantics.
@spec execute_sql(InfluxElixir.Client.connection(), binary(), keyword()) :: {:ok, map()} | {:error, term()}
Executes a SQL statement and returns a summary map.
Supports DELETE FROM <measurement> and
DELETE FROM <measurement> WHERE ... — matching points are removed
from ETS and the count is returned in %{"rows_affected" => N}.
On :v3_core profile, DELETE is not supported (matches real InfluxDB v3
Core behavior) and returns {:error, :delete_not_supported}.
On :v3_enterprise profile, DELETE is supported.
Unknown statements return %{"rows_affected" => 0}.
@spec health(InfluxElixir.Client.connection()) :: {:ok, map()} | {:error, term()}
Returns a passing health status map with string keys, matching the JSON-decoded shape returned by the HTTP client.
@spec list_buckets(InfluxElixir.Client.connection()) :: {:ok, [map()]} | {:error, term()}
Returns all buckets in this local instance as a list of maps with a
single :name key.
@spec list_databases(InfluxElixir.Client.connection()) :: {:ok, [map()]} | {:error, term()}
Returns all databases created in this local instance as a list of maps
with a single :name key.
@spec query_flux(InfluxElixir.Client.connection(), binary(), keyword()) :: InfluxElixir.Client.query_result()
Executes a Flux query with support for common predicates.
Parses and applies:
from(bucket: "...")— scopes to a databaserange(start: -1h)— filters by timestamp (supports-Nh,-Nd,-Nm)filter(fn: (r) => r._measurement == "...")— filters by measurementfilter(fn: (r) => r.<key> == "...")— filters by any tag/field equality
@spec query_influxql( InfluxElixir.Client.connection(), binary(), keyword() ) :: InfluxElixir.Client.query_result()
Executes an InfluxQL query.
Supports InfluxQL-specific commands:
SHOW DATABASES— returns all databasesSHOW MEASUREMENTS— returns all measurement namesSHOW TAG KEYS FROM <measurement>— returns distinct tag keysSELECT ...— delegates to the SQL engine
@spec query_sql(InfluxElixir.Client.connection(), binary(), keyword()) :: InfluxElixir.Client.query_result()
Executes a SQL-like query against stored ETS points and returns rows.
Supports:
SELECT * FROM measurementSELECT DISTINCT column FROM measurementWHERE key = 'value'/WHERE key > N/WHERE key < NORDER BY time ASC|DESCLIMIT N$paramplaceholder substitution viaparams: %{"$name" => value}
@spec query_sql_stream( InfluxElixir.Client.connection(), binary(), keyword() ) :: Enumerable.t()
Executes a SQL query and returns results as a lazy Stream.
Delegates to query_sql/3 then wraps the list in a stream.
Starts a new LocalClient instance with isolated ETS storage.
Options
:database- connection-level default database name. Used when the caller does not passdatabase:in opts. Pre-created automatically.:databases- list of database names to pre-create (default:[]):profile- InfluxDB version profile to emulate. Determines which operations are available. Operations outside the profile return{:error, :unsupported_operation}. Valid values::v3_core(default) — write, SQL, InfluxQL, database CRUD:v3_enterprise— everything in v3_core plus token management:v2— write, Flux, bucket CRUD
Examples
iex> {:ok, conn} = InfluxElixir.Client.Local.start(databases: ["mydb"])
iex> conn.profile
:v3_core
iex> {:ok, conn} = InfluxElixir.Client.Local.start(profile: :v2)
iex> conn.profile
:v2
iex> {:ok, conn} = InfluxElixir.Client.Local.start(database: "metrics")
iex> conn.database
"metrics"
@spec stop(conn()) :: :ok
Stops a LocalClient instance and cleans up its ETS table.
Safe to call multiple times; a no-op if the table is already deleted.
Returns true if the given operation is supported by the connection's profile.
@spec write(InfluxElixir.Client.connection(), binary(), keyword()) :: InfluxElixir.Client.write_result()
Parses line protocol binary and stores the resulting points in ETS.
The database is read from opts[:database]. If the database does not
exist an {:error, %{status: 404, body: ...}} is returned. If line protocol
cannot be parsed an {:error, %{status: 400, body: ...}} is returned.
Payloads beginning with gzip magic bytes are automatically decompressed.
Pass precision: :nanosecond | :microsecond | :millisecond | :second to
control how numeric timestamps are interpreted (default: :nanosecond).