InfluxElixir.Write.BatchWriter (InfluxElixir v0.1.15)

GenServer-based batch writer with configurable flush intervals, batch sizes, retry with exponential backoff, and backpressure.

Points or pre-encoded line protocol strings are buffered in memory and flushed either when the buffer reaches batch_size or when the flush_interval_ms timer fires — whichever comes first.
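The two flush triggers can be pictured with a small, pure sketch. This is not the library's implementation: the module name `FlushTriggerSketch` and both functions are hypothetical, and the sketch assumes the size trigger fires as soon as the buffer reaches batch_size.

```elixir
defmodule FlushTriggerSketch do
  # Hypothetical illustration of the two flush triggers; not InfluxElixir code.

  # Size trigger: prepend the entry, then flush once the buffer reaches batch_size.
  # length/1 is O(n); a real buffer would more likely track its size in a counter.
  def add(buffer, entry, batch_size) when is_list(buffer) and batch_size > 0 do
    new_buffer = [entry | buffer]

    if length(new_buffer) >= batch_size do
      # Entries were prepended, so reverse to restore arrival order.
      {:flush, Enum.reverse(new_buffer)}
    else
      {:buffered, new_buffer}
    end
  end

  # Timer trigger: flush whatever is buffered, even if below batch_size.
  def on_timer([]), do: :noop
  def on_timer(buffer) when is_list(buffer), do: {:flush, Enum.reverse(buffer)}
end
```

Whichever function returns `{:flush, entries}` first decides when the batch is sent, which mirrors the "whichever comes first" rule described above.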

Options

  • :connection - connection term passed to InfluxElixir.Write.Writer
  • :database - default database name (binary)
  • :batch_size - maximum points per flush (default: 5000)
  • :flush_interval_ms - timer interval in milliseconds (default: 1000)
  • :jitter_ms - random jitter added to flush timer (default: 0)
  • :max_retries - maximum retry attempts for 5xx and network errors (default: 3)
  • :no_sync - when true, write_sync/2 behaves like write/2 (default: false)
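As a rough illustration, the options above could be collected into a keyword list and handed to a supervisor as a child spec tuple. The values are arbitrary examples, and `conn` is a placeholder atom standing in for whatever connection term your application builds.

```elixir
# Placeholder only: substitute the real connection term used by your application.
conn = :my_connection

# Every key below comes from the option list above; the values are examples.
writer_opts = [
  connection: conn,
  database: "mydb",
  batch_size: 1_000,
  flush_interval_ms: 500,
  jitter_ms: 100,
  max_retries: 5,
  no_sync: false
]

# Child spec tuple, resolved through child_spec/1 when the supervisor starts.
children = [
  {InfluxElixir.Write.BatchWriter, writer_opts}
]

# With the library loaded, the tree could then be started with:
# Supervisor.start_link(children, strategy: :one_for_one)
```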

Backpressure

When the buffer exceeds 10 * batch_size entries (50,000 with the default batch_size of 5000), new writes are rejected with {:error, :buffer_full}.
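The threshold arithmetic can be sketched as follows. The module `BackpressureSketch` is hypothetical, and the strict `>` comparison is an assumption based on the word "exceeds"; the library's exact boundary check may differ.

```elixir
defmodule BackpressureSketch do
  # Hypothetical illustration; not InfluxElixir code.
  # Multiplier taken from the Backpressure description.
  @multiplier 10

  # Maximum number of buffered entries before writes are rejected.
  def limit(batch_size) when is_integer(batch_size) and batch_size > 0 do
    @multiplier * batch_size
  end

  # Assumption: a write is rejected only when the buffer strictly exceeds the limit.
  def admit(buffer_size, batch_size) do
    if buffer_size > limit(batch_size), do: {:error, :buffer_full}, else: :ok
  end
end
```

With the default batch_size of 5000, `BackpressureSketch.limit(5000)` evaluates to 50000.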

Retry Policy

Only 5xx responses and network errors are retried, using asynchronous exponential backoff with optional jitter. 4xx errors are logged and discarded without a retry. Retries are non-blocking: the GenServer continues to accept messages between retry attempts.
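A minimal sketch of such a retry policy is shown below. Everything in it is an assumption made for illustration: the module `RetrySketch`, the `{:http, status}` and `{:network, reason}` error shapes, the 100 ms base delay, and the doubling schedule are not taken from the library.

```elixir
defmodule RetrySketch do
  # Hypothetical illustration; not InfluxElixir code.
  # Assumed base delay; the library's actual value is not documented here.
  @base_ms 100

  # 5xx responses and network errors are retryable; other HTTP errors are not.
  def retryable?({:http, status}) when status in 500..599, do: true
  def retryable?({:http, _status}), do: false
  def retryable?({:network, _reason}), do: true

  # Doubles the delay per attempt and adds 0..jitter_ms of random jitter.
  def delay_ms(attempt, jitter_ms \\ 0) when attempt >= 0 and jitter_ms >= 0 do
    backoff = @base_ms * Integer.pow(2, attempt)
    jitter = if jitter_ms > 0, do: :rand.uniform(jitter_ms + 1) - 1, else: 0
    backoff + jitter
  end

  # Decides what to do after a failed flush.
  def next_step(error, attempt, max_retries, jitter_ms \\ 0) do
    cond do
      not retryable?(error) -> :discard
      attempt >= max_retries -> :give_up
      true -> {:retry_after, delay_ms(attempt, jitter_ms)}
    end
  end
end
```

One way a GenServer can keep such retries non-blocking is to schedule the next attempt with `Process.send_after/3` and return immediately, so that other messages are handled in the meantime. Whether BatchWriter does exactly this is not stated in this documentation.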

Stats

Call stats/1 to retrieve a map with :total_writes, :total_errors, and :total_bytes counters.
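The counters lend themselves to simple derived metrics. The helper below is a hypothetical example (`StatsSketch` is not part of the library). It assumes :total_writes counts successful operations and :total_errors counts failed ones, as described under stats/1, so their sum is the number of attempts.

```elixir
defmodule StatsSketch do
  # Hypothetical helper; not InfluxElixir code.

  # Fraction of write operations that failed, as a float between 0.0 and 1.0.
  def error_ratio(%{total_writes: writes, total_errors: errors}) when writes + errors > 0 do
    errors / (writes + errors)
  end

  # No operations recorded yet: report zero instead of dividing by zero.
  def error_ratio(%{total_writes: 0, total_errors: 0}), do: 0.0
end
```

A caller could pass the map returned in `{:ok, stats}` by stats/1 straight to `StatsSketch.error_ratio/1`.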

Summary

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

flush(server)

Forces an immediate flush of the buffer.

start_link(opts)

Starts a BatchWriter GenServer linked to the calling process.

stats(server)

Returns the current stats map.

write(server, payload)

Buffers a point (or line protocol binary) for writing.

write_sync(server, payload)

Synchronously writes a point and waits for the next flush to complete.

Types

stat_key()

@type stat_key() :: :total_writes | :total_errors | :total_bytes

stats()

@type stats() :: %{required(stat_key()) => non_neg_integer()}

t()

@type t() :: %InfluxElixir.Write.BatchWriter{
  batch_size: pos_integer(),
  buffer: [binary()],
  buffer_size: non_neg_integer(),
  connection: term(),
  database: binary() | nil,
  flush_interval_ms: pos_integer(),
  jitter_ms: non_neg_integer(),
  max_retries: non_neg_integer(),
  no_sync: boolean(),
  pending_sync: {pid(), term()} | nil,
  retry_attempt: non_neg_integer(),
  retry_payload: binary() | nil,
  stats: stats(),
  timer_ref: reference() | nil
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

flush(server)

@spec flush(GenServer.server()) :: :ok

Forces an immediate flush of the buffer.

Examples

iex> InfluxElixir.Write.BatchWriter.flush(pid)
:ok

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a BatchWriter GenServer linked to the calling process.

Options

See module documentation for available options.

Examples

iex> {:ok, pid} = InfluxElixir.Write.BatchWriter.start_link(
...>   connection: conn,
...>   database: "mydb"
...> )
iex> is_pid(pid)
true

stats(server)

@spec stats(GenServer.server()) :: {:ok, stats()}

Returns the current stats map.

Keys

  • :total_writes - total successful write operations
  • :total_errors - total failed write operations
  • :total_bytes - total bytes flushed

Examples

iex> {:ok, stats} = InfluxElixir.Write.BatchWriter.stats(pid)
iex> Map.keys(stats)
[:total_bytes, :total_errors, :total_writes]

write(server, payload)

@spec write(GenServer.server(), InfluxElixir.Write.Point.t() | binary()) ::
  :ok | {:error, :buffer_full}

Buffers a point (or line protocol binary) for writing.

Blocks until the buffer accepts the point, then returns. Does not wait for the data to be flushed to InfluxDB. Returns {:error, :buffer_full} when the buffer exceeds the backpressure threshold.

Parameters

  • server - PID or registered name of the BatchWriter
  • payload - an InfluxElixir.Write.Point.t() or pre-encoded binary

Examples

iex> InfluxElixir.Write.BatchWriter.write(pid, "cpu value=1.0")
:ok
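
Callers may want to react to {:error, :buffer_full} rather than drop the point. The sketch below retries a few times with a fixed pause. The module `WriteRetrySketch`, its defaults, and the fixed-pause strategy are illustrative assumptions. The write call is passed in as a function so the helper does not depend on a running BatchWriter.

```elixir
defmodule WriteRetrySketch do
  # Hypothetical caller-side helper; not InfluxElixir code.
  # `write_fun` is any 1-arity function returning :ok or {:error, :buffer_full},
  # for example: &InfluxElixir.Write.BatchWriter.write(pid, &1)
  def write_with_backoff(write_fun, payload, attempts \\ 3, wait_ms \\ 50)

  # Out of attempts: surface the backpressure error to the caller.
  def write_with_backoff(_write_fun, _payload, 0, _wait_ms), do: {:error, :buffer_full}

  def write_with_backoff(write_fun, payload, attempts, wait_ms) when attempts > 0 do
    case write_fun.(payload) do
      :ok ->
        :ok

      {:error, :buffer_full} ->
        # Give the writer time to flush before trying again.
        Process.sleep(wait_ms)
        write_with_backoff(write_fun, payload, attempts - 1, wait_ms)
    end
  end
end
```

Sleeping in the caller slows the producer down, which is usually the point of backpressure. Callers that must not block could instead drop or spill the point when the error comes back.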

write_sync(server, payload)

@spec write_sync(GenServer.server(), InfluxElixir.Write.Point.t() | binary()) ::
  :ok | {:error, term()}

Synchronously writes a point and waits for the next flush to complete.

Blocks until the buffered data has been flushed and the write is confirmed. When no_sync: true is configured, behaves identically to write/2.

Parameters

  • server - PID or registered name of the BatchWriter
  • payload - an InfluxElixir.Write.Point.t() or pre-encoded binary

Examples

iex> InfluxElixir.Write.BatchWriter.write_sync(pid, "cpu value=1.0")
:ok