Security Guide

View Source

Prerequisites: Configuration Guide

Secure your Jido Action applications with proper input validation, resource limits, access controls, and protection against common vulnerabilities.

Input Validation & Sanitization

Schema-Based Validation

Actions automatically validate inputs using NimbleOptions schemas:

defmodule MyApp.Actions.SecureUser do
  use Jido.Action,
    name: "secure_user",
    schema: [
      # Type validation prevents injection attacks
      user_id: [
        type: :string,
        required: true,
        # Regex validation for format
        matches: ~r/^[a-zA-Z0-9_-]{1,50}$/
      ],
      email: [
        type: :string,
        required: true,
        # Length limits prevent DoS
        max_length: 320
      ],
      age: [
        type: :integer,
        required: true,
        # Range validation
        min: 13,
        max: 150
      ],
      # Whitelist allowed values
      role: [
        type: :atom,
        in: [:user, :admin, :moderator],
        default: :user
      ]
    ]

  def run(params, _context) do
    # Additional business logic validation
    with :ok <- validate_email_domain(params.email),
         :ok <- check_user_permissions(params.role) do
      {:ok, process_user(params)}
    end
  end

  defp validate_email_domain(email) do
    domain = email |> String.split("@") |> List.last()
    
    if domain in allowed_domains() do
      :ok
    else
      {:error, Jido.Action.Error.validation_error(
        "Email domain not allowed",
        %{domain: domain}
      )}
    end
  end
end

Input Sanitization

defmodule MyApp.Actions.SanitizeInput do
  use Jido.Action,
    schema: [
      content: [type: :string, required: true],
      format: [type: :atom, in: [:html, :markdown, :text], default: :text]
    ]

  @impl true
  def on_before_validate_params(params) do
    # Sanitize before validation
    sanitized = Map.update!(params, :content, &sanitize_content/1)
    {:ok, sanitized}
  end

  def run(params, _context) do
    # Content is already sanitized
    {:ok, %{content: params.content, length: String.length(params.content)}}
  end

  defp sanitize_content(content) do
    content
    |> String.trim()
    |> remove_dangerous_characters()
    |> limit_length(10_000)
  end

  defp remove_dangerous_characters(content) do
    # Remove or escape dangerous characters
    content
    |> String.replace(~r/[<>\"'&]/, "")
    |> String.replace(~r/javascript:/i, "")
    |> String.replace(~r/data:/i, "")
  end

  defp limit_length(content, max_length) do
    if String.length(content) > max_length do
      String.slice(content, 0, max_length)
    else
      content
    end
  end
end

Resource Limits

Memory Limits

defmodule MyApp.Actions.LimitedMemory do
  use Jido.Action,
    schema: [data: [type: :string, required: true]]

  def run(params, _context) do
    # Check input size before processing
    if byte_size(params.data) > max_input_size() do
      {:error, Jido.Action.Error.validation_error(
        "Input too large",
        %{size: byte_size(params.data), max: max_input_size()}
      )}
    else
      process_data_safely(params.data)
    end
  end

  defp max_input_size, do: 1_000_000  # 1MB

  defp process_data_safely(data) do
    # Monitor memory usage during processing
    start_memory = :erlang.memory(:processes)
    
    result = process_data(data)
    
    end_memory = :erlang.memory(:processes)
    memory_used = end_memory - start_memory
    
    if memory_used > max_memory_per_action() do
      Logger.warning("High memory usage detected",
        action: __MODULE__,
        memory_used: memory_used
      )
    end
    
    {:ok, result}
  end

  defp max_memory_per_action, do: 10_000_000  # 10MB
end

Time Limits

The execution engine provides timeout protection via Jido.Exec.run/4:

# Use execution engine timeouts
{:ok, result} = Jido.Exec.run(
  MyApp.Actions.TimeLimited,
  params,
  context,
  timeout: 5_000  # Prevent long-running operations (default: 30000ms)
)

# For async operations
async_ref = Jido.Exec.run_async(MyAction, params, context)
{:ok, result} = Jido.Exec.await(async_ref, 10_000)  # Custom timeout

# Internal timeout handling in actions
defmodule MyApp.Actions.TimeLimited do
  use Jido.Action,
    schema: [operation: [type: :atom, required: true]]

  def run(params, _context) do
    # Use Task for internal timeout control
    task = Task.async(fn -> expensive_operation(params.operation) end)
    
    case Task.yield(task, 2_000) || Task.shutdown(task) do
      {:ok, result} -> {:ok, result}
      nil -> {:error, Jido.Action.Error.timeout_error("Operation timed out")}
    end
  end
end

Configure default timeout globally:

# config/config.exs
config :jido_action, :default_timeout, 30_000

File System Restrictions

If you use the built-in Jido.Tools.Files.* actions, scope them before exposing them to agent-selected or user-influenced calls:

# config/runtime.exs
config :jido_action, file_tool_roots: ["/srv/my_app/data"]

# Per request, optionally narrower than the configured roots
context = %{allowed_file_roots: ["/srv/my_app/data/uploads"]}
Jido.Tools.Files.ReadFile.run(%{path: "profile.json"}, context)

When roots are present, relative paths resolve under the first allowed root. The file tools reject paths and symlinks that resolve outside the allowed roots, reject parent traversal in glob patterns, and refuse protected deletion targets such as /, the current working directory, and the current user's home directory.

defmodule MyApp.Actions.SecureFileOp do
  use Jido.Action,
    schema: [
      path: [type: :string, required: true],
      content: [type: :string, required: true]
    ]

  def run(params, _context) do
    with :ok <- validate_file_path(params.path),
         :ok <- validate_file_size(params.content),
         {:ok, _} <- write_file_safely(params.path, params.content) do
      {:ok, %{file: params.path, size: byte_size(params.content)}}
    end
  end

  defp validate_file_path(path) do
    allowed_dirs = ["/tmp/uploads", "/var/data/safe"]
    
    # Convert to absolute path and normalize
    abs_path = Path.expand(path)
    
    # Check if path is within allowed directories
    if Enum.any?(allowed_dirs, &String.starts_with?(abs_path, &1)) do
      :ok
    else
      {:error, Jido.Action.Error.validation_error(
        "File path not allowed",
        %{path: path, allowed_dirs: allowed_dirs}
      )}
    end
  end

  defp validate_file_size(content) do
    max_size = 1_000_000  # 1MB
    
    if byte_size(content) > max_size do
      {:error, Jido.Action.Error.validation_error(
        "File too large",
        %{size: byte_size(content), max: max_size}
      )}
    else
      :ok
    end
  end

  defp write_file_safely(path, content) do
    # Ensure directory exists (but only in allowed paths)
    case File.mkdir_p(Path.dirname(path)) do
      :ok -> File.write(path, content)
      error -> error
    end
  end
end

Network Security

HTTP Request Validation

defmodule MyApp.Actions.SecureHttpCall do
  use Jido.Action,
    schema: [
      url: [type: :string, required: true],
      method: [type: :atom, in: [:get, :post, :put, :delete], default: :get],
      headers: [type: :map, default: %{}],
      body: [type: :string, default: ""]
    ]

  def run(params, _context) do
    with :ok <- validate_url(params.url),
         :ok <- validate_headers(params.headers),
         {:ok, response} <- make_request(params) do
      {:ok, response}
    end
  end

  defp validate_url(url) do
    case URI.parse(url) do
      %URI{scheme: scheme, host: host} when scheme in ["http", "https"] and not is_nil(host) ->
        if allowed_host?(host) do
          :ok
        else
          {:error, Jido.Action.Error.validation_error(
            "Host not allowed",
            %{host: host}
          )}
        end
      
      _ ->
        {:error, Jido.Action.Error.validation_error("Invalid URL format")}
    end
  end

  defp allowed_host?(host) do
    # Block internal/private addresses
    blocked_patterns = [
      ~r/^localhost$/i,
      ~r/^127\./,
      ~r/^10\./,
      ~r/^172\.(1[6-9]|2[0-9]|3[01])\./,
      ~r/^192\.168\./,
      ~r/^169\.254\./  # Link-local
    ]
    
    # Allow specific external hosts
    allowed_hosts = [
      "api.example.com",
      "secure-service.com"
    ]
    
    host in allowed_hosts and 
      not Enum.any?(blocked_patterns, &Regex.match?(&1, host))
  end

  defp validate_headers(headers) do
    # Prevent header injection
    dangerous_headers = ["host", "authorization", "cookie"]
    
    if Enum.any?(headers, fn {key, _} -> 
      String.downcase(key) in dangerous_headers 
    end) do
      {:error, Jido.Action.Error.validation_error("Dangerous headers not allowed")}
    else
      :ok
    end
  end

  defp make_request(params) do
    # Make request with security settings
    options = [
      timeout: 5_000,
      max_redirects: 3,
      # Disable dangerous features
      follow_redirects: false,
      ssl_verify: :verify_peer
    ]
    
    HTTPoison.request(
      params.method,
      params.url,
      params.body,
      params.headers,
      options
    )
  end
end

Access Control

Action Allowlisting

Use Jido.Instruction.validate_allowed_actions/2 to restrict which actions can be executed:

alias Jido.Instruction

# Create instructions
instructions = [
  Instruction.new(MyApp.Actions.SafeAction, %{data: "value"}),
  Instruction.new(MyApp.Actions.AnotherSafeAction, %{id: 123})
]

# Define allowed actions (whitelist)
allowed_actions = [
  MyApp.Actions.SafeAction,
  MyApp.Actions.AnotherSafeAction,
  MyApp.Actions.ThirdAction
]

# Validate before execution
case Instruction.validate_allowed_actions(instructions, allowed_actions) do
  :ok ->
    # All actions are allowed, proceed with execution
    Jido.Exec.run(hd(instructions).action, hd(instructions).params, context)
    
  {:error, error} ->
    # Some actions are not in the allowlist
    Logger.warning("Blocked unauthorized actions", error: error)
    {:error, error}
end

This is particularly useful for AI tool execution where you want to restrict which actions an LLM can invoke.

Context-Based Authorization

defmodule MyApp.Actions.AuthorizedAction do
  use Jido.Action,
    schema: [
      resource_id: [type: :string, required: true],
      action: [type: :atom, in: [:read, :write, :delete], required: true]
    ]

  def run(params, context) do
    with :ok <- authenticate_user(context),
         :ok <- authorize_action(context, params) do
      perform_action(params, context)
    end
  end

  defp authenticate_user(context) do
    case Map.get(context, :user_id) do
      nil -> 
        {:error, Jido.Action.Error.execution_error("Authentication required")}
      user_id when is_binary(user_id) -> 
        :ok
      _ -> 
        {:error, Jido.Action.Error.execution_error("Invalid authentication")}
    end
  end

  defp authorize_action(context, params) do
    user_id = context.user_id
    resource_id = params.resource_id
    action = params.action
    
    if has_permission?(user_id, resource_id, action) do
      :ok
    else
      {:error, Jido.Action.Error.execution_error(
        "Insufficient permissions",
        %{user_id: user_id, resource_id: resource_id, action: action}
      )}
    end
  end

  defp has_permission?(user_id, resource_id, action) do
    # Check permissions in database/cache
    MyApp.Permissions.check(user_id, resource_id, action)
  end
end

Role-Based Access Control

defmodule MyApp.Security.RBAC do
  @admin_actions [:create_user, :delete_user, :view_all_data]
  @moderator_actions [:edit_content, :ban_user, :view_reports]
  @user_actions [:view_profile, :edit_own_profile, :create_content]

  def authorize(user_role, action) do
    allowed_actions = case user_role do
      :admin -> @admin_actions ++ @moderator_actions ++ @user_actions
      :moderator -> @moderator_actions ++ @user_actions
      :user -> @user_actions
      _ -> []
    end
    
    if action in allowed_actions do
      :ok
    else
      {:error, "Action #{action} not allowed for role #{user_role}"}
    end
  end
end

defmodule MyApp.Actions.RBACAction do
  use Jido.Action,
    schema: [data: [type: :any, required: true]]

  # Define required role for this action
  @required_role :moderator

  def run(params, context) do
    with :ok <- check_role_authorization(context) do
      perform_moderation_action(params.data)
    end
  end

  defp check_role_authorization(context) do
    user_role = Map.get(context, :user_role, :guest)
    MyApp.Security.RBAC.authorize(user_role, @required_role)
  end
end

Secrets Management

Environment Variable Handling

defmodule MyApp.Secrets do
  @doc "Get secret with validation"
  def get_secret!(key) do
    case System.get_env(key) do
      nil -> 
        raise "Missing required secret: #{key}"
      "" -> 
        raise "Empty secret: #{key}"
      secret when byte_size(secret) < 8 ->
        raise "Secret too short: #{key}"
      secret -> 
        secret
    end
  end

  @doc "Get optional secret with default"
  def get_secret(key, default \\ nil) do
    case System.get_env(key) do
      nil -> default
      "" -> default
      secret -> secret
    end
  end
end

defmodule MyApp.Actions.UseSecrets do
  use Jido.Action,
    schema: [operation: [type: :string, required: true]]

  def run(params, _context) do
    # Get secrets securely
    api_key = MyApp.Secrets.get_secret!("API_KEY")
    endpoint = MyApp.Secrets.get_secret("API_ENDPOINT", "https://api.default.com")
    
    # Use secrets in action
    perform_api_operation(params.operation, api_key, endpoint)
  end

  # Never log secrets
  defp perform_api_operation(operation, api_key, endpoint) do
    Logger.info("Performing API operation", operation: operation, endpoint: endpoint)
    # Note: api_key is NOT logged
    
    # Implementation...
    {:ok, %{status: "completed"}}
  end
end

Secret Rotation

defmodule MyApp.SecretRotation do
  @doc "Support multiple API keys for rotation"
  def get_active_api_key do
    # Try primary key first
    case MyApp.Secrets.get_secret("API_KEY_PRIMARY") do
      nil -> 
        # Fallback to secondary during rotation
        MyApp.Secrets.get_secret("API_KEY_SECONDARY")
      key -> 
        key
    end
  end

  def health_check do
    keys = [
      MyApp.Secrets.get_secret("API_KEY_PRIMARY"),
      MyApp.Secrets.get_secret("API_KEY_SECONDARY")
    ]
    
    valid_keys = Enum.count(keys, &(&1 != nil))
    
    case valid_keys do
      0 -> {:error, "No valid API keys"}
      1 -> {:warning, "Only one API key configured"}
      _ -> {:ok, "Multiple API keys available"}
    end
  end
end

Audit Logging

Security Event Logging

defmodule MyApp.Security.AuditLog do
  require Logger

  def log_security_event(event_type, details, context) do
    Logger.warning("Security event",
      event_type: event_type,
      details: sanitize_for_logging(details),
      user_id: Map.get(context, :user_id),
      ip_address: Map.get(context, :ip_address),
      timestamp: DateTime.utc_now(),
      session_id: Map.get(context, :session_id)
    )
    
    # Also send to security monitoring system
    send_to_security_system(event_type, details, context)
  end

  defp sanitize_for_logging(details) do
    # Remove sensitive data from logs
    Map.drop(details, [:password, :token, :api_key, :secret])
  end

  defp send_to_security_system(event_type, details, context) do
    # Send to external security monitoring
    # Implementation depends on your monitoring system
    :ok
  end
end

defmodule MyApp.Actions.AuditedAction do
  use Jido.Action

  def run(params, context) do
    # Log action start
    MyApp.Security.AuditLog.log_security_event(
      :action_started,
      %{action: __MODULE__, params: sanitize_params(params)},
      context
    )
    
    result = perform_action(params, context)
    
    # Log action completion
    MyApp.Security.AuditLog.log_security_event(
      :action_completed,
      %{action: __MODULE__, result: :success},
      context
    )
    
    result
  rescue
    exception ->
      # Log action failure
      MyApp.Security.AuditLog.log_security_event(
        :action_failed,
        %{action: __MODULE__, error: exception.message},
        context
      )
      
      reraise exception, __STACKTRACE__
  end

  defp sanitize_params(params) do
    # Remove sensitive parameters from audit logs
    Map.drop(params, [:password, :credit_card, :ssn])
  end
end

Security Testing

Security Test Patterns

defmodule MyApp.Actions.SecurityTest do
  use ExUnit.Case

  describe "input validation security" do
    test "rejects SQL injection attempts" do
      malicious_input = "'; DROP TABLE users; --"
      
      assert {:error, error} = MyApp.Actions.ProcessInput.run(
        %{query: malicious_input},
        %{}
      )
      
      assert error.type == :validation_error
    end

    test "rejects oversized inputs" do
      large_input = String.duplicate("A", 2_000_000)  # 2MB
      
      assert {:error, error} = MyApp.Actions.ProcessInput.run(
        %{data: large_input},
        %{}
      )
      
      assert error.message =~ "too large"
    end

    test "sanitizes dangerous content" do
      dangerous_content = "<script>alert('xss')</script>"
      
      assert {:ok, result} = MyApp.Actions.SanitizeInput.run(
        %{content: dangerous_content},
        %{}
      )
      
      refute String.contains?(result.content, "<script>")
    end
  end

  describe "authorization security" do
    test "requires authentication" do
      assert {:error, error} = MyApp.Actions.AuthorizedAction.run(
        %{resource_id: "res_123", action: :read},
        %{}  # No user context
      )
      
      assert error.message =~ "Authentication required"
    end

    test "enforces role-based access" do
      user_context = %{user_id: "user_123", user_role: :user}
      
      assert {:error, error} = MyApp.Actions.AdminOnlyAction.run(
        %{operation: :delete_all},
        user_context
      )
      
      assert error.message =~ "Insufficient permissions"
    end
  end
end

Built-in Security Features

Jido Action provides these security features out of the box:

FeatureImplementation
Parameter validationNimbleOptions/Zoi schemas in action definitions
Action allowlistingJido.Instruction.validate_allowed_actions/2
Timeout protectiontimeout option in Jido.Exec.run/4 and Jido.Exec.await/2
Lifecycle hookson_before_validate_params/1 for input sanitization
Error typesStructured error types (InvalidInputError, TimeoutError, ExecutionFailureError, etc.)

Best Practices

The patterns shown in this guide are recommended implementations—not built-in configuration options.

Defense in Depth

  • Input Validation: Use schema validation plus custom validation in run/2
  • Output Encoding: Encode outputs for target context in your action logic
  • Principle of Least Privilege: Use validate_allowed_actions/2 to whitelist actions
  • Fail Securely: Return {:error, ...} tuples; don't silently continue

Secret Management

  • Environment Variables: Store secrets in environment variables
  • No Hard-coding: Never hard-code secrets in source code
  • Rotation: Support secret rotation without downtime
  • Logging: Never log secrets or sensitive data

Access Control

  • Action Allowlisting: Use Jido.Instruction.validate_allowed_actions/2
  • Context-Based Auth: Pass user/session info via context parameter
  • Role Checks: Implement RBAC in your action's run/2 function
  • Audit Logging: Log security events using Elixir's Logger

Resource Protection

  • Timeout Enforcement: Use timeout option in Jido.Exec.run/4
  • Input Limits: Validate input size in schemas or on_before_validate_params/1
  • Rate Limiting: Implement at the application/infrastructure layer
  • Memory Monitoring: Use BEAM observability tools (:erlang.memory/1, etc.)

Next Steps

Testing Guide - Security testing strategies
Configuration Guide - Security configuration
FAQ - Common security questions


Configuration Guide | Next: Testing Guide