GenServer-based batch writer with configurable flush intervals, batch sizes, retry with exponential backoff, and backpressure.
Points or pre-encoded line protocol strings are buffered in memory and
flushed when the buffer reaches batch_size or when the
flush_interval_ms timer fires, whichever comes first.
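The size-or-timer rule above can be sketched as a small pure data structure. This is only an illustration, not the library's implementation: BufferSketch and its new/1, add/2, and flush/1 functions are hypothetical names, and the real writer runs inside a GenServer whose timer triggers the flush instead of the caller.

```elixir
defmodule BufferSketch do
  # Hypothetical module for illustration; not part of InfluxElixir.
  defstruct entries: [], size: 0, batch_size: 5000

  def new(batch_size), do: %__MODULE__{batch_size: batch_size}

  # Add one line protocol string. Flush as soon as batch_size is reached.
  def add(%__MODULE__{} = buf, entry) when is_binary(entry) do
    buf = %{buf | entries: [entry | buf.entries], size: buf.size + 1}

    if buf.size >= buf.batch_size do
      flush(buf)
    else
      {:buffered, buf}
    end
  end

  # Also the function a timer tick would call when flush_interval_ms elapses.
  def flush(%__MODULE__{size: 0} = buf), do: {:empty, buf}

  def flush(%__MODULE__{} = buf) do
    # Entries were prepended, so reverse to restore arrival order.
    payload = buf.entries |> Enum.reverse() |> Enum.join("\n")
    {:flushed, payload, %{buf | entries: [], size: 0}}
  end
end
```

Whichever trigger fires first ends up in the same flush/1 path, which is why a partially filled buffer is still written when the timer expires.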
Options
:connection - connection term passed to InfluxElixir.Write.Writer
:database - default database name (binary)
:batch_size - maximum points per flush (default: 5000)
:flush_interval_ms - timer interval in milliseconds (default: 1000)
:jitter_ms - random jitter added to flush timer (default: 0)
:max_retries - max retry attempts for 5xx errors (default: 3)
:base_retry_delay_ms - base for exponential retry backoff. Delay for attempt N is roughly base * 2^N. Default: 100.
:no_sync - when true, write_sync/3 behaves like write/3 (default: false)
:write_opts - keyword list forwarded to InfluxElixir.Write.Writer.write/3 on every flush. Useful for setting :database, :timeout, :precision per BatchWriter without baking them into the connection. Default: [].
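As a configuration sketch, the options above could be passed through a supervisor child tuple, which relies on the child specification and start_link/1 documented for this module. The values are illustrative only, and conn is a placeholder for whatever connection term the application already passes to InfluxElixir.Write.Writer.

```elixir
# `conn` is a placeholder: substitute the connection term your application uses.
children = [
  {InfluxElixir.Write.BatchWriter,
   connection: conn,
   database: "metrics",
   # Flush after 1000 buffered entries or every 500 ms, whichever comes first.
   batch_size: 1_000,
   flush_interval_ms: 500,
   # Spread flushes from several writers by up to 50 ms.
   jitter_ms: 50,
   max_retries: 5,
   base_retry_delay_ms: 200}
]

Supervisor.start_link(children, strategy: :one_for_one)
```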
Backpressure
When the buffer exceeds 10 * batch_size entries, new writes are rejected
with {:error, :buffer_full}.
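The threshold rule can be written as a one-line guard. The module and function names below are hypothetical, and whether the real check runs before or after the new entry is counted is an assumption of this sketch.

```elixir
defmodule BackpressureSketch do
  # Hypothetical module for illustration; not part of InfluxElixir.
  @multiplier 10

  # Reject once the buffer holds more than 10 * batch_size entries.
  def admit(buffer_size, batch_size) when buffer_size > @multiplier * batch_size,
    do: {:error, :buffer_full}

  def admit(_buffer_size, _batch_size), do: :ok
end
```

With the default batch_size of 5000, a buffer of 50000 entries is still accepted under this reading and 50001 is rejected.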
Retry Policy
Only 5xx responses and network errors are retried, using asynchronous exponential backoff with optional jitter. 4xx errors are not retried: they are logged and discarded. Retries are non-blocking: the GenServer continues to accept messages between retry attempts.
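A rough sketch of this policy follows. The error shapes (an integer HTTP status or a {:network, reason} tuple), the module name, and the choice to count attempts from 0 are all assumptions made for illustration; the library's internal representation may differ.

```elixir
defmodule RetrySketch do
  # Hypothetical module for illustration; not part of InfluxElixir.

  # Decide what to do with a failed write.
  def classify(status) when is_integer(status) and status in 500..599, do: :retry
  def classify(status) when is_integer(status) and status in 400..499, do: :discard
  def classify({:network, _reason}), do: :retry

  # Delay before retry number `attempt` (assumed to start at 0):
  # base * 2^attempt plus a random jitter in 0..jitter_ms.
  def delay_ms(attempt, base_ms, jitter_ms \\ 0) do
    backoff = base_ms * Integer.pow(2, attempt)
    jitter = if jitter_ms > 0, do: :rand.uniform(jitter_ms + 1) - 1, else: 0
    backoff + jitter
  end

  # Give up once max_retries attempts have been used.
  def next_step(attempt, max_retries, base_ms) when attempt < max_retries,
    do: {:retry_in, delay_ms(attempt, base_ms)}

  def next_step(_attempt, _max_retries, _base_ms), do: :give_up
end
```

In a GenServer, a non-blocking retry is typically scheduled with Process.send_after(self(), :retry, delay) so that the process keeps handling other messages while the delay runs.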
Stats
Call stats/1 to retrieve a map with :total_writes, :total_errors,
and :total_bytes counters.
Types
@type stat_key() :: :total_writes | :total_errors | :total_bytes
@type stats() :: %{required(stat_key()) => non_neg_integer()}
@type t() :: %InfluxElixir.Write.BatchWriter{
  base_retry_delay_ms: non_neg_integer(),
  batch_size: pos_integer(),
  buffer: [binary()],
  buffer_size: non_neg_integer(),
  connection: term(),
  database: binary() | nil,
  flush_interval_ms: pos_integer(),
  jitter_ms: non_neg_integer(),
  max_retries: non_neg_integer(),
  no_sync: boolean(),
  pending_sync: {pid(), term()} | nil,
  retry_attempt: non_neg_integer(),
  retry_payload: binary() | nil,
  stats: stats(),
  timer_ref: reference() | nil,
  write_opts: keyword()
}
Functions
Returns a specification to start this module under a supervisor (child_spec/1).
See Supervisor.
@spec flush(GenServer.server(), timeout()) :: :ok
Forces an immediate flush of the buffer.
Bounded by timeout (default: 60000 ms). Returns
:ok once the underlying HTTP write completes (or schedules a retry).
Retries scheduled by do_flush are asynchronous and do NOT extend the
caller's wait.
Parameters
server - PID or registered name of the BatchWriter
timeout - GenServer.call wait bound in ms (default: 60_000)
Examples
iex> InfluxElixir.Write.BatchWriter.flush(pid)
:ok
@spec start_link(keyword()) :: GenServer.on_start()
Starts a BatchWriter GenServer linked to the calling process.
Options
See module documentation for available options.
Examples
iex> {:ok, pid} = InfluxElixir.Write.BatchWriter.start_link(
...> connection: conn,
...> database: "mydb"
...> )
iex> is_pid(pid)
true
@spec stats(GenServer.server()) :: {:ok, stats()}
Returns the current stats map.
Keys
:total_writes - total successful write operations
:total_errors - total failed write operations
:total_bytes - total bytes flushed
Examples
iex> {:ok, stats} = InfluxElixir.Write.BatchWriter.stats(pid)
iex> Map.keys(stats)
[:total_bytes, :total_errors, :total_writes]
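As one possible use of these counters, the sketch below derives a failure ratio from a stats map. StatsSketch and error_rate/1 are hypothetical helpers, not part of the library; they only assume the three keys listed above.

```elixir
defmodule StatsSketch do
  # Hypothetical helper for illustration; not part of InfluxElixir.

  # Nothing attempted yet: report 0.0 instead of dividing by zero.
  def error_rate(%{total_writes: 0, total_errors: 0}), do: 0.0

  # Share of failed write operations among all completed attempts.
  def error_rate(%{total_writes: ok, total_errors: failed}) do
    failed / (ok + failed)
  end
end
```

A caller would pass the map unwrapped from the {:ok, stats} tuple returned by stats/1.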
@spec write(GenServer.server(), InfluxElixir.Write.Point.t() | binary(), timeout()) :: :ok | {:error, :buffer_full}
Buffers a point (or line protocol binary) for writing.
Returns immediately after buffering unless the buffer reaches batch_size.
In that case handle_call triggers a synchronous do_flush, which calls the
HTTP client before replying. The timeout argument bounds the
GenServer.call/3 wait. It defaults to 60000 ms, which is generous enough to
cover a single HTTP write at Client.HTTP's 30s default.
Parameters
server - PID or registered name of the BatchWriter
payload - a InfluxElixir.Write.Point.t() or pre-encoded binary
timeout - GenServer.call wait bound in ms (default: 60_000)
Examples
iex> InfluxElixir.Write.BatchWriter.write(pid, "cpu value=1.0")
:ok
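Because write/3 can return {:error, :buffer_full}, callers may want to pause briefly and try again instead of dropping the point. The sketch below is one possible caller-side policy, not something the library provides: WriteWithPause is a hypothetical module, and it takes a zero-arity function so it stays independent of any running BatchWriter.

```elixir
defmodule WriteWithPause do
  # Hypothetical helper for illustration; not part of InfluxElixir.
  # `write_fun` is a zero-arity function, for example:
  #   fn -> InfluxElixir.Write.BatchWriter.write(pid, "cpu value=1.0") end
  def call(write_fun, attempts \\ 3, pause_ms \\ 50)

  # All attempts used up: surface the backpressure error to the caller.
  def call(_write_fun, 0, _pause_ms), do: {:error, :buffer_full}

  def call(write_fun, attempts, pause_ms) when attempts > 0 do
    case write_fun.() do
      :ok ->
        :ok

      {:error, :buffer_full} ->
        # Give the writer time to drain its buffer before trying again.
        Process.sleep(pause_ms)
        call(write_fun, attempts - 1, pause_ms)
    end
  end
end
```

Sleeping in the caller keeps the pressure on the producer side, which is usually the intent of a buffer-full signal.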
@spec write_sync(GenServer.server(), InfluxElixir.Write.Point.t() | binary(), timeout()) :: :ok | {:error, term()}
Synchronously writes a point and waits for the next flush to complete.
Blocks until the buffered data has been flushed and the write is confirmed.
When no_sync: true is configured, behaves identically to write/3.
The caller waits through the full retry chain. The default timeout
of 300000 ms covers up to max_retries + 1
HTTP writes at the default 30s HTTP timeout plus exponential backoff.
Override for endpoints with longer expected tail latencies.
Parameters
server - PID or registered name of the BatchWriter
payload - a InfluxElixir.Write.Point.t() or pre-encoded binary
timeout - GenServer.call wait bound in ms (default: 300_000). Pass :infinity for unbounded blocking.
Examples
iex> InfluxElixir.Write.BatchWriter.write_sync(pid, "cpu value=1.0")
:ok
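The timeout reasoning above can be checked with a little arithmetic. The sketch below estimates a worst-case wait as (max_retries + 1) HTTP timeouts plus the summed backoff delays. It assumes retry attempts are numbered from 0 and ignores jitter; SyncTimeoutBudget is a hypothetical helper, not part of the library.

```elixir
defmodule SyncTimeoutBudget do
  # Hypothetical helper for illustration; not part of InfluxElixir.
  def worst_case_ms(max_retries, http_timeout_ms, base_retry_delay_ms) do
    # One initial write plus max_retries retries, each allowed to time out.
    http = (max_retries + 1) * http_timeout_ms

    # base * (2^0 + 2^1 + ... + 2^(max_retries - 1)) = base * (2^max_retries - 1)
    backoff = base_retry_delay_ms * (Integer.pow(2, max_retries) - 1)

    http + backoff
  end
end
```

With the documented defaults (max_retries: 3, 30s HTTP timeout, base_retry_delay_ms: 100) this gives 120_000 + 700 = 120_700 ms, comfortably inside the 300_000 ms default. Raising max_retries or the HTTP timeout is the point at which the write_sync/3 timeout should be revisited.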