FerricStore.Pipe (ferricstore v0.3.7)

Copy Markdown View Source

Pipeline accumulator for batching multiple FerricStore commands.

Used with FerricStore.pipeline/1 to batch multiple operations into a single Raft entry per shard. Commands are accumulated in reverse order and on execute, converted to RESP tuples and dispatched through the Coordinator. Single-shard pipelines commit in one Raft round-trip; cross-shard pipelines use the anchor-shard mechanism.

Results are normalized to match the FerricStore public API format (e.g. {:ok, value} for GET, :ok for DEL) rather than raw Dispatcher values.

Usage

FerricStore.pipeline(fn pipe ->
  pipe
  |> FerricStore.Pipe.set("key1", "val1")
  |> FerricStore.Pipe.set("key2", "val2")
  |> FerricStore.Pipe.incr("counter")
end)

Summary

Functions

Adds a DEL command to the pipeline.

Executes all accumulated pipeline commands as a single batch Raft entry per shard via the Coordinator.

Adds an EXPIRE command to the pipeline.

Adds a GET command to the pipeline.

Adds an HGET command to the pipeline.

Adds an HSET command to the pipeline.

Adds an INCR command to the pipeline.

Adds an INCRBY command to the pipeline.

Adds an LPUSH command to the pipeline.

Creates a new empty pipeline.

Adds an RPUSH command to the pipeline.

Adds a SADD command to the pipeline.

Adds a SET command to the pipeline.

Adds a ZADD command to the pipeline.

Types

command()

@type command() ::
  {:set, binary(), binary(), keyword()}
  | {:get, binary()}
  | {:del, binary()}
  | {:incr, binary()}
  | {:incr_by, binary(), integer()}
  | {:hset, binary(), map()}
  | {:hget, binary(), binary()}
  | {:lpush, binary(), [binary()]}
  | {:rpush, binary(), [binary()]}
  | {:sadd, binary(), [binary()]}
  | {:zadd, binary(), [{number(), binary()}]}
  | {:expire, binary(), non_neg_integer()}

t()

@type t() :: %FerricStore.Pipe{commands: [command()]}

Functions

del(pipe, key)

@spec del(t(), binary()) :: t()

Adds a DEL command to the pipeline.

execute(pipe)

@spec execute(t()) :: [term()]

Executes all accumulated pipeline commands as a single batch Raft entry per shard via the Coordinator.

This is called internally by FerricStore.pipeline/1. Commands are converted to RESP-style tuples and dispatched through Ferricstore.Transaction.Coordinator, which groups them by shard and submits each group as a single {:batch} or {:tx_execute} Raft entry. Single-shard pipelines commit in one Raft round-trip; cross-shard pipelines use the anchor-shard mechanism.

Results are returned in the original command order.

expire(pipe, key, ttl_ms)

@spec expire(t(), binary(), non_neg_integer()) :: t()

Adds an EXPIRE command to the pipeline.

get(pipe, key)

@spec get(t(), binary()) :: t()

Adds a GET command to the pipeline.

hget(pipe, key, field)

@spec hget(t(), binary(), binary()) :: t()

Adds an HGET command to the pipeline.

hset(pipe, key, fields)

@spec hset(t(), binary(), map()) :: t()

Adds an HSET command to the pipeline.

incr(pipe, key)

@spec incr(t(), binary()) :: t()

Adds an INCR command to the pipeline.

incr_by(pipe, key, amount)

@spec incr_by(t(), binary(), integer()) :: t()

Adds an INCRBY command to the pipeline.

lpush(pipe, key, elements)

@spec lpush(t(), binary(), [binary()]) :: t()

Adds an LPUSH command to the pipeline.

new()

@spec new() :: t()

Creates a new empty pipeline.

rpush(pipe, key, elements)

@spec rpush(t(), binary(), [binary()]) :: t()

Adds an RPUSH command to the pipeline.

sadd(pipe, key, members)

@spec sadd(t(), binary(), [binary()]) :: t()

Adds a SADD command to the pipeline.

set(pipe, key, value, opts \\ [])

@spec set(t(), binary(), binary(), keyword()) :: t()

Adds a SET command to the pipeline.

zadd(pipe, key, score_member_pairs)

@spec zadd(t(), binary(), [{number(), binary()}]) :: t()

Adds a ZADD command to the pipeline.