InfluxElixir.Write.BatchWriter (InfluxElixir v0.1.16)

GenServer-based batch writer with configurable flush intervals, batch sizes, retry with exponential backoff, and backpressure.

Points or pre-encoded line protocol strings are buffered in memory and flushed either when the buffer reaches batch_size or when the flush_interval_ms timer fires — whichever comes first.
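
The two flush triggers can be pictured with a small pure-function sketch. It is not the library's implementation: the module name BufferSketch, the {buffer, size} state shape, and the return tuples are all assumptions made for illustration, and the timer is represented only by a tick/1 function that a real GenServer would call on a timer message.

```elixir
defmodule BufferSketch do
  # Hypothetical state: {buffer, size}. The buffer is kept newest-first
  # and reversed when it is flushed.
  def new, do: {[], 0}

  # Size trigger: adding this line brings the buffer to batch_size.
  def add({buffer, size}, line, batch_size) when size + 1 >= batch_size do
    {:flush, Enum.reverse([line | buffer]), new()}
  end

  # Otherwise the line is only buffered.
  def add({buffer, size}, line, _batch_size) do
    {:buffered, {[line | buffer], size + 1}}
  end

  # Timer trigger: flush whatever is buffered, if anything.
  def tick({[], 0}), do: {:idle, new()}
  def tick({buffer, _size}), do: {:flush, Enum.reverse(buffer), new()}
end

{:buffered, state} = BufferSketch.add(BufferSketch.new(), "cpu value=1.0", 2)
{:flush, lines, _empty} = BufferSketch.add(state, "cpu value=2.0", 2)
IO.inspect(lines)
```

Whichever of add/3 or tick/1 fires first empties the buffer, which mirrors the "whichever comes first" rule described above.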

Options

  • :connection - connection term passed to InfluxElixir.Write.Writer
  • :database - default database name (binary)
  • :batch_size - maximum points per flush (default: 5000)
  • :flush_interval_ms - timer interval in milliseconds (default: 1000)
  • :jitter_ms - random jitter added to flush timer (default: 0)
  • :max_retries - maximum retry attempts for retryable errors, i.e. 5xx and network errors (default: 3)
  • :base_retry_delay_ms - base delay in milliseconds for exponential retry backoff; the delay for attempt N is roughly base * 2^N (default: 100)
  • :no_sync - when true, write_sync/3 behaves like write/3 (default: false)
  • :write_opts - keyword list forwarded to InfluxElixir.Write.Writer.write/3 on every flush. Useful for setting :database, :timeout, or :precision per BatchWriter without baking them into the connection (default: [])
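
To see how the documented defaults combine with caller-supplied options, the sketch below merges overrides onto a keyword list of the defaults listed above. The module name BatchWriterOptsSketch, the merge step, and the :timeout value are assumptions for illustration; the library's own option handling may differ.

```elixir
defmodule BatchWriterOptsSketch do
  # Defaults copied from the option list above. :connection and :database
  # have no documented default and must come from the caller.
  @defaults [
    batch_size: 5000,
    flush_interval_ms: 1000,
    jitter_ms: 0,
    max_retries: 3,
    base_retry_delay_ms: 100,
    no_sync: false,
    write_opts: []
  ]

  # Caller-supplied options win over the documented defaults.
  def effective(opts), do: Keyword.merge(@defaults, opts)
end

opts =
  BatchWriterOptsSketch.effective(
    database: "mydb",
    batch_size: 1_000,
    # Illustrative value only; assumed to be a timeout in milliseconds.
    write_opts: [timeout: 15_000]
  )

IO.inspect(Keyword.take(opts, [:batch_size, :max_retries]))
```

Once a :connection is added, a keyword list like this could be passed to start_link/1, or used in a supervisor child tuple such as {InfluxElixir.Write.BatchWriter, opts}.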

Backpressure

When the buffer exceeds 10 * batch_size entries, new writes are rejected with {:error, :buffer_full}.
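
With the default batch_size of 5000, that threshold is 50,000 buffered entries. A caller might respond to {:error, :buffer_full} by pausing briefly and trying again. The sketch below shows one such pattern; BackpressureSketch, write_with_pause/4, and the attempt and pause defaults are hypothetical names and values, not part of the library.

```elixir
defmodule BackpressureSketch do
  # write_fun is any 1-arity function returning :ok or {:error, :buffer_full},
  # for example &InfluxElixir.Write.BatchWriter.write(pid, &1).
  def write_with_pause(write_fun, payload, attempts_left \\ 3, pause_ms \\ 50)

  # Out of attempts: surface the backpressure error to the caller.
  def write_with_pause(_write_fun, _payload, 0, _pause_ms) do
    {:error, :buffer_full}
  end

  def write_with_pause(write_fun, payload, attempts_left, pause_ms) do
    case write_fun.(payload) do
      :ok ->
        :ok

      {:error, :buffer_full} ->
        # Give the writer time to drain before trying again.
        Process.sleep(pause_ms)
        write_with_pause(write_fun, payload, attempts_left - 1, pause_ms)
    end
  end
end

# Stand-in for a writer whose buffer has room.
IO.inspect(BackpressureSketch.write_with_pause(fn _line -> :ok end, "cpu value=1.0"))
```

Whether to pause, drop the point, or propagate the error depends on how much data loss the application can tolerate; this sketch only shows the pause-and-retry variant.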

Retry Policy

Only 5xx responses and network errors are retried, using asynchronous exponential backoff with optional jitter. 4xx errors are not retried: they are logged and discarded. Retries are non-blocking, so the GenServer continues to accept messages between retry attempts.
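
The backoff formula (roughly base * 2^N) can be worked through with a short sketch. It assumes attempts are counted from 0 and that no jitter is applied; the documentation does not state the exact numbering, and BackoffSketch is a hypothetical module name. The stepped range syntax requires Elixir 1.12 or later.

```elixir
defmodule BackoffSketch do
  # Delay before retry number `attempt`, assuming attempts start at 0
  # and no jitter is added.
  def delay_ms(base_ms, attempt) when attempt >= 0 do
    base_ms * Integer.pow(2, attempt)
  end

  # Delays for every retry allowed by max_retries, in order.
  # The //1 step makes the range empty when max_retries is 0.
  def schedule(base_ms, max_retries) do
    for attempt <- 0..(max_retries - 1)//1, do: delay_ms(base_ms, attempt)
  end
end

IO.inspect(BackoffSketch.schedule(100, 3))
```

Under these assumptions, the documented defaults (base_retry_delay_ms: 100, max_retries: 3) give delays of 100, 200, and 400 ms.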

Stats

Call stats/1 to retrieve a map with :total_writes, :total_errors, and :total_bytes counters.

Types

stat_key()

@type stat_key() :: :total_writes | :total_errors | :total_bytes

stats()

@type stats() :: %{required(stat_key()) => non_neg_integer()}

t()

@type t() :: %InfluxElixir.Write.BatchWriter{
  base_retry_delay_ms: non_neg_integer(),
  batch_size: pos_integer(),
  buffer: [binary()],
  buffer_size: non_neg_integer(),
  connection: term(),
  database: binary() | nil,
  flush_interval_ms: pos_integer(),
  jitter_ms: non_neg_integer(),
  max_retries: non_neg_integer(),
  no_sync: boolean(),
  pending_sync: {pid(), term()} | nil,
  retry_attempt: non_neg_integer(),
  retry_payload: binary() | nil,
  stats: stats(),
  timer_ref: reference() | nil,
  write_opts: keyword()
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

flush(server, timeout \\ 60000)

@spec flush(GenServer.server(), timeout()) :: :ok

Forces an immediate flush of the buffer.

Bounded by timeout (default: 60000 ms). Returns :ok once the underlying HTTP write completes or a retry has been scheduled. Retries scheduled by do_flush are asynchronous and do not extend the caller's wait.

Parameters

  • server - PID or registered name of the BatchWriter
  • timeout - GenServer.call wait bound in ms (default: 60_000)

Examples

iex> InfluxElixir.Write.BatchWriter.flush(pid)
:ok

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a BatchWriter GenServer linked to the calling process.

Options

See module documentation for available options.

Examples

iex> {:ok, pid} = InfluxElixir.Write.BatchWriter.start_link(
...>   connection: conn,
...>   database: "mydb"
...> )
iex> is_pid(pid)
true

stats(server)

@spec stats(GenServer.server()) :: {:ok, stats()}

Returns the current stats map.

Keys

  • :total_writes - total successful write operations
  • :total_errors - total failed write operations
  • :total_bytes - total bytes flushed

Examples

iex> {:ok, stats} = InfluxElixir.Write.BatchWriter.stats(pid)
iex> Map.keys(stats)
[:total_bytes, :total_errors, :total_writes]

write(server, payload, timeout \\ 60000)

@spec write(
  GenServer.server(),
  InfluxElixir.Write.Point.t() | binary(),
  timeout()
) :: :ok | {:error, :buffer_full}

Buffers a point (or line protocol binary) for writing.

Returns immediately after buffering unless the buffer reaches batch_size, in which case handle_call triggers a synchronous do_flush that calls the HTTP client. The timeout argument bounds the GenServer.call/3 wait. Its default of 60000 ms is generous enough to cover a single HTTP write at Client.HTTP's 30s default.

Parameters

  • server - PID or registered name of the BatchWriter
  • payload - an InfluxElixir.Write.Point.t() or pre-encoded binary
  • timeout - GenServer.call wait bound in ms (default: 60_000)

Examples

iex> InfluxElixir.Write.BatchWriter.write(pid, "cpu value=1.0")
:ok

write_sync(server, payload, timeout \\ 300_000)

@spec write_sync(
  GenServer.server(),
  InfluxElixir.Write.Point.t() | binary(),
  timeout()
) :: :ok | {:error, term()}

Synchronously writes a point and waits for the next flush to complete.

Blocks until the buffered data has been flushed and the write is confirmed. When no_sync: true is configured, behaves identically to write/3.

The caller waits through the full retry chain. The default timeout of 300000 ms covers up to max_retries + 1 HTTP writes at the default 30s HTTP timeout plus exponential backoff. Override for endpoints with longer expected tail latencies.
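
The arithmetic behind that default can be checked with a small sketch. It assumes every HTTP attempt runs to its full timeout, that backoff delays follow base * 2^N with N counted from 0, and that no jitter is added; SyncTimeoutSketch and worst_case_ms/3 are hypothetical names. The stepped range syntax requires Elixir 1.12 or later.

```elixir
defmodule SyncTimeoutSketch do
  # Rough upper bound on how long a write_sync caller could wait:
  # (max_retries + 1) HTTP attempts plus the backoff delay between them.
  def worst_case_ms(max_retries, http_timeout_ms, base_retry_delay_ms) do
    http_ms = (max_retries + 1) * http_timeout_ms

    backoff_delays =
      for attempt <- 0..(max_retries - 1)//1 do
        base_retry_delay_ms * Integer.pow(2, attempt)
      end

    http_ms + Enum.sum(backoff_delays)
  end
end

# Documented defaults: max_retries 3, 30s HTTP timeout, 100 ms base delay.
IO.inspect(SyncTimeoutSketch.worst_case_ms(3, 30_000, 100))
```

Under these assumptions the defaults give 4 * 30000 + (100 + 200 + 400) = 120700 ms, comfortably inside the 300000 ms default. Raising max_retries or the HTTP timeout shrinks that margin, which is when an explicit timeout argument becomes worth setting.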

Parameters

  • server - PID or registered name of the BatchWriter
  • payload - an InfluxElixir.Write.Point.t() or pre-encoded binary
  • timeout - GenServer.call wait bound in ms (default: 300_000). Pass :infinity for unbounded blocking.

Examples

iex> InfluxElixir.Write.BatchWriter.write_sync(pid, "cpu value=1.0")
:ok