InfluxElixir.Client.Local (InfluxElixir v0.1.17)

Copy Markdown View Source

In-memory InfluxDB client for fast, isolated testing.

Stores data in ETS tables, enabling safe async: true tests with full isolation between test instances. Each call to start/1 creates an independent ETS table.

Parses real line protocol on write, stores points as maps, and responds with realistic InfluxDB response formats on query.

Profiles

LocalClient enforces an InfluxDB version profile that determines which operations are available. This prevents tests from accidentally using operations that the real InfluxDB backend doesn't support.

ProfileWriteSQLInfluxQLFluxDB CRUDBucket CRUDTokens
:v3_coreyesyesyesnoyesnono
:v3_enterpriseyesyesyesnoyesnoyes
:v2yesnonoyesnoyesno

Operations outside the configured profile return {:error, :unsupported_operation}.

Usage

# Match your production InfluxDB version
setup do
  {:ok, conn} = InfluxElixir.Client.Local.start(
    databases: ["test_db"],
    profile: :v3_core
  )
  on_exit(fn -> InfluxElixir.Client.Local.stop(conn) end)
  {:ok, conn: conn}
end

Checking Profile Support

Use supports?/2 to check if an operation is available:

if Local.supports?(conn, :query_sql) do
  Local.query_sql(conn, "SELECT * FROM cpu", database: "test_db")
end

ETS Key Layout

  • :databases => MapSet.t(binary()) — set of created database names
  • :buckets => MapSet.t(binary()) — set of created bucket names
  • :tokens => [map()] — list of token maps
  • {:points, database, measurement} => [point_map()] — stored points

SQL Query Support

query_sql/3 understands a subset of SQL:

  • SELECT * FROM measurement
  • SELECT col1, col2 [, ...] FROM measurement with optional AS alias (projects fields and tags; time is selectable)
  • WHERE tag = 'value' or WHERE field > N (supports AND)
  • WHERE col IN (v1, v2, ...) and WHERE col NOT IN (v1, v2, ...)
  • WHERE time <op> '<datetime>' accepts ISO-8601 datetimes ('2026-03-31T12:00:00Z'), bare ISO dates ('2026-03-31', interpreted as midnight UTC), and integer-as-string nanoseconds
  • ORDER BY time ASC|DESC
  • LIMIT N
  • $param placeholders via params: %{"$name" => value} in opts
  • DATE_BIN(INTERVAL 'N unit', time) time bucketing
  • Aggregate functions: AVG, SUM, COUNT, MIN, MAX
  • Ordered aggregates: first(field, time), last(field, time)
  • GROUP BY DATE_BIN(INTERVAL 'N unit', time) — optional. When omitted, aggregate queries return a single scalar row (COUNT over an empty result set is 0; other aggregates return nil).
  • GROUP BY <col>[, <col>...] — bucket points by tag/field values. Bare column names (with optional AS alias) are also valid in the SELECT list alongside aggregate functions.
  • Interval units: seconds, minutes, hours, days

SQL Param Types

params: values are serialised to SQL literals before query execution. Supported types: binary, integer, float, boolean, and Decimal (when the optional :decimal dependency is loaded — Decimal values are emitted as bare numeric literals via Decimal.to_string(:normal)).

Gzip Decompression

If a write payload begins with gzip magic bytes (0x1F 0x8B) it is automatically decompressed before line protocol parsing.

Timestamp Precision

Pass precision: :nanosecond | :microsecond | :millisecond | :second in opts to normalise stored timestamps to nanoseconds.

Summary

Functions

Creates a named bucket in this local instance.

Creates a named database in this local instance.

Creates a synthetic API token and stores it in ETS.

Deletes a bucket from this local instance.

Deletes a database from this local instance.

Deletes a token by its id field. Returns :ok even if the token was not found, matching real InfluxDB delete semantics.

Executes a SQL statement and returns a summary map.

Returns a passing health status map with string keys, matching the JSON-decoded shape returned by the HTTP client.

Returns all buckets in this local instance as a list of maps with a single :name key.

Returns all databases created in this local instance as a list of maps with a single :name key.

Executes a Flux query with support for common predicates.

Executes an InfluxQL query.

Executes a SQL-like query against stored ETS points and returns rows.

Executes a SQL query and returns results as a lazy Stream.

Starts a new LocalClient instance with isolated ETS storage.

Stops a LocalClient instance and cleans up its ETS table.

Returns true if the given operation is supported by the connection's profile.

Parses line protocol binary and stores the resulting points in ETS.

Types

conn()

@type conn() :: %{
  table: :ets.table(),
  databases: MapSet.t(binary()),
  database: binary() | nil,
  profile: profile()
}

parsed_query()

@type parsed_query() :: %{
  measurement: binary(),
  where: [where_clause()],
  order_by: {:time, :asc | :desc} | nil,
  limit: pos_integer() | nil,
  group_by_interval: pos_integer() | nil,
  group_by_columns: [binary()] | nil,
  select_columns: [select_column()] | nil,
  distinct_column: binary() | nil,
  projection_columns: [{binary(), binary()}] | nil
}

point_map()

@type point_map() :: %{
  measurement: binary(),
  tags: %{required(binary()) => binary()},
  fields: %{required(binary()) => term()},
  timestamp: integer() | nil
}

profile()

@type profile() :: :v3_core | :v3_enterprise | :v2

select_column()

@type select_column() ::
  {:time_bucket, binary()}
  | {:aggregate, :avg | :sum | :count | :min | :max, binary(), binary()}
  | {:count_star, binary()}
  | {:ordered_aggregate, :first | :last, binary(), binary(), binary()}
  | {:grouping_column, binary(), binary()}

where_clause()

@type where_clause() :: {where_op(), binary(), term()}

where_op()

@type where_op() :: :eq | :gt | :lt | :gte | :lte | :ne | :in | :not_in

Functions

create_bucket(conn, name, opts \\ [])

@spec create_bucket(
  InfluxElixir.Client.connection(),
  binary(),
  keyword()
) :: :ok | {:error, term()}

Creates a named bucket in this local instance.

Creating an already-existing bucket is idempotent.

create_database(conn, name, opts \\ [])

@spec create_database(
  InfluxElixir.Client.connection(),
  binary(),
  keyword()
) :: :ok | {:error, term()}

Creates a named database in this local instance.

Always succeeds — creating an already-existing database is idempotent.

create_token(conn, description, opts \\ [])

@spec create_token(
  InfluxElixir.Client.connection(),
  binary(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Creates a synthetic API token and stores it in ETS.

Returns {:ok, %{id: id, token: token_string, description: desc}}.

delete_bucket(conn, name)

@spec delete_bucket(InfluxElixir.Client.connection(), binary()) ::
  :ok | {:error, term()}

Deletes a bucket from this local instance.

Returns :ok whether or not the bucket exists, matching the idempotent delete semantics of the v2 API.

delete_database(conn, name)

@spec delete_database(InfluxElixir.Client.connection(), binary()) ::
  :ok | {:error, term()}

Deletes a database from this local instance.

Returns {:error, %{status: 404, body: "database not found: name"}} if the database does not exist.

delete_token(conn, token_id)

@spec delete_token(InfluxElixir.Client.connection(), binary()) ::
  :ok | {:error, term()}

Deletes a token by its id field. Returns :ok even if the token was not found, matching real InfluxDB delete semantics.

execute_sql(conn, sql, opts \\ [])

@spec execute_sql(InfluxElixir.Client.connection(), binary(), keyword()) ::
  {:ok, map()} | {:error, term()}

Executes a SQL statement and returns a summary map.

Supports DELETE FROM <measurement> and DELETE FROM <measurement> WHERE ... — matching points are removed from ETS and the count is returned in %{"rows_affected" => N}.

On :v3_core profile, DELETE is not supported (matches real InfluxDB v3 Core behavior) and returns {:error, :delete_not_supported}.

On :v3_enterprise profile, DELETE is supported.

Unknown statements return %{"rows_affected" => 0}.

health(conn)

@spec health(InfluxElixir.Client.connection()) :: {:ok, map()} | {:error, term()}

Returns a passing health status map with string keys, matching the JSON-decoded shape returned by the HTTP client.

list_buckets(conn)

@spec list_buckets(InfluxElixir.Client.connection()) ::
  {:ok, [map()]} | {:error, term()}

Returns all buckets in this local instance as a list of maps with a single :name key.

list_databases(conn)

@spec list_databases(InfluxElixir.Client.connection()) ::
  {:ok, [map()]} | {:error, term()}

Returns all databases created in this local instance as a list of maps with a single :name key.

query_flux(conn, flux, opts \\ [])

Executes a Flux query with support for common predicates.

Parses and applies:

  • from(bucket: "...") — scopes to a database
  • range(start: -1h) — filters by timestamp (supports -Nh, -Nd, -Nm)
  • filter(fn: (r) => r._measurement == "...") — filters by measurement
  • filter(fn: (r) => r.<key> == "...") — filters by any tag/field equality

query_influxql(conn, influxql, opts \\ [])

Executes an InfluxQL query.

Supports InfluxQL-specific commands:

  • SHOW DATABASES — returns all databases
  • SHOW MEASUREMENTS — returns all measurement names
  • SHOW TAG KEYS FROM <measurement> — returns distinct tag keys
  • SELECT ... — delegates to the SQL engine

query_sql(conn, sql, opts \\ [])

Executes a SQL-like query against stored ETS points and returns rows.

Supports:

  • SELECT * FROM measurement
  • SELECT DISTINCT column FROM measurement
  • WHERE key = 'value' / WHERE key > N / WHERE key < N
  • ORDER BY time ASC|DESC
  • LIMIT N
  • $param placeholder substitution via params: %{"$name" => value}

query_sql_stream(conn, sql, opts \\ [])

@spec query_sql_stream(
  InfluxElixir.Client.connection(),
  binary(),
  keyword()
) :: Enumerable.t()

Executes a SQL query and returns results as a lazy Stream.

Delegates to query_sql/3 then wraps the list in a stream.

start(opts \\ [])

@spec start(keyword()) :: {:ok, conn()}

Starts a new LocalClient instance with isolated ETS storage.

Options

  • :database - connection-level default database name. Used when the caller does not pass database: in opts. Pre-created automatically.
  • :databases - list of database names to pre-create (default: [])
  • :profile - InfluxDB version profile to emulate. Determines which operations are available. Operations outside the profile return {:error, :unsupported_operation}. Valid values:
    • :v3_core (default) — write, SQL, InfluxQL, database CRUD
    • :v3_enterprise — everything in v3_core plus token management
    • :v2 — write, Flux, bucket CRUD

Examples

iex> {:ok, conn} = InfluxElixir.Client.Local.start(databases: ["mydb"])
iex> conn.profile
:v3_core

iex> {:ok, conn} = InfluxElixir.Client.Local.start(profile: :v2)
iex> conn.profile
:v2

iex> {:ok, conn} = InfluxElixir.Client.Local.start(database: "metrics")
iex> conn.database
"metrics"

stop(map)

@spec stop(conn()) :: :ok

Stops a LocalClient instance and cleans up its ETS table.

Safe to call multiple times; a no-op if the table is already deleted.

supports?(map, operation)

@spec supports?(conn(), atom()) :: boolean()

Returns true if the given operation is supported by the connection's profile.

write(conn, payload, opts \\ [])

Parses line protocol binary and stores the resulting points in ETS.

The database is read from opts[:database]. If the database does not exist an {:error, %{status: 404, body: ...}} is returned. If line protocol cannot be parsed an {:error, %{status: 400, body: ...}} is returned.

Payloads beginning with gzip magic bytes are automatically decompressed. Pass precision: :nanosecond | :microsecond | :millisecond | :second to control how numeric timestamps are interpreted (default: :nanosecond).