Signals and Dispatch

View Source

Signal Structure

Signals implement the CloudEvents v1.0.2 specification with Jido extensions:

# Basic signal
# Preferred: positional constructor (type, data, attrs)
{:ok, signal} = Jido.Signal.new("user.created", %{user_id: "123", email: "user@example.com"},
  source: "/auth/service"
)

# Also available: Map/keyword constructor (backwards compatible)
{:ok, signal} = Jido.Signal.new(%{
  type: "user.created",
  source: "/auth/service",
  data: %{user_id: "123", email: "user@example.com"}
})

# Signal fields
signal.id              # UUID v7 (auto-generated)
signal.specversion     # "1.0.2"
signal.type            # "user.created"
signal.source          # "/auth/service"
signal.data            # %{user_id: "123", email: "user@example.com"}
signal.time            # ISO 8601 timestamp (auto-generated)
signal.datacontenttype # "application/json" (default)
signal.jido_dispatch   # Dispatch configuration (DEPRECATED - pass to Bus.subscribe/3 or Dispatch.dispatch/2 instead)
signal.extensions      # Map of extension namespaces to extension data

# Data semantics (CloudEvents):
# - When datacontenttype is JSON (or omitted in JSON format), data may be any JSON value
#   (object/map, array, string, number, boolean, null)
# - For non-JSON payloads, encode according to datacontenttype; binary payloads use data_base64 when serialized to JSON

Signal IDs

Signals use UUID7 format for their id field, which provides time-ordered identifiers that are efficient for database indexing and naturally sortable:

# IDs are auto-generated by Signal.new
{:ok, signal} = Jido.Signal.new("user.created", %{user_id: "123"}, source: "/auth")
signal.id  # => "0194c3d8-7e82-7d4a-8d5c-1a2b3c4d5e6f"

# Generate IDs manually
id = Jido.Signal.ID.generate!()

# Extract timestamp from ID
timestamp = Jido.Signal.ID.extract_timestamp(signal.id)
# => 1677721600000 (Unix milliseconds)

# Compare IDs chronologically
:lt = Jido.Signal.ID.compare(older_signal.id, newer_signal.id)

ID Utilities

| Function | Description |
| --- | --- |
| Jido.Signal.ID.generate!/0 | Generate a new UUID7 string |
| Jido.Signal.ID.generate/0 | Generate UUID7 with timestamp tuple |
| Jido.Signal.ID.extract_timestamp/1 | Get Unix milliseconds from ID |
| Jido.Signal.ID.compare/2 | Chronological comparison (:lt, :eq, :gt) |
| Jido.Signal.ID.sequence_number/1 | Get sequence number within millisecond |
| Jido.Signal.ID.valid?/1 | Validate UUID7 format |
| Jido.Signal.ID.generate_batch/1 | Generate multiple ordered IDs |

Dispatch Adapters

Dispatch in jido_signal answers one narrow question: where should this signal be delivered? It is a delivery mechanism for signals, not a statement that every effect in the wider Jido ecosystem must be represented as signal dispatch.

jido_signal owns signal envelopes and delivery adapters. The broader boundary between pure agent logic, directives, and runtime execution is documented in Jido's Core Loop and Actions guides. Some actions in the wider ecosystem may still perform direct I/O when they need an immediate result.

PID Adapter

Direct process delivery:

# Async delivery (fire-and-forget)
config = {:pid, [target: pid, delivery_mode: :async]}
:ok = Jido.Signal.Dispatch.dispatch(signal, config)

# Sync delivery (with response)
config = {:pid, [
  target: pid,
  delivery_mode: :sync,
  timeout: 10_000
]}

Named Process Adapter

Delivery to registered processes:

config = {:named, [
  target: {:name, :my_server},
  delivery_mode: :async
]}

PubSub Adapter

Phoenix.PubSub broadcast. This adapter is optional; add {:phoenix_pubsub, "~> 2.1"} to your application dependencies before using :pubsub dispatch.

config = {:pubsub, [
  target: :my_app_pubsub,
  topic: "events"
]}

HTTP Adapter

HTTP endpoint delivery:

config = {:http, [
  url: "https://api.example.com/events",
  method: :post,
  headers: [{"x-api-key", "secret"}],
  timeout: 5000,
  retry: %{max_attempts: 3, base_delay: 1000}
]}

Sync vs Async Patterns

Synchronous Dispatch

Blocks until all deliveries complete:

:ok = Jido.Signal.Dispatch.dispatch(signal, config)

Asynchronous Dispatch

Returns a task immediately:

{:ok, task} = Jido.Signal.Dispatch.dispatch_async(signal, config)
:ok = Task.await(task)

Batch Dispatch

Multiple destinations with concurrency control:

configs = List.duplicate({:pid, [target: pid]}, 1000)
:ok = Jido.Signal.Dispatch.dispatch_batch(
  signal, 
  configs, 
  batch_size: 100,
  max_concurrency: 5
)

Multiple Destinations

Send to multiple adapters:

configs = [
  {:pubsub, [target: :pubsub, topic: "events"]},
  {:logger, [level: :info]},
  {:http, [url: "https://webhook.example.com"]}
]
:ok = Jido.Signal.Dispatch.dispatch(signal, configs)

Custom Signal Types

Define structured signal types:

defmodule UserCreatedSignal do
  use Jido.Signal,
    type: "user.created",
    default_source: "/auth/service",
    datacontenttype: "application/json",
    schema: [
      user_id: [type: :string, required: true],
      email: [type: :string, required: true],
      name: [type: :string, required: false]
    ]
end

# Usage
{:ok, signal} = UserCreatedSignal.new(%{
  user_id: "123",
  email: "user@example.com"
})

# Override defaults
{:ok, signal} = UserCreatedSignal.new(
  %{user_id: "123", email: "user@example.com"},
  source: "/different/source"
)

# Then dispatch separately (preferred over jido_dispatch field):
Jido.Signal.Dispatch.dispatch(signal, {:pubsub, [target: :pubsub, topic: "user-events"]})

Typed signals can also define a constructor-time extension policy:

defmodule UserCreatedSignal do
  use Jido.Signal,
    type: "user.created",
    schema: [
      user_id: [type: :string, required: true]
    ],
    extension_policy: [
      {MyApp.Signal.Ext.Trace, :required},
      {MyApp.Signal.Ext.Dispatch, :forbidden}
    ]
end

{:ok, signal} =
  UserCreatedSignal.new(%{user_id: "123"},
    trace: %{trace_id: "trace-123", span_id: "span-456"}
  )

Schema Validation

Custom signals validate data against their schema:

# Valid
{:ok, signal} = UserCreatedSignal.new(%{
  user_id: "123",
  email: "user@example.com"
})

# Invalid - missing required field
{:error, reason} = UserCreatedSignal.new(%{
  user_id: "123"
  # email is required
})

Error Handling

Validation Errors

# Invalid dispatch config
{:error, reason} = Jido.Signal.Dispatch.validate_opts({:invalid, []})

# Invalid signal data
{:error, reason} = Jido.Signal.new(%{})  # missing type and source

Delivery Errors

By default, dispatch returns raw error atoms. Structured errors (Jido.Signal.Error.DispatchError) are opt-in via configuration.

# Process not alive (returns raw atom by default)
{:error, :process_not_alive} = 
  Jido.Signal.Dispatch.dispatch(signal, {:pid, [target: dead_pid]})

# HTTP timeout
{:error, :timeout} = 
  Jido.Signal.Dispatch.dispatch(signal, {:http, [url: "...", timeout: 1]})

Batch Errors

# Some failures
{:error, errors} = Jido.Signal.Dispatch.dispatch_batch(signal, configs)
# errors = [{index, reason}, ...]

Circuit Breaker

The Jido.Signal.Dispatch.CircuitBreaker module provides fault isolation for dispatch adapters using the :fuse library. Circuits are per-adapter-type, providing bulk protection without per-endpoint overhead.

Configuration

Default settings:

  • 5 failures in 10 seconds triggers the circuit to open
  • 30-second reset time before allowing requests again

Usage

alias Jido.Signal.Dispatch.CircuitBreaker

# Install circuit breaker (once at application startup)
:ok = CircuitBreaker.install(:http, 
  strategy: {:standard, 5, 10_000},  # 5 failures in 10 seconds
  refresh: 30_000                     # 30 second reset
)

# Wrap dispatch calls with circuit breaker protection
case CircuitBreaker.run(:http, fn ->
       Jido.Signal.Dispatch.dispatch(signal, {:http, [url: "https://api.example.com/events"]})
     end) do
  :ok -> 
    :ok
  {:error, :circuit_open} -> 
    # Circuit is open, degrade gracefully
    Logger.warning("HTTP circuit open, queuing for retry")
    {:error, :circuit_open}
  {:error, reason} -> 
    {:error, reason}
end

# Check circuit status
:ok = CircuitBreaker.status(:http)    # Circuit closed (healthy)
:blown = CircuitBreaker.status(:http) # Circuit open (failing)

# Manually reset circuit
:ok = CircuitBreaker.reset(:http)

Telemetry Events

The circuit breaker emits telemetry events:

  • [:jido, :dispatch, :circuit, :melt] - Failure recorded
  • [:jido, :dispatch, :circuit, :rejected] - Request rejected (circuit open)
  • [:jido, :dispatch, :circuit, :reset] - Circuit reset

Next Steps

  • Event Bus - Publish/subscribe messaging with middleware hooks and persistent subscriptions
  • Signal Router - High-performance trie-based routing with pattern matching