Application-level resource manager for per-task-queue Temporal-bridge
worker handles. A single long-lived GenServer started under
Hourglass.Application that owns every bridge handle in the VM and
mediates every NIF call against them.
Why "Application-level"
Previously, this module lived under Hourglass.Worker.BridgeHolder
inside each per-task-queue Worker subtree, publishing a raw
reference() to a shared ETS table. Poll loops, evaluators, and
activity executors read the handle directly and called the NIF. That
per-Worker placement (the deleted Worker.BridgeHolder module) had
three problems dissolved by promoting it to an Application-level
singleton:
- A poll-loop crash under :rest_for_one cascade-restarted the
  BridgeHolder, but in-flight evaluator/executor Tasks (spawned under
  sibling DynSups) survived holding stale handles, producing dirty NIF
  leaks once they tried to ship completions on the destroyed handle.
- Bridge.worker_shutdown/1 had to be sequenced via a separate
  ShutdownSentinel child, because Supervisor reverse-order teardown
  didn't naturally line up with the bridge handle being drained before
  its consumers were killed.
- The cross-Worker test suite documented an "in-flight kill / cascade
  restart" gap (cross_worker_affinity_test.exs) that was a real
  partial-recovery limitation, not a flaw in the pure-function
  evaluator model.
Pulling handle ownership out of the Worker tree dissolves all three.
Worker child crashes can no longer destroy the handle. Poll loops,
evaluators, and executors call this module's API instead of holding
raw refs. Graceful shutdown of a Worker is ordered by
Worker.terminate/2 calling unregister_worker/1 BEFORE its poll
loops are killed (so in-flight long-polls return :shutdown
cleanly).
Async-reply dispatch via Task.Supervisor (polls + register + unregister)
Bridge.worker_poll_* is a dirty-NIF call that may block for many
seconds. Bridge.worker_new/2 and Bridge.worker_shutdown/1 aren't
long-polls but each connect to the Temporal cluster and routinely
take 50–500ms. Running any of these inline in handle_call would
serialise every other call (poll, complete, register, unregister)
across every task queue in the VM behind it — and with async tests
starting Workers concurrently, that turns parallel work into serial
work and pushes a clean mix test test/hourglass/temporal/ from
~3 seconds to 30+ seconds.
The fix: a Task.Supervisor (started under this GenServer) spawns a
child Task per slow request. The Task does the NIF call and replies
to the original caller via GenServer.reply(from, result). handle_call
returns {:noreply, state} immediately, so the GenServer goes straight
back to its mailbox. Concurrent calls run in parallel, each in its
own Task. The caller blocks on GenServer.call until the Task replies,
so the synchronous contract is preserved.
Three call shapes use this pattern:
- poll_workflow_activation/1 + poll_activity_task/1: long-poll
  (seconds).
- register_worker/2: worker_new (50–500ms). The Task sends the result
  back via an internal :store_handle message; a pending_registrations
  MapSet guards against duplicate-register races during the in-flight
  window.
- unregister_worker/1: worker_shutdown (50–500ms). The handle is
  popped from state synchronously (so subsequent registered?/1 calls
  see the unregister immediately); the worker_shutdown runs in the
  Task and replies once it returns. Worker.terminate/2 relies on this
  contract: when the unregister_worker reply lands, in-flight poll
  NIFs on the handle have already begun returning :shutdown, so the
  inner Supervisor stop that follows can reap the poll loops cleanly.
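The async-reply pattern described in this section can be sketched in isolation. This is a minimal illustration, not this module's code: AsyncReplySketch, slow_call/2 and the Process.sleep/1 stand-in for the blocking NIF are all hypothetical names introduced here. It only shows how returning {:noreply, state} and replying later from a Task keeps the mailbox free while the caller still sees a synchronous call.

```elixir
defmodule AsyncReplySketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Caller-facing API: still an ordinary synchronous GenServer.call.
  def slow_call(server, ms), do: GenServer.call(server, {:slow, ms}, 5_000)

  @impl true
  def init(:ok) do
    # The Task.Supervisor is started (and linked) from inside the GenServer.
    {:ok, task_sup} = Task.Supervisor.start_link()
    {:ok, %{task_sup: task_sup}}
  end

  @impl true
  def handle_call({:slow, ms}, from, state) do
    {:ok, _pid} =
      Task.Supervisor.start_child(state.task_sup, fn ->
        # Hypothetical stand-in for the blocking NIF call.
        Process.sleep(ms)
        # The Task, not the GenServer, answers the original caller.
        GenServer.reply(from, {:ok, ms})
      end)

    # No reply yet: the GenServer is free to handle the next message.
    {:noreply, state}
  end
end
```

With this shape, five concurrent slow_call(server, 200) calls finish in roughly the time of one, because each sleeps in its own Task rather than inside handle_call.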
Serialised workflow-activation completions
Bridge.worker_complete_workflow_activation/2 is a blocking NIF:
SDK-Core's complete_workflow_activation future may take tens of
seconds to resolve when the workflow has pending activity tasks (it
waits for them to arrive via complete_activity_task before closing
the workflow). Running it inline in handle_call deadlocks: the
GenServer is blocked inside the NIF, so complete_activity_task calls
can never be processed, and Core waits forever.
The fix: run each complete_workflow_activation NIF in a Task (like
the long-poll calls). The Task replies to the caller via
GenServer.reply/2 when the NIF returns, so the synchronous contract
is preserved. The GenServer mailbox is free while the NIF runs,
allowing complete_activity_task calls to proceed.
However, SDK-Core's worker requires sequential (not concurrent)
complete_workflow_activation calls: concurrent calls for the same
worker handle can race inside Tokio and leave the workflow in an
inconsistent state that Temporal Server never observes as closed.
The solution: a per-task-queue FIFO queue stored in
completion_queue. When a complete_workflow_activation call arrives:
- If no Task is active for that task_queue: start one immediately and
  mark the queue as :active in completion_active.
- If a Task is active: enqueue {from, bytes} in completion_queue.
When a Task finishes it sends {:workflow_activation_flushed, task_queue, from, result}
back to the GenServer, which dequeues the next waiting call (if any)
and starts a new Task for it.
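The bookkeeping behind this per-task-queue FIFO can be sketched as two pure functions over the completion_active and completion_queue fields. This is an illustrative sketch under assumptions, not the module's implementation: CompletionQueueSketch, submit/4 and flushed/2 are hypothetical names, and spawning the Task is left to the caller of these functions.

```elixir
defmodule CompletionQueueSketch do
  # Only the two state fields named in the text are modelled.
  def new, do: %{completion_active: MapSet.new(), completion_queue: %{}}

  # A completion arrives. Returns {:start, state} when the caller should
  # spawn a Task now, or {:queued, state} when one is already running.
  def submit(state, task_queue, from, bytes) do
    if MapSet.member?(state.completion_active, task_queue) do
      queue = Map.get(state.completion_queue, task_queue, :queue.new())
      queue = :queue.in({from, bytes}, queue)
      {:queued, %{state | completion_queue: Map.put(state.completion_queue, task_queue, queue)}}
    else
      {:start, %{state | completion_active: MapSet.put(state.completion_active, task_queue)}}
    end
  end

  # The running Task reported back. Returns {{:start, {from, bytes}}, state}
  # for the next waiter, or {:idle, state} when nothing is waiting.
  def flushed(state, task_queue) do
    queue = Map.get(state.completion_queue, task_queue, :queue.new())

    case :queue.out(queue) do
      {{:value, next}, rest} ->
        {{:start, next}, %{state | completion_queue: Map.put(state.completion_queue, task_queue, rest)}}

      {:empty, _queue} ->
        {:idle, %{state | completion_active: MapSet.delete(state.completion_active, task_queue)}}
    end
  end
end
```

Keeping the decision logic free of side effects makes the ordering guarantee easy to check: at most one completion per task queue is ever in the started state, and waiters leave the queue in arrival order.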
Bridge.worker_complete_activity_task/2 is kept inline because it is
fast (no long wait in Core), and keeping it inline in handle_call
ensures it can always be processed even while a
complete_workflow_activation Task is running.
In-flight Task survival
Evaluator and activity-executor Tasks live under shared
Application-level DynSups, NOT under per-Worker subtrees. They
survive Worker child crashes. When they try to ship a completion
after a transient registry hiccup, the call returns
{:error, :worker_not_registered}, the Task exits, and Core
redelivers the activation/task on the next poll on a fresh consumer.
Idempotency for activities that have already done expensive work
is the caller's responsibility: Hourglass provides no built-in
deduplication cache.
Poll-caller DOWN ⇒ handle recycle
Dirty NIFs cannot be preempted from BEAM, and temporalio-sdk-core
exposes no per-poll cancellation API — Worker::shutdown() is the
only mechanism that makes in-flight polls return, and it cancels
ALL polls on the handle.
When a poll caller (a WorkflowPollLoop or ActivityPollLoop)
is killed mid-poll, the orphaned Task in task_sup stays parked on
its dirty NIF call. Once Core delivers an activation/task to it,
GenServer.reply/2 to the now-dead caller pid is silently dropped;
Core sees the activation as delivered and does not redeliver within
any reasonable test budget (observed: workflow stays parked at
workflow-task-delivery for at least 5 minutes).
The fix: BridgeHolder monitors every poll caller. If the caller
process dies before the in-flight Task replies, the holder recycles
the bridge handle for that task queue — worker_shutdown (drains
every in-flight poll on the old handle, including the orphan) +
worker_new (allocates a fresh handle from the original
register_opts). Healthy parallel pollers see
{:error, %Bridge.Error{kind: :shutdown}} and treat it as the
normal transient-shutdown retry condition. The originally-affected
workflow's activation gets redelivered by Core to the next poll on
the new handle.
Race ordering is one-sided in the holder's favor: the poll Task
sends {:poll_replied, caller_pid} BEFORE its GenServer.reply,
so on a clean reply the cleanup Process.demonitor(ref, [:flush])
always runs before any :DOWN could be observed (the caller cannot
have died from observing the reply yet, because the reply hasn't
arrived). On a real DOWN-while-parked the entry is still in the
pollers map when the holder processes the :DOWN, triggering the
recycle.
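The monitor bookkeeping and the send-before-reply ordering can be sketched as a small standalone GenServer. This is a hedged illustration only: PollMonitorSketch, the notify pid, the {:recycle_needed, task_queue, pid} message and the Process.sleep/1 stand-in for the parked NIF are assumptions made for the example. The real holder dispatches a recycle Task instead of notifying a pid.

```elixir
defmodule PollMonitorSketch do
  use GenServer

  # `notify` is a hypothetical pid that is told when a recycle would start.
  def start_link(notify), do: GenServer.start_link(__MODULE__, notify)

  def poll(server, task_queue, park_ms),
    do: GenServer.call(server, {:poll, task_queue, park_ms}, :infinity)

  @impl true
  def init(notify) do
    {:ok, task_sup} = Task.Supervisor.start_link()
    {:ok, %{task_sup: task_sup, pollers: %{}, notify: notify}}
  end

  @impl true
  def handle_call({:poll, task_queue, park_ms}, {caller_pid, _tag} = from, state) do
    ref = Process.monitor(caller_pid)
    holder = self()

    {:ok, _pid} =
      Task.Supervisor.start_child(state.task_sup, fn ->
        # Hypothetical stand-in for the parked dirty NIF.
        Process.sleep(park_ms)
        # Ordering matters: notify the holder first, reply second.
        send(holder, {:poll_replied, caller_pid})
        GenServer.reply(from, :activation)
      end)

    pollers = Map.put(state.pollers, caller_pid, {task_queue, ref})
    {:noreply, %{state | pollers: pollers}}
  end

  @impl true
  def handle_info({:poll_replied, caller_pid}, state) do
    case Map.pop(state.pollers, caller_pid) do
      {{_task_queue, ref}, pollers} ->
        # Clean reply: drop the monitor and any queued :DOWN for it.
        Process.demonitor(ref, [:flush])
        {:noreply, %{state | pollers: pollers}}

      {nil, _pollers} ->
        # The caller already died and was handled by the :DOWN clause.
        {:noreply, state}
    end
  end

  def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
    case Map.pop(state.pollers, pid) do
      {{task_queue, ^ref}, pollers} ->
        # Caller died while its poll was still parked.
        send(state.notify, {:recycle_needed, task_queue, pid})
        {:noreply, %{state | pollers: pollers}}

      _other ->
        {:noreply, state}
    end
  end
end
```

In this sketch a caller that receives its reply never triggers a recycle, while killing a caller mid-poll produces exactly one {:recycle_needed, task_queue, pid} message. The late :poll_replied from the orphaned Task is ignored.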
Recycle is async (worker_shutdown + worker_new together take
hundreds of milliseconds): the DOWN handler returns {:noreply, state}
immediately after dispatching a recycle Task under task_sup.
The recycle Task sends {:handle_recycled, task_queue, result}
back. If task_queue is no longer in state.handles when the
result lands (an unregister_worker call won the race), the new
handle is shut down on the spot and discarded.
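How the holder might resolve the race between a finished recycle and an unregister_worker call is sketched below. The assumptions are stated plainly: RecycleResultSketch and handle_recycled/4 are hypothetical, shutdown_fun stands in for Bridge.worker_shutdown/1, the old entry is assumed to stay in handles while the recycle is in flight, and only the success case is modelled because the text does not describe the failure path.

```elixir
defmodule RecycleResultSketch do
  # `shutdown_fun` is a hypothetical stand-in for Bridge.worker_shutdown/1.
  def handle_recycled(state, task_queue, {:ok, new_handle}, shutdown_fun) do
    state = %{state | recycling: MapSet.delete(state.recycling, task_queue)}

    if Map.has_key?(state.handles, task_queue) do
      # Still registered: swap in the fresh handle.
      %{state | handles: Map.put(state.handles, task_queue, new_handle)}
    else
      # unregister_worker won the race: discard the fresh handle.
      shutdown_fun.(new_handle)
      state
    end
  end
end
```

Passing the shutdown function in as an argument keeps the sketch testable without the NIF.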
Types
@type register_opts() :: [
  namespace: String.t(),
  max_cached_workflows: pos_integer(),
  target_url: String.t(),
  max_outstanding_workflow_tasks: non_neg_integer(),
  max_outstanding_activities: non_neg_integer(),
  max_outstanding_local_activities: non_neg_integer()
]
Options accepted by register_worker/2.
@type state() :: %{
  handles: %{optional(String.t()) => reference()},
  pending_registrations: MapSet.t(String.t()),
  register_opts: %{optional(String.t()) => register_opts()},
  pollers: %{optional(pid()) => {String.t(), reference()}},
  recycling: MapSet.t(String.t()),
  task_sup: pid(),
  completion_active: MapSet.t(String.t()),
  completion_queue: %{optional(String.t()) => :queue.queue({term(), binary()})}
}
Internal state.
Functions
child_spec/1
Returns a specification to start this module under a supervisor.
See Supervisor.
complete_activity_task/2
Ship the activity-task completion bytes back to Core. Same shape as
complete_workflow_activation/2.
complete_workflow_activation/2
Ship the workflow-activation completion bytes back to Core.
Synchronous for the caller. The underlying NIF call is in-process to
Core (no network round-trip), but Core may take tens of seconds to
resolve it, so it runs in a Task and is serialised per task queue.
poll_activity_task/1
Long-poll for the next activity task on task_queue. Same shape as
poll_workflow_activation/1.
poll_workflow_activation/1
Long-poll for the next workflow activation on task_queue. Blocks
the caller until Core delivers an activation or the bridge worker
shuts down. The blocking work runs in a child Task under this
module's Task.Supervisor so the GenServer mailbox stays responsive.
register_worker/2
@spec register_worker(String.t(), register_opts()) :: :ok | {:error, term()}
Allocate a bridge worker for task_queue. Synchronous: returns
:ok once the underlying Bridge.worker_new/2 succeeds and the
handle is recorded in state. Returns {:error, reason} on NIF
failure or if task_queue is already registered.
registered?/1
Returns true iff a handle is currently registered for task_queue.
Used by poll loops to distinguish "registration in flight" (transient
retry) from "no worker should be polling" (exit).
start_link/1
@spec start_link(keyword()) :: GenServer.on_start()
unregister_worker/1
@spec unregister_worker(String.t()) :: :ok
Drop the handle for task_queue (calling Bridge.worker_shutdown/1
on the way out). Idempotent: unknown task_queue returns :ok.
After this call any in-flight Bridge.worker_poll_* against the
shutdown handle will return {:error, %Bridge.Error{kind: :shutdown}},
which is the contract the poll loops use to exit :normal.
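One way a poll loop could combine the :shutdown error with registered?/1 is sketched below. This is an assumption-heavy illustration rather than the actual poll-loop code: PollLoopSketch, next_step/2 and the nested Error struct (a stand-in for %Bridge.Error{} with only a :kind field) are hypothetical, and the handling of non-shutdown errors is a guess.

```elixir
defmodule PollLoopSketch do
  # Hypothetical stand-in for %Bridge.Error{}; only :kind is assumed.
  defmodule Error do
    defstruct [:kind]
  end

  # `registered?` is what registered?/1 returned for the task queue
  # when the poll result was observed.
  def next_step({:ok, bytes}, _registered?), do: {:dispatch, bytes}

  # Handle still registered: treated as a transient shutdown
  # (for example a handle recycle), so the loop polls again.
  def next_step({:error, %Error{kind: :shutdown}}, true), do: :retry

  # Handle gone: the worker was unregistered, so the loop exits :normal.
  def next_step({:error, %Error{kind: :shutdown}}, false), do: {:exit, :normal}

  # Assumption: any other error ends the loop abnormally.
  def next_step({:error, other}, _registered?), do: {:exit, {:poll_failed, other}}
end
```

The split relies on two behaviours stated earlier: unregister_worker/1 pops the handle synchronously, while a recycle leaves the task queue's entry in place until the fresh handle arrives.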