FerricStore.Flow.Workflow (ferricstore v0.4.1)

Copy Markdown View Source

Elixir workflow SDK for FerricStore Flow.

This module is an ergonomic layer over the embedded flow_* API. It does not change Flow correctness semantics: every mutation is still an explicit Flow command, guarded by the same lease/fencing tokens.

Design

FerricStore.Flow.Workflow gives Elixir users a Temporal-like call-site without adding Temporal-style replay. A workflow module declares Flow type, partitioning, states, retry policy, lease defaults, and success/error actions. The generated functions call the embedded FerricStore API directly.

defmodule BillingFlow do
  use FerricStore.Flow.Workflow,
    type: "billing",
    partition_by: [:tenant_id, :invoice_id],
    initial_state: :created

  state :created do
    lease_ms 60_000
    claim_payload true, max_bytes: 64_000
    retry max_retries: 8,
          backoff: [kind: :exponential, base_ms: 1_000, max_ms: :timer.hours(1)]
    on_ok :charged
    on_error retry_or: :failed
  end

  state :charged do
    on_ok complete()
    on_error fail()
  end
end

This compiles to helpers such as BillingFlow.create/2, BillingFlow.claim_due/2, BillingFlow.ok/3, and BillingFlow.error/3.

Core rule

The SDK is not a hidden transaction engine. Reads and writes outside Flow are normal FerricStore calls. Flow state changes must still happen through explicit Flow commands.

The generated path is:

SDK call
-> embedded `FerricStore.flow_*` call
-> Ra/Bitcask Flow command
-> hot Flow indexes
-> async cold projections

No RESP client is used by this SDK.

Options

  • :type - required Flow type. All generated commands use this type.
  • :store - module that exposes embedded flow_* functions. Defaults to FerricStore. Use this to point at a use FerricStore instance module in embedded mode or a fake module in tests.
  • :partition_by - list of attr keys used to build partition_key. Values are joined with ":". Same partition keeps ordering on the same shard.
  • :initial_state - state used by create/2. Defaults to first declared state.

State DSL

state/2 declares defaults for claim and worker behavior:

  • lease_ms n - default lease used by claim_due/2.
  • claim_payload boolean, max_bytes: n - default payload hydration policy for claims.
  • retry opts - per-state retry override. Supports the same options as flow_retry/3: :max_retries, :backoff, and :exhausted_to.
  • on_ok state - ok/3 transitions to another state.
  • on_ok complete() - ok/3 completes the Flow.
  • on_error retry_or: state - error/3 retries until policy is exhausted, then moves to state.
  • on_error fail() - error/3 fails terminally.

Generated API

Each workflow module gets these functions:

  • create(attrs, opts \\ [])
  • create_many(items, opts \\ [])
  • child(attrs, opts \\ [])
  • spawn_children(parent, children, opts \\ [])
  • fanout(parent, children, opts \\ [])
  • claim_due(state \\ :any, opts)
  • run_once(state \\ :any, opts)
  • ok(job, value \\ nil, opts \\ [])
  • error(job, reason, opts \\ [])
  • handle(job, fun, opts \\ [])
  • transition(job, to_state, value \\ nil, opts \\ [])
  • complete(job, result \\ nil, opts \\ [])
  • retry(job, reason, opts \\ [])
  • fail(job, reason, opts \\ [])
  • extend_lease(job, opts \\ [])
  • get(id, opts \\ [])
  • history(id, opts \\ [])
  • list(state \\ :any, opts \\ [])
  • children(parent, opts \\ [])
  • waiting_children(parent, opts \\ [])
  • by_parent(parent_flow_id, opts \\ [])
  • by_root(root_flow_id, opts \\ [])
  • by_correlation(correlation_id, opts \\ [])
  • info(opts \\ [])
  • stuck(opts \\ [])
  • reclaim(state \\ :running, opts)
  • reclaim_once(state \\ :running, opts)
  • cancel(id, opts \\ [])
  • rewind(id, opts)
  • install_policy(opts \\ [])

Payload rule

Payload bytes are read only when the command asks for payload hydration. claim_payload true makes claim_due/2 request payloads by default, capped by :payload_max_bytes. Large payloads stay omitted and are represented by payload refs/size metadata from core Flow.

Worker

FerricStore.Flow.Worker is optional. You can supervise it, use your own cron, or call claim_due/2 manually. The worker only loops over claim_due/2 and applies ok/3 or error/3 based on handler return values.

Children and fanout

child/2 builds child specs with workflow defaults. fanout/3 and spawn_children/3 call flow_spawn_children/3, carrying parent partition, state, lease token, and fencing token when the parent is a %Job{}.

children = [
  EmailFlow.child(%{id: "email-1", tenant_id: "t1", invoice_id: "i1"}),
  AuditFlow.child(%{id: "audit-1", tenant_id: "t1", invoice_id: "i1"})
]

BillingFlow.fanout(job, children,
  group_id: "notify",
  wait: :all,
  on_all_ok: :notified,
  on_any_error: :notification_failed
)

children/2 and waiting_children/2 query child records through the parent lineage index.

Summary

Functions

child(workflow, attrs, opts)

@spec child(module(), map(), keyword()) :: map() | {:error, binary()}

children(workflow, parent, opts)

@spec children(module(), FerricStore.Flow.Job.t() | map() | binary(), keyword()) ::
  {:ok, [map()]} | {:error, binary()}

claim_due(workflow, state, opts)

@spec claim_due(module(), atom() | binary() | [atom() | binary()], keyword()) ::
  {:ok, [FerricStore.Flow.Job.t()]} | {:error, binary()}

claim_payload(value)

(macro)

claim_payload(value, opts)

(macro)

complete()

(macro)

complete(job, result, opts)

@spec complete(FerricStore.Flow.Job.t(), term(), keyword()) ::
  {:ok, map()} | {:error, binary()}

create(workflow, attrs, opts)

@spec create(module(), map(), keyword()) :: {:ok, map()} | {:error, binary()}

create_many(workflow, items, opts)

@spec create_many(module(), [map()], keyword()) :: {:ok, [map()]} | {:error, binary()}

error(job, reason, opts)

@spec error(FerricStore.Flow.Job.t(), term(), keyword()) ::
  {:ok, map()} | {:error, binary()}

fail()

(macro)

fail(job, reason, opts)

@spec fail(FerricStore.Flow.Job.t(), term(), keyword()) ::
  {:ok, map()} | {:error, binary()}

handle(job, fun, opts)

@spec handle(
  FerricStore.Flow.Job.t(),
  (FerricStore.Flow.Job.t() -> term()),
  keyword()
) :: term()

lease_ms(value)

(macro)

ok(job, value, opts)

@spec ok(FerricStore.Flow.Job.t(), term(), keyword()) ::
  {:ok, map()} | {:error, binary()}

on_error(action)

(macro)

on_ok(action)

(macro)

reclaim_once(workflow, state, opts)

@spec reclaim_once(module(), atom() | binary() | [atom() | binary()], keyword()) ::
  {:ok, [FerricStore.Flow.Job.t()]} | {:error, binary()}

retry(opts)

(macro)

retry(job, reason, opts)

@spec retry(FerricStore.Flow.Job.t(), term(), keyword()) ::
  {:ok, map()} | {:error, binary()}

run_once(workflow, state, opts)

@spec run_once(module(), atom() | binary() | [atom() | binary()], keyword()) ::
  {:ok, [term()]} | {:error, binary()}

spawn_children(workflow, parent, children, opts)

@spec spawn_children(
  module(),
  FerricStore.Flow.Job.t() | map() | binary(),
  [map()],
  keyword()
) ::
  {:ok, map()} | {:error, binary()}

state(name, list)

(macro)

transition(job, to_state, value, opts)

@spec transition(FerricStore.Flow.Job.t(), atom() | binary(), term(), keyword()) ::
  {:ok, map()} | {:error, binary()}

waiting_children(workflow, parent, opts)

@spec waiting_children(
  module(),
  FerricStore.Flow.Job.t() | map() | binary(),
  keyword()
) ::
  {:ok, [map()]} | {:error, binary()}