Hourglass.BridgeHolder (hourglass v0.1.0)


Application-level resource manager for per-task-queue Temporal-bridge worker handles. It is a single long-lived GenServer, started under Hourglass.Application, that owns every bridge handle in the VM and mediates every NIF call made against them.

Why "Application-level"

Previously, this module lived under Hourglass.Worker.BridgeHolder inside each per-task-queue Worker subtree and published a raw reference() to a shared ETS table; poll loops, evaluators, and activity executors read the handle directly and called the NIF. That per-Worker placement (the now-deleted Worker.BridgeHolder module) had three problems, all of which go away once the holder is promoted to an Application-level singleton:

  1. A poll-loop crash under :rest_for_one cascade-restarted the BridgeHolder, but in-flight evaluator/executor Tasks (spawned under sibling DynSups) survived holding stale handles — producing dirty NIF leaks once they tried to ship completions on the destroyed handle.

  2. Bridge.worker_shutdown/1 had to be sequenced via a separate ShutdownSentinel child, because the Supervisor's reverse-start-order teardown did not guarantee that the bridge handle was drained before its consumers were killed.

  3. The cross-Worker test suite documented an "in-flight kill / cascade restart" gap (cross_worker_affinity_test.exs) that was a real partial-recovery limitation, not a flaw in the pure-function evaluator model.

Pulling handle ownership out of the Worker tree dissolves all three. Worker child crashes can no longer destroy the handle. Poll loops, evaluators, and executors call this module's API instead of holding raw refs. Graceful shutdown of a Worker is ordered by Worker.terminate/2 calling unregister_worker/1 BEFORE its poll loops are killed (so in-flight long-polls return :shutdown cleanly).
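The terminate ordering can be modelled in a small, self-contained way. This is a hypothetical sketch, not the actual Worker module: WorkerShutdownSketch, the injected `unregister` function (standing in for Hourglass.BridgeHolder.unregister_worker/1), and the empty inner supervisor (standing in for the poll-loop subtree) are all assumptions.

```elixir
defmodule WorkerShutdownSketch do
  use GenServer

  # Hypothetical: `unregister` stands in for Hourglass.BridgeHolder.unregister_worker/1,
  # and the empty inner supervisor stands in for the Worker's poll-loop subtree.
  def start_link({task_queue, unregister}),
    do: GenServer.start_link(__MODULE__, {task_queue, unregister})

  @impl true
  def init({task_queue, unregister}) do
    # terminate/2 only runs on a supervisor-initiated shutdown if exits are trapped.
    Process.flag(:trap_exit, true)
    {:ok, inner_sup} = Supervisor.start_link([], strategy: :one_for_one)
    {:ok, %{task_queue: task_queue, unregister: unregister, inner_sup: inner_sup}}
  end

  @impl true
  def terminate(_reason, state) do
    # 1. Unregister first, so in-flight long-polls on the handle return :shutdown.
    :ok = state.unregister.(state.task_queue)
    # 2. Only then stop the poll-loop subtree; its loops can now exit :normal.
    Supervisor.stop(state.inner_sup)
  end
end
```

The point of the sketch is the order inside terminate/2: the holder drains the handle before anything tries to kill the processes parked on it.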

Async-reply dispatch via Task.Supervisor (polls + register + unregister)

Bridge.worker_poll_* is a dirty-NIF call that may block for many seconds. Bridge.worker_new/2 and Bridge.worker_shutdown/1 are not long-polls, but each connects to the Temporal cluster and routinely takes 50–500ms. Running any of these inline in handle_call would serialise every other call (poll, complete, register, unregister) for every task queue in the VM behind it. With async tests starting Workers concurrently, that turns parallel work into serial work and pushes a clean mix test test/hourglass/temporal/ run from ~3 seconds to 30+ seconds.

The fix: a Task.Supervisor (started under this GenServer) spawns a child Task per slow request. The Task makes the NIF call and replies to the original caller via GenServer.reply(from, result). handle_call returns {:noreply, state} immediately, so the GenServer is free to process its next message. Multiple concurrent calls run in parallel, each in its own Task. The caller stays blocked in GenServer.call until the Task replies, so the synchronous contract is preserved.
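A minimal sketch of this async-reply pattern, assuming a hypothetical AsyncReplySketch module in which an arbitrary zero-arity function stands in for the blocking NIF:

```elixir
defmodule AsyncReplySketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # `slow_fun` stands in for a blocking NIF such as a long-poll (hypothetical).
  # :infinity because the wrapped call may block well past the 5s default timeout.
  def slow_call(server, slow_fun), do: GenServer.call(server, {:slow, slow_fun}, :infinity)

  @impl true
  def init(:ok) do
    {:ok, task_sup} = Task.Supervisor.start_link()
    {:ok, %{task_sup: task_sup}}
  end

  @impl true
  def handle_call({:slow, slow_fun}, from, state) do
    # The Task does the slow work and answers the original caller directly.
    Task.Supervisor.start_child(state.task_sup, fn ->
      GenServer.reply(from, slow_fun.())
    end)

    # Return at once: the GenServer can take its next message while the Task
    # runs, and the caller stays blocked until GenServer.reply/2 lands.
    {:noreply, state}
  end
end
```

Because handle_call returns straight away, several 200ms calls issued concurrently finish in roughly 200ms in total rather than queueing one behind another.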

Three call shapes use this pattern:

  • poll_workflow_activation/1 and poll_activity_task/1 → Bridge.worker_poll_* long-poll (seconds).
  • register_worker/2 → worker_new (50–500ms). The Task sends the result back via an internal :store_handle message; a pending_registrations MapSet guards against duplicate-register races during the in-flight window.
  • unregister_worker/1 → worker_shutdown (50–500ms). The handle is popped from state synchronously (so subsequent registered?/1 calls see the unregister immediately); worker_shutdown runs in the Task, which replies once it returns. Worker.terminate/2 relies on this contract: when the unregister_worker reply lands, in-flight poll NIFs on the handle have already begun returning :shutdown, so the inner Supervisor stop that follows can reap the poll loops cleanly.
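The register path can be sketched like this. RegisterSketch, the injected `worker_new` function (standing in for Bridge.worker_new/2), and the {:error, :already_registered} term are hypothetical; having the holder reply only after the handle is stored is an assumption chosen to match the register_worker/2 contract (":ok once the handle is recorded in state").

```elixir
defmodule RegisterSketch do
  use GenServer

  # Hypothetical: `worker_new` stands in for Bridge.worker_new/2 and returns
  # {:ok, handle} | {:error, reason}; :already_registered is an invented error term.
  def start_link(worker_new), do: GenServer.start_link(__MODULE__, worker_new)

  def register(holder, task_queue),
    do: GenServer.call(holder, {:register, task_queue}, :infinity)

  def registered?(holder, task_queue), do: GenServer.call(holder, {:registered?, task_queue})

  @impl true
  def init(worker_new) do
    {:ok, task_sup} = Task.Supervisor.start_link()

    {:ok,
     %{task_sup: task_sup, worker_new: worker_new, handles: %{}, pending_registrations: MapSet.new()}}
  end

  @impl true
  def handle_call({:register, tq}, from, state) do
    # Reject both completed and in-flight registrations for the same queue.
    if Map.has_key?(state.handles, tq) or MapSet.member?(state.pending_registrations, tq) do
      {:reply, {:error, :already_registered}, state}
    else
      holder = self()
      worker_new = state.worker_new

      # The slow NIF runs in a Task; its result comes back as an internal message.
      Task.Supervisor.start_child(state.task_sup, fn ->
        send(holder, {:store_handle, tq, from, worker_new.(tq)})
      end)

      {:noreply, %{state | pending_registrations: MapSet.put(state.pending_registrations, tq)}}
    end
  end

  def handle_call({:registered?, tq}, _from, state),
    do: {:reply, Map.has_key?(state.handles, tq), state}

  @impl true
  def handle_info({:store_handle, tq, from, result}, state) do
    state = %{state | pending_registrations: MapSet.delete(state.pending_registrations, tq)}

    case result do
      {:ok, handle} ->
        GenServer.reply(from, :ok)
        {:noreply, %{state | handles: Map.put(state.handles, tq, handle)}}

      {:error, _reason} = error ->
        GenServer.reply(from, error)
        {:noreply, state}
    end
  end
end
```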

Serialised workflow-activation completions

Bridge.worker_complete_workflow_activation/2 is a blocking NIF: SDK-Core's complete_workflow_activation future may take tens of seconds to resolve when the workflow has pending activity tasks, because Core waits for them to arrive via complete_activity_task before closing the workflow. Running it inline in handle_call deadlocks: the GenServer cannot take its next message, so complete_activity_task calls are never processed, and Core waits forever.

The fix: run each complete_workflow_activation NIF in a Task (like the long-poll calls). The Task replies to the caller via GenServer.reply/2 when the NIF returns, so the synchronous contract is preserved. The GenServer mailbox is free while the NIF runs, allowing complete_activity_task calls to proceed.

However, SDK-Core's worker requires sequential (not concurrent) complete_workflow_activation calls: concurrent calls for the same worker handle can race inside Tokio and leave the workflow in an inconsistent state that Temporal Server never observes as closed.

The solution: a per-task-queue FIFO queue stored in completion_queue. When a complete_workflow_activation call arrives:

  • If no Task is active for that task_queue: start one immediately and add task_queue to the completion_active set.
  • If a Task is active: enqueue {from, bytes} in that task queue's completion_queue entry.

When a Task finishes it sends {:workflow_activation_flushed, task_queue, from, result} back to the GenServer, which dequeues the next waiting call (if any) and starts a new Task for it.

Bridge.worker_complete_activity_task/2 is kept inline because it is fast (no long wait in Core), and keeping it inline in handle_call ensures it can always be processed even while a complete_workflow_activation Task is running.
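The per-task-queue FIFO can be sketched as below. CompletionQueueSketch and the injected `complete_fun` (standing in for Bridge.worker_complete_workflow_activation/2) are hypothetical; the state fields and message shapes follow the text above, and in this sketch the Task replies to the caller before notifying the holder.

```elixir
defmodule CompletionQueueSketch do
  use GenServer

  # Hypothetical: `complete_fun` stands in for the blocking
  # Bridge.worker_complete_workflow_activation/2 NIF.
  def start_link(complete_fun), do: GenServer.start_link(__MODULE__, complete_fun)

  def complete_workflow_activation(holder, task_queue, bytes),
    do: GenServer.call(holder, {:complete_wf, task_queue, bytes}, :infinity)

  @impl true
  def init(complete_fun) do
    {:ok, task_sup} = Task.Supervisor.start_link()

    {:ok,
     %{task_sup: task_sup, complete_fun: complete_fun,
       completion_active: MapSet.new(), completion_queue: %{}}}
  end

  @impl true
  def handle_call({:complete_wf, tq, bytes}, from, state) do
    if MapSet.member?(state.completion_active, tq) do
      # A flush is already running for this task queue: wait in FIFO order.
      queue = :queue.in({from, bytes}, Map.get(state.completion_queue, tq, :queue.new()))
      {:noreply, %{state | completion_queue: Map.put(state.completion_queue, tq, queue)}}
    else
      start_flush(state, tq, from, bytes)
      {:noreply, %{state | completion_active: MapSet.put(state.completion_active, tq)}}
    end
  end

  @impl true
  def handle_info({:workflow_activation_flushed, tq, _from, _result}, state) do
    case :queue.out(Map.get(state.completion_queue, tq, :queue.new())) do
      {{:value, {next_from, next_bytes}}, rest} ->
        # Hand the next waiting completion to a fresh Task; tq stays active.
        start_flush(state, tq, next_from, next_bytes)
        {:noreply, %{state | completion_queue: Map.put(state.completion_queue, tq, rest)}}

      {:empty, _} ->
        {:noreply,
         %{state | completion_active: MapSet.delete(state.completion_active, tq),
                   completion_queue: Map.delete(state.completion_queue, tq)}}
    end
  end

  # The Task replies to the caller itself, then tells the holder it is done.
  defp start_flush(state, tq, from, bytes) do
    holder = self()
    complete_fun = state.complete_fun

    Task.Supervisor.start_child(state.task_sup, fn ->
      result = complete_fun.(tq, bytes)
      GenServer.reply(from, result)
      send(holder, {:workflow_activation_flushed, tq, from, result})
    end)
  end
end
```

At most one completion per task queue is in flight at any time, while the holder's mailbox stays free for other calls (such as an inline activity completion).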

In-flight Task survival

Evaluator and activity-executor Tasks live under shared Application-level DynSups, NOT under per-Worker subtrees, so they survive Worker child crashes. If one tries to ship a completion during a transient registry hiccup, the call returns {:error, :worker_not_registered}, the Task exits, and Core redelivers the activation/task on the next poll to a fresh consumer. Idempotency for activities that have already done expensive work is the caller's responsibility; Hourglass provides no built-in deduplication cache.
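The shipping side of an evaluator or executor Task might look roughly like this. CompletionShipSketch and `complete_fun` are hypothetical, and exiting with :normal on {:error, :worker_not_registered} is an assumption: the text above only says that the Task exits.

```elixir
defmodule CompletionShipSketch do
  # Hypothetical: `complete_fun` stands in for BridgeHolder.complete_activity_task/2
  # or complete_workflow_activation/2. Exiting :normal is an assumed choice.
  def ship(complete_fun, task_queue, bytes) do
    case complete_fun.(task_queue, bytes) do
      :ok ->
        :ok

      # Worker not registered right now: give up and let Core redeliver the
      # activation/task to the next poll on a fresh consumer.
      {:error, :worker_not_registered} ->
        exit(:normal)

      {:error, reason} ->
        exit({:completion_failed, reason})
    end
  end
end
```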

Poll-caller DOWN ⇒ handle recycle

Dirty NIFs cannot be preempted from BEAM, and temporalio-sdk-core exposes no per-poll cancellation API — Worker::shutdown() is the only mechanism that makes in-flight polls return, and it cancels ALL polls on the handle.

When a poll caller (a WorkflowPollLoop or ActivityPollLoop) is killed mid-poll, the orphaned Task in task_sup stays parked on its dirty NIF call. Once Core delivers an activation/task to it, GenServer.reply/2 to the now-dead caller pid is silently dropped; Core sees the activation as delivered and does not redeliver within any reasonable test budget (observed: workflow stays parked at workflow-task-delivery for at least 5 minutes).

The fix: BridgeHolder monitors every poll caller. If the caller process dies before the in-flight Task replies, the holder recycles the bridge handle for that task queue — worker_shutdown (drains every in-flight poll on the old handle, including the orphan) + worker_new (allocates a fresh handle from the original register_opts). Healthy parallel pollers see {:error, %Bridge.Error{kind: :shutdown}} and treat it as the normal transient-shutdown retry condition. The originally-affected workflow's activation gets redelivered by Core to the next poll on the new handle.

Race ordering is one-sided in the holder's favor: the poll Task sends {:poll_replied, caller_pid} BEFORE its GenServer.reply, so on a clean reply the cleanup Process.demonitor(ref, [:flush]) always runs before any :DOWN could be observed (the caller cannot have died from observing the reply yet, because the reply hasn't arrived). On a real DOWN-while-parked the entry is still in the pollers map when the holder processes the :DOWN, triggering the recycle.
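The monitor bookkeeping and the poll_replied-before-reply ordering can be sketched as follows. PollerMonitorSketch and the injected `poll_fun` (standing in for a Bridge.worker_poll_* NIF) are hypothetical, and the actual recycle (worker_shutdown + worker_new) is replaced by recording the task queue in a recycling set.

```elixir
defmodule PollerMonitorSketch do
  use GenServer

  # Hypothetical: `poll_fun` stands in for a blocking Bridge.worker_poll_* NIF.
  def start_link(poll_fun), do: GenServer.start_link(__MODULE__, poll_fun)
  def poll(holder, tq), do: GenServer.call(holder, {:poll, tq}, :infinity)
  def recycling(holder), do: GenServer.call(holder, :recycling)

  @impl true
  def init(poll_fun) do
    {:ok, task_sup} = Task.Supervisor.start_link()
    {:ok, %{task_sup: task_sup, poll_fun: poll_fun, pollers: %{}, recycling: MapSet.new()}}
  end

  @impl true
  def handle_call({:poll, tq}, {caller, _tag} = from, state) do
    ref = Process.monitor(caller)
    holder = self()
    poll_fun = state.poll_fun

    Task.Supervisor.start_child(state.task_sup, fn ->
      result = poll_fun.(tq)
      # Notify the holder BEFORE replying, so a clean reply always demonitors
      # before any :DOWN caused by the caller reacting to the reply.
      send(holder, {:poll_replied, caller})
      GenServer.reply(from, result)
    end)

    {:noreply, %{state | pollers: Map.put(state.pollers, caller, {tq, ref})}}
  end

  def handle_call(:recycling, _from, state), do: {:reply, state.recycling, state}

  @impl true
  def handle_info({:poll_replied, caller}, state) do
    case Map.pop(state.pollers, caller) do
      {{_tq, ref}, pollers} ->
        Process.demonitor(ref, [:flush])
        {:noreply, %{state | pollers: pollers}}

      {nil, _} ->
        {:noreply, state}
    end
  end

  def handle_info({:DOWN, ref, :process, caller, _reason}, state) do
    case Map.pop(state.pollers, caller) do
      {{tq, ^ref}, pollers} ->
        # Caller died mid-poll. A real holder would now dispatch an async
        # worker_shutdown + worker_new Task under task_sup (not modelled here).
        {:noreply, %{state | pollers: pollers, recycling: MapSet.put(state.recycling, tq)}}

      _ ->
        {:noreply, state}
    end
  end
end
```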

Recycling is async (worker_shutdown + worker_new together take hundreds of milliseconds): the DOWN handler dispatches a recycle Task under task_sup and returns {:noreply, state} immediately. The recycle Task sends {:handle_recycled, task_queue, result} back. If task_queue is no longer in state.handles when the result lands (an unregister_worker call won the race), the new handle is shut down on the spot and discarded.
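The {:handle_recycled, task_queue, result} handling can be sketched as a pure function over the state map. RecycleResultSketch.apply_recycle/4 and `shutdown_fun` (standing in for Bridge.worker_shutdown/1) are hypothetical; clearing the recycling entry and leaving state.handles untouched on {:error, _} are assumptions, since error handling is not described above.

```elixir
defmodule RecycleResultSketch do
  # Hypothetical helper for the holder's {:handle_recycled, task_queue, result}
  # message; `shutdown_fun` stands in for Bridge.worker_shutdown/1.
  def apply_recycle(state, task_queue, result, shutdown_fun) do
    state = %{state | recycling: MapSet.delete(state.recycling, task_queue)}

    case result do
      {:ok, new_handle} ->
        if Map.has_key?(state.handles, task_queue) do
          # Still registered: swap in the fresh handle.
          %{state | handles: Map.put(state.handles, task_queue, new_handle)}
        else
          # unregister_worker/1 won the race: shut the fresh handle down and drop it.
          shutdown_fun.(new_handle)
          state
        end

      {:error, _reason} ->
        # Assumption: nothing else to do here in this sketch.
        state
    end
  end
end
```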

Summary

Types

register_opts()
Options accepted by register_worker/2.

state()
Internal state.

Functions

child_spec(init_arg)
Returns a specification to start this module under a supervisor.

complete_activity_task(task_queue, bytes)
Ship the activity-task completion bytes back to Core. Same shape as complete_workflow_activation/2.

complete_workflow_activation(task_queue, bytes)
Ship the workflow-activation completion bytes back to Core. Synchronous; the underlying NIF call is in-process to Core (no network round-trip).

poll_activity_task(task_queue)
Long-poll for the next activity task on task_queue. Same shape as poll_workflow_activation/1.

poll_workflow_activation(task_queue)
Long-poll for the next workflow activation on task_queue. Blocks the caller until Core delivers an activation or the bridge worker shuts down. The blocking work runs in a child Task under this module's Task.Supervisor so the GenServer mailbox stays responsive.

register_worker(task_queue, opts \\ [])
Allocate a bridge worker for task_queue. Synchronous: returns :ok once the underlying Bridge.worker_new/2 succeeds and the handle is recorded in state. Returns {:error, reason} on NIF failure or if task_queue is already registered.

registered?(task_queue)
Returns true iff a handle is currently registered for task_queue. Used by poll loops to distinguish "registration in flight" (transient retry) from "no worker should be polling" (exit).

unregister_worker(task_queue)
Drop the handle for task_queue (calling Bridge.worker_shutdown/1 on the way out). Idempotent: unknown task_queue returns :ok.

Types

register_opts()

@type register_opts() :: [
  namespace: String.t(),
  max_cached_workflows: pos_integer(),
  target_url: String.t(),
  max_outstanding_workflow_tasks: non_neg_integer(),
  max_outstanding_activities: non_neg_integer(),
  max_outstanding_local_activities: non_neg_integer()
]

Options accepted by register_worker/2.

state()

@type state() :: %{
  handles: %{optional(String.t()) => reference()},
  pending_registrations: MapSet.t(String.t()),
  register_opts: %{optional(String.t()) => register_opts()},
  pollers: %{optional(pid()) => {String.t(), reference()}},
  recycling: MapSet.t(String.t()),
  task_sup: pid(),
  completion_active: MapSet.t(String.t()),
  completion_queue: %{optional(String.t()) => :queue.queue({term(), binary()})}
}

Internal state.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

complete_activity_task(task_queue, bytes)

@spec complete_activity_task(String.t(), binary()) :: :ok | {:error, term()}

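Ship the activity-task completion bytes back to Core. Same shape as complete_workflow_activation/2.

Unlike complete_workflow_activation/2, the NIF runs inline in the holder's handle_call rather than in a Task, because Core returns quickly.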

complete_workflow_activation(task_queue, bytes)

@spec complete_workflow_activation(String.t(), binary()) :: :ok | {:error, term()}

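Ship the workflow-activation completion bytes back to Core. Synchronous; the underlying NIF call is in-process to Core (no network round-trip).

The call can still block for a long time (tens of seconds when the workflow has pending activity tasks). The NIF runs in a Task under this module's Task.Supervisor, and calls for the same task_queue are serialised in FIFO order.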

poll_activity_task(task_queue)

@spec poll_activity_task(String.t()) :: {:ok, binary()} | {:error, term()}

Long-poll for the next activity task on task_queue. Same shape as poll_workflow_activation/1.

poll_workflow_activation(task_queue)

@spec poll_workflow_activation(String.t()) :: {:ok, binary()} | {:error, term()}

Long-poll for the next workflow activation on task_queue. Blocks the caller until Core delivers an activation or the bridge worker shuts down. The blocking work runs in a child Task under this module's Task.Supervisor so the GenServer mailbox stays responsive.

register_worker(task_queue, opts \\ [])

@spec register_worker(String.t(), register_opts()) :: :ok | {:error, term()}

Allocate a bridge worker for task_queue. Synchronous: returns :ok once the underlying Bridge.worker_new/2 succeeds and the handle is recorded in state. Returns {:error, reason} on NIF failure or if task_queue is already registered.

registered?(task_queue)

@spec registered?(String.t()) :: boolean()

Returns true iff a handle is currently registered for task_queue. Used by poll loops to distinguish "registration in flight" (transient retry) from "no worker should be polling" (exit).

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

unregister_worker(task_queue)

@spec unregister_worker(String.t()) :: :ok

Drop the handle for task_queue (calling Bridge.worker_shutdown/1 on the way out). Idempotent: unknown task_queue returns :ok.

Once this call returns, any in-flight Bridge.worker_poll_* against the shut-down handle returns {:error, %Bridge.Error{kind: :shutdown}}, which is the contract the poll loops use to exit :normal.
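A poll loop might combine these contracts roughly as follows. PollLoopSketch and the injected functions are hypothetical stand-ins for poll_workflow_activation/1 and registered?/1, and a plain %{kind: :shutdown} map stands in for %Bridge.Error{}. Using registered?/1 to tell a handle recycle (retry) from an unregister (stop) is an assumption inferred from the descriptions above, not a documented algorithm.

```elixir
defmodule PollLoopSketch do
  # Hypothetical: poll_fun ~ poll_workflow_activation/1, registered_fun ~ registered?/1,
  # handle_fun dispatches the activation bytes. Returns :normal where a real loop would exit.
  def run(task_queue, poll_fun, registered_fun, handle_fun) do
    case poll_fun.(task_queue) do
      {:ok, bytes} ->
        handle_fun.(bytes)
        run(task_queue, poll_fun, registered_fun, handle_fun)

      {:error, %{kind: :shutdown}} ->
        if registered_fun.(task_queue) do
          # Still registered: the handle was recycled, so poll again on the new one.
          run(task_queue, poll_fun, registered_fun, handle_fun)
        else
          # unregister_worker/1 already dropped the handle: stop cleanly.
          :normal
        end

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```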