GenServer-based batch writer with configurable flush intervals, batch sizes, retry with exponential backoff, and backpressure.
Points or pre-encoded line protocol strings are buffered in memory and
flushed when the buffer reaches batch_size or when the
flush_interval_ms timer fires, whichever comes first.
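The two flush triggers can be sketched as a pair of pure functions. This is a minimal illustration, not the module's actual implementation: FlushSketch, new/1, push/2 and tick/1 are hypothetical names, and the state is a plain map rather than the BatchWriter struct.

```elixir
defmodule FlushSketch do
  # Hypothetical state: lines are prepended, so the buffer is newest-first.
  def new(batch_size) when is_integer(batch_size) and batch_size > 0 do
    %{buffer: [], buffer_size: 0, batch_size: batch_size}
  end

  # Size trigger: hand back a batch as soon as the buffer reaches batch_size.
  def push(state, line) when is_binary(line) do
    state = %{state | buffer: [line | state.buffer], buffer_size: state.buffer_size + 1}
    if state.buffer_size >= state.batch_size, do: drain(state), else: {:ok, state}
  end

  # Timer trigger: flush whatever has accumulated; an empty buffer is a no-op.
  def tick(%{buffer_size: 0} = state), do: {:ok, state}
  def tick(state), do: drain(state)

  # Restore insertion order and reset the buffer.
  defp drain(state) do
    {:flush, Enum.reverse(state.buffer), %{state | buffer: [], buffer_size: 0}}
  end
end
```

In a GenServer, tick/1 would typically be driven by a message scheduled with Process.send_after/3, while push/2 would run on every incoming write.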
Options
:connection - connection term passed to InfluxElixir.Write.Writer
:database - default database name (binary)
:batch_size - maximum points per flush (default: 5000)
:flush_interval_ms - timer interval in milliseconds (default: 1000)
:jitter_ms - random jitter added to flush timer (default: 0)
:max_retries - max retry attempts for 5xx errors (default: 3)
:no_sync - when true, write_sync/2 behaves like write/2 (default: false)
Backpressure
When the buffer exceeds 10 * batch_size entries, new writes are rejected
with {:error, :buffer_full}.
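The threshold check might look like the sketch below. BackpressureSketch and admit/2 are hypothetical names, and reading "exceeds" as a strict greater-than comparison is an assumption about the boundary case.

```elixir
defmodule BackpressureSketch do
  # The buffer may hold up to 10 batches before writes are rejected.
  @multiplier 10

  # Returns :ok while the buffer is at or below the threshold.
  def admit(buffer_size, batch_size)
      when is_integer(buffer_size) and buffer_size >= 0 and
             is_integer(batch_size) and batch_size > 0 do
    if buffer_size > @multiplier * batch_size do
      {:error, :buffer_full}
    else
      :ok
    end
  end
end
```

With the default batch_size of 5000, this rule rejects writes once more than 50,000 entries are buffered.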
Retry Policy
Only 5xx responses and network errors are retried, using asynchronous exponential backoff with optional jitter, up to max_retries attempts. 4xx errors are logged and discarded without a retry. Retries are non-blocking: the GenServer continues to accept messages between retry attempts.
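A rough sketch of this policy follows. Everything here is illustrative: RetrySketch, the {:http, status} and {:network, reason} error shapes, the base_ms parameter and the doubling factor are assumptions, since the documentation does not state the error terms or the backoff formula the library uses.

```elixir
defmodule RetrySketch do
  # Hypothetical error shapes; the library's actual error terms may differ.
  def classify({:error, {:http, status}}) when status in 500..599, do: :retry
  def classify({:error, {:http, status}}) when status in 400..499, do: :discard
  def classify({:error, {:network, _reason}}), do: :retry
  def classify(_other), do: :discard

  # Delay doubles with each attempt (attempt 1 waits base_ms), plus a random
  # jitter between 0 and jitter_ms inclusive.
  def backoff_ms(attempt, base_ms, jitter_ms)
      when is_integer(attempt) and attempt >= 1 and
             is_integer(base_ms) and base_ms > 0 and
             is_integer(jitter_ms) and jitter_ms >= 0 do
    base_ms * Integer.pow(2, attempt - 1) + :rand.uniform(jitter_ms + 1) - 1
  end

  # Non-blocking: schedule a message instead of sleeping, so the process
  # keeps handling other messages until {:retry, attempt} arrives.
  def schedule_retry(server, attempt, base_ms, jitter_ms) do
    Process.send_after(server, {:retry, attempt}, backoff_ms(attempt, base_ms, jitter_ms))
  end
end
```

Scheduling the retry as a message rather than calling Process.sleep/1 is what keeps the server responsive between attempts.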
Stats
Call stats/1 to retrieve a map with :total_writes, :total_errors,
and :total_bytes counters.
Summary
Functions
child_spec/1 - Returns a specification to start this module under a supervisor.
flush/1 - Forces an immediate flush of the buffer.
start_link/1 - Starts a BatchWriter GenServer linked to the calling process.
stats/1 - Returns the current stats map.
write/2 - Buffers a point (or line protocol binary) for writing.
write_sync/2 - Synchronously writes a point and waits for the next flush to complete.
Types
@type stat_key() :: :total_writes | :total_errors | :total_bytes
@type stats() :: %{required(stat_key()) => non_neg_integer()}
@type t() :: %InfluxElixir.Write.BatchWriter{
  batch_size: pos_integer(),
  buffer: [binary()],
  buffer_size: non_neg_integer(),
  connection: term(),
  database: binary() | nil,
  flush_interval_ms: pos_integer(),
  jitter_ms: non_neg_integer(),
  max_retries: non_neg_integer(),
  no_sync: boolean(),
  pending_sync: {pid(), term()} | nil,
  retry_attempt: non_neg_integer(),
  retry_payload: binary() | nil,
  stats: stats(),
  timer_ref: reference() | nil
}
Functions
child_spec/1
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec flush(GenServer.server()) :: :ok
Forces an immediate flush of the buffer.
Examples
iex> InfluxElixir.Write.BatchWriter.flush(pid)
:ok
@spec start_link(keyword()) :: GenServer.on_start()
Starts a BatchWriter GenServer linked to the calling process.
Options
See module documentation for available options.
Examples
iex> {:ok, pid} = InfluxElixir.Write.BatchWriter.start_link(
...> connection: conn,
...> database: "mydb"
...> )
iex> is_pid(pid)
true
@spec stats(GenServer.server()) :: {:ok, stats()}
Returns the current stats map.
Keys
:total_writes - total successful write operations
:total_errors - total failed write operations
:total_bytes - total bytes flushed
Examples
iex> {:ok, stats} = InfluxElixir.Write.BatchWriter.stats(pid)
iex> Map.keys(stats)
[:total_bytes, :total_errors, :total_writes]
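One way to use these counters is to derive a failure ratio for monitoring. The helper below is a hypothetical sketch (StatsSketch and error_ratio/1 are not part of the library), and it assumes that :total_writes and :total_errors count disjoint operations, as the key descriptions suggest.

```elixir
defmodule StatsSketch do
  # Share of write operations that failed, from 0.0 to 1.0.
  # Returns 0.0 when nothing has been written yet, to avoid dividing by zero.
  def error_ratio(%{total_writes: writes, total_errors: errors}) do
    case writes + errors do
      0 -> 0.0
      total -> errors / total
    end
  end
end
```

The map returned in {:ok, stats} by stats/1 can be passed to such a helper directly, since it only matches on the two keys it needs.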
@spec write(GenServer.server(), InfluxElixir.Write.Point.t() | binary()) :: :ok | {:error, :buffer_full}
Buffers a point (or line protocol binary) for writing.
Blocks only until the BatchWriter has accepted or rejected the point, then
returns. Does not wait for the data to be flushed to InfluxDB. Returns
{:error, :buffer_full} when the buffer exceeds the backpressure threshold
(10 * batch_size entries).
Parameters
server - PID or registered name of the BatchWriter
payload - an InfluxElixir.Write.Point.t() or pre-encoded binary
Examples
iex> InfluxElixir.Write.BatchWriter.write(pid, "cpu value=1.0")
:ok
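Callers need to decide what to do when write/2 returns {:error, :buffer_full}. One option, sketched below, is to pause briefly and try again a bounded number of times. BufferFullSketch and write_with_wait/3 are hypothetical; the write is passed in as a zero-arity function, for example fn -> InfluxElixir.Write.BatchWriter.write(pid, line) end, so the helper does not depend on the library.

```elixir
defmodule BufferFullSketch do
  # write_fun must return :ok or {:error, :buffer_full}.
  # attempts_left bounds the total number of calls to write_fun.
  def write_with_wait(write_fun, attempts_left, wait_ms)
      when is_function(write_fun, 0) and is_integer(attempts_left) and
             attempts_left >= 1 and is_integer(wait_ms) and wait_ms >= 0 do
    case write_fun.() do
      :ok ->
        :ok

      {:error, :buffer_full} when attempts_left > 1 ->
        # Give the writer time to flush before trying again.
        Process.sleep(wait_ms)
        write_with_wait(write_fun, attempts_left - 1, wait_ms)

      {:error, :buffer_full} ->
        # Out of attempts: surface the backpressure signal to the caller.
        {:error, :buffer_full}
    end
  end
end
```

Sleeping in the caller slows the producer down, which is usually the point of backpressure; dropping or sampling points are alternative strategies when the producer cannot wait.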
@spec write_sync(GenServer.server(), InfluxElixir.Write.Point.t() | binary()) :: :ok | {:error, term()}
Synchronously writes a point and waits for the next flush to complete.
Blocks until the buffered data has been flushed and the write is confirmed.
When no_sync: true is configured, behaves identically to write/2.
Parameters
server - PID or registered name of the BatchWriter
payload - an InfluxElixir.Write.Point.t() or pre-encoded binary
Examples
iex> InfluxElixir.Write.BatchWriter.write_sync(pid, "cpu value=1.0")
:ok
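A common OTP technique for this kind of "reply after the flush" behavior is to return {:noreply, state} from handle_call/3, keep the caller's from term, and answer later with GenServer.reply/2. The sketch below shows that technique in isolation; DeferredReplySketch is hypothetical, it tracks a list of waiting callers, and it is not a description of how BatchWriter itself is implemented.

```elixir
defmodule DeferredReplySketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def write_sync(server, line), do: GenServer.call(server, {:write_sync, line})
  def flush(server), do: GenServer.call(server, :flush)

  @impl true
  def init(:ok), do: {:ok, %{buffer: [], pending: []}}

  @impl true
  def handle_call({:write_sync, line}, from, state) do
    # No reply yet: remember the caller and answer once the flush is done.
    {:noreply, %{state | buffer: [line | state.buffer], pending: [from | state.pending]}}
  end

  def handle_call(:flush, _from, state) do
    # A real writer would send the buffered batch to the database here.
    Enum.each(state.pending, &GenServer.reply(&1, :ok))
    {:reply, :ok, %{state | buffer: [], pending: []}}
  end
end
```

Because the caller sits inside GenServer.call/3 until the reply arrives, the call timeout (5000 ms by default) has to be longer than the longest expected wait for a flush.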