Signals and Dispatch
Signal Structure
Signals implement the CloudEvents v1.0.2 specification with Jido extensions:
# Basic signal
# Preferred: positional constructor (type, data, attrs)
{:ok, signal} =
  Jido.Signal.new("user.created", %{user_id: "123", email: "user@example.com"},
    source: "/auth/service"
  )
# Also available: Map/keyword constructor (backwards compatible)
{:ok, signal} = Jido.Signal.new(%{
  type: "user.created",
  source: "/auth/service",
  data: %{user_id: "123", email: "user@example.com"}
})
# Signal fields
signal.id # UUID7 (auto-generated, time-ordered)
signal.specversion # "1.0.2"
signal.type # "user.created"
signal.source # "/auth/service"
signal.data # %{user_id: "123", email: "user@example.com"}
signal.time # ISO 8601 timestamp (auto-generated)
signal.datacontenttype # "application/json" (default)
signal.jido_dispatch # Dispatch configuration (DEPRECATED - pass to Bus.subscribe/3 or Dispatch.dispatch/2 instead)
signal.extensions # Map of extension namespaces to extension data
# Data semantics (CloudEvents):
# - When datacontenttype is JSON (or omitted in JSON format), data may be any JSON value
# (object/map, array, string, number, boolean, null)
# - For non-JSON payloads, encode according to datacontenttype; binary payloads use data_base64 when serialized to JSON
Signal IDs
Signals use UUID7 format for their id field, which provides time-ordered identifiers that are efficient for database indexing and naturally sortable:
# IDs are auto-generated by Signal.new
{:ok, signal} = Jido.Signal.new("user.created", %{user_id: "123"}, source: "/auth")
signal.id # => "0194c3d8-7e82-7d4a-8d5c-1a2b3c4d5e6f"
# Generate IDs manually
id = Jido.Signal.ID.generate!()
# Extract timestamp from ID
timestamp = Jido.Signal.ID.extract_timestamp(signal.id)
# => 1677721600000 (Unix milliseconds)
# Compare IDs chronologically
:lt = Jido.Signal.ID.compare(older_signal.id, newer_signal.id)
ID Utilities
| Function | Description |
|---|---|
| Jido.Signal.ID.generate!/0 | Generate a new UUID7 string |
| Jido.Signal.ID.generate/0 | Generate UUID7 with timestamp tuple |
| Jido.Signal.ID.extract_timestamp/1 | Get Unix milliseconds from ID |
| Jido.Signal.ID.compare/2 | Chronological comparison (:lt, :eq, :gt) |
| Jido.Signal.ID.sequence_number/1 | Get sequence number within millisecond |
| Jido.Signal.ID.valid?/1 | Validate UUID7 format |
| Jido.Signal.ID.generate_batch/1 | Generate multiple ordered IDs |
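Timestamp extraction and chronological comparison are possible because of the UUIDv7 layout in RFC 9562: the first 48 bits hold the Unix time in milliseconds. The plain-Elixir sketch below illustrates that idea only. UUID7Sketch and its functions are hypothetical names, not part of jido_signal; real code should call Jido.Signal.ID.extract_timestamp/1 and Jido.Signal.ID.compare/2.

```elixir
defmodule UUID7Sketch do
  @moduledoc "Illustrative only; not part of jido_signal."

  # The first 12 hex digits (48 bits) of a UUIDv7 encode Unix time in milliseconds.
  def timestamp_ms(uuid) when is_binary(uuid) do
    <<ts_hex::binary-size(12), _rest::binary>> = String.replace(uuid, "-", "")
    String.to_integer(ts_hex, 16)
  end

  # Compares by embedded timestamp only; two IDs from the same
  # millisecond compare as :eq in this simplified version.
  def compare_by_time(a, b) do
    case {timestamp_ms(a), timestamp_ms(b)} do
      {x, y} when x < y -> :lt
      {x, y} when x > y -> :gt
      _ -> :eq
    end
  end
end

UUID7Sketch.timestamp_ms("0194c3d8-7e82-7d4a-8d5c-1a2b3c4d5e6f")
# => 1738452532866
```

Because the timestamp sits in the most significant bits, sorting UUID7 strings lexicographically also sorts them by creation time, which is what makes them index-friendly.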
Dispatch Adapters
Dispatch in jido_signal answers one narrow question: where should this signal be delivered?
It is a delivery mechanism for signals, not a statement that every effect in the wider Jido
ecosystem must be represented as signal dispatch.
jido_signal owns signal envelopes and delivery adapters. The broader boundary between pure
agent logic, directives, and runtime execution is documented in Jido's Core Loop and Actions
guides. Some actions in the wider ecosystem may still perform direct I/O when they need an
immediate result.
PID Adapter
Direct process delivery:
# Async delivery (fire-and-forget)
config = {:pid, [target: pid, delivery_mode: :async]}
:ok = Jido.Signal.Dispatch.dispatch(signal, config)
# Sync delivery (with response)
config = {:pid, [
  target: pid,
  delivery_mode: :sync,
  timeout: 10_000
]}
Named Process Adapter
Delivery to registered processes:
config = {:named, [
  target: {:name, :my_server},
  delivery_mode: :async
]}
PubSub Adapter
Phoenix.PubSub broadcast:
config = {:pubsub, [
  target: :my_app_pubsub,
  topic: "events"
]}
HTTP Adapter
HTTP endpoint delivery:
config = {:http, [
  url: "https://api.example.com/events",
  method: :post,
  headers: [{"x-api-key", "secret"}],
  timeout: 5000,
  retry: %{max_attempts: 3, base_delay: 1000}
]}
Sync vs Async Patterns
Synchronous Dispatch
Blocks until all deliveries complete:
:ok = Jido.Signal.Dispatch.dispatch(signal, config)
Asynchronous Dispatch
Returns a task immediately:
{:ok, task} = Jido.Signal.Dispatch.dispatch_async(signal, config)
:ok = Task.await(task)
Batch Dispatch
Multiple destinations with concurrency control:
configs = List.duplicate({:pid, [target: pid]}, 1000)
:ok = Jido.Signal.Dispatch.dispatch_batch(
  signal,
  configs,
  batch_size: 100,
  max_concurrency: 5
)
Multiple Destinations
Send to multiple adapters:
configs = [
  {:pubsub, [target: :pubsub, topic: "events"]},
  {:logger, [level: :info]},
  {:http, [url: "https://webhook.example.com"]}
]
:ok = Jido.Signal.Dispatch.dispatch(signal, configs)
Custom Signal Types
Define structured signal types:
defmodule UserCreatedSignal do
  use Jido.Signal,
    type: "user.created",
    default_source: "/auth/service",
    datacontenttype: "application/json",
    schema: [
      user_id: [type: :string, required: true],
      email: [type: :string, required: true],
      name: [type: :string, required: false]
    ]
end
# Usage
{:ok, signal} = UserCreatedSignal.new(%{
  user_id: "123",
  email: "user@example.com"
})
# Override defaults
{:ok, signal} = UserCreatedSignal.new(
  %{user_id: "123", email: "user@example.com"},
  source: "/different/source"
)
# Then dispatch separately (preferred over jido_dispatch field):
Jido.Signal.Dispatch.dispatch(signal, {:pubsub, [target: :pubsub, topic: "user-events"]})
Typed signals can also define constructor-time extension policy:
defmodule UserCreatedSignal do
  use Jido.Signal,
    type: "user.created",
    schema: [
      user_id: [type: :string, required: true]
    ],
    extension_policy: [
      {MyApp.Signal.Ext.Trace, :required},
      {MyApp.Signal.Ext.Dispatch, :forbidden}
    ]
end
{:ok, signal} =
  UserCreatedSignal.new(%{user_id: "123"},
    trace: %{trace_id: "trace-123", span_id: "span-456"}
  )
Schema Validation
Custom signals validate data against schema:
# Valid
{:ok, signal} = UserCreatedSignal.new(%{
  user_id: "123",
  email: "user@example.com"
})
# Invalid - missing required field
{:error, reason} = UserCreatedSignal.new(%{
  user_id: "123"
  # email is required
})
Error Handling
Validation Errors
# Invalid dispatch config
{:error, reason} = Jido.Signal.Dispatch.validate_opts({:invalid, []})
# Invalid signal data
{:error, reason} = Jido.Signal.new(%{}) # missing type and source
Delivery Errors
By default, dispatch returns raw error atoms. Structured errors (Jido.Signal.Error.DispatchError) are opt-in via configuration.
# Process not alive (returns raw atom by default)
{:error, :process_not_alive} =
  Jido.Signal.Dispatch.dispatch(signal, {:pid, [target: dead_pid]})
# HTTP timeout
{:error, :timeout} =
  Jido.Signal.Dispatch.dispatch(signal, {:http, [url: "...", timeout: 1]})
Batch Errors
# Some failures
{:error, errors} = Jido.Signal.Dispatch.dispatch_batch(signal, configs)
# errors = [{index, reason}, ...]
Circuit Breaker
The Jido.Signal.Dispatch.CircuitBreaker module provides fault isolation for dispatch adapters using the :fuse library. Circuits are per-adapter-type, providing bulk protection without per-endpoint overhead.
Configuration
Default settings:
- 5 failures within 10 seconds open the circuit
- The circuit resets after 30 seconds, after which requests are allowed again
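To make the default threshold concrete, the plain-Elixir sketch below counts failures in a sliding 10-second window and reports :blown once five fall inside it. It follows the wording of the list above only. FailureWindowSketch is a hypothetical module, and the bookkeeping and exact threshold semantics of :fuse may differ.

```elixir
defmodule FailureWindowSketch do
  @moduledoc "Illustrative only; not how :fuse or CircuitBreaker is implemented."

  @max_failures 5
  @window_ms 10_000

  # `failures` holds earlier failure timestamps in milliseconds.
  # Returns {:ok | :blown, failures_still_inside_the_window}.
  def record_failure(failures, now_ms) do
    recent = Enum.filter([now_ms | failures], fn t -> now_ms - t < @window_ms end)
    status = if length(recent) >= @max_failures, do: :blown, else: :ok
    {status, recent}
  end

  # Folds a list of failure timestamps and returns the status after the last one.
  def status_after(timestamps) do
    {status, _recent} =
      Enum.reduce(timestamps, {:ok, []}, fn t, {_status, acc} -> record_failure(acc, t) end)

    status
  end
end

FailureWindowSketch.status_after([0, 1_000, 2_000, 3_000, 4_000])
# => :blown (five failures within 10 seconds)

FailureWindowSketch.status_after([0, 6_000, 12_000, 18_000, 24_000])
# => :ok (at most two failures inside any 10-second window)
```

The sketch leaves out the 30-second reset. A real breaker also remembers when it opened and rejects calls until the reset period has passed.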
Usage
alias Jido.Signal.Dispatch.CircuitBreaker
require Logger
# Install circuit breaker (once at application startup)
:ok = CircuitBreaker.install(:http,
  strategy: {:standard, 5, 10_000}, # 5 failures in 10 seconds
  refresh: 30_000 # 30 second reset
)
# Wrap dispatch calls with circuit breaker protection
case CircuitBreaker.run(:http, fn ->
       Jido.Signal.Dispatch.dispatch(signal, {:http, [url: "https://api.example.com/events"]})
     end) do
  :ok ->
    :ok
  {:error, :circuit_open} ->
    # Circuit is open, degrade gracefully
    Logger.warning("HTTP circuit open, queuing for retry")
    {:error, :circuit_open}
  {:error, reason} ->
    {:error, reason}
end
# Check circuit status
CircuitBreaker.status(:http)
# => :ok when the circuit is closed (healthy)
# => :blown when the circuit is open (failing)
# Manually reset circuit
:ok = CircuitBreaker.reset(:http)
Telemetry Events
The circuit breaker emits telemetry events:
- [:jido, :dispatch, :circuit, :melt] - Failure recorded
- [:jido, :dispatch, :circuit, :rejected] - Request rejected (circuit open)
- [:jido, :dispatch, :circuit, :reset] - Circuit reset
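One way to observe these events is to attach a handler with :telemetry.attach_many/4. The sketch below assumes the :telemetry package is available. CircuitTelemetrySketch and the handler ID are hypothetical names. The measurements and metadata maps are treated as opaque, because their exact contents are not described here.

```elixir
# For running as a standalone script; in a Mix project, list :telemetry in deps instead.
Mix.install([{:telemetry, "~> 1.0"}])

defmodule CircuitTelemetrySketch do
  @moduledoc "Illustrative handler; not part of jido_signal."

  @events [
    [:jido, :dispatch, :circuit, :melt],
    [:jido, :dispatch, :circuit, :rejected],
    [:jido, :dispatch, :circuit, :reset]
  ]

  # Attaches one handler to all three circuit events and forwards them to the caller.
  def attach(handler_id \\ "circuit-sketch") do
    :telemetry.attach_many(handler_id, @events, &__MODULE__.handle_event/4, %{notify: self()})
  end

  # Runs in the process that emits the event; keeps work minimal by sending a message.
  def handle_event(event, measurements, metadata, %{notify: pid}) do
    send(pid, {:circuit_event, List.last(event), measurements, metadata})
  end
end

:ok = CircuitTelemetrySketch.attach()

# Simulated event with empty placeholder maps, for illustration only;
# in an application the circuit breaker emits these itself.
:telemetry.execute([:jido, :dispatch, :circuit, :rejected], %{}, %{})
```

Forwarding events as messages keeps the handler cheap. A real handler might instead increment a metric or log a warning when :rejected events start to appear.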
Next Steps
- Event Bus - Publish/subscribe messaging with middleware hooks and persistent subscriptions
- Signal Router - High-performance trie-based routing with pattern matching