Choreo ThreatModel: Comprehensive Walkthrough

# For Hex publication/readers:
# Mix.install([{:choreo, "~> 0.9"}, {:kino_vizjs, "~> 0.8.0"}])

# For local development:
Mix.install([
  {:choreo, path: Path.expand("~/repos/elixir/choreo")},
  {:kino_vizjs, "~> 0.8.0"}
])


Rendering diagrams: This livebook uses Kino.VizJS to render DOT diagrams inline. Models can also be exported as Mermaid.js syntax with to_mermaid/2 and displayed through Kino.Mermaid, which Livebook supports natively.


What is Threat Modeling?

Threat modeling is a structured way to identify, quantify, and address security risks in a system. Instead of waiting for a penetration test to reveal vulnerabilities, you analyze the architecture before writing code and ask: "What could go wrong?"

STRIDE is a popular mnemonic for threat categories:

| Category | Question | Example |
| --- | --- | --- |
| Spoofing | Can an attacker pretend to be someone else? | Stolen credentials, forged tokens |
| Tampering | Can data be modified in transit or at rest? | Man-in-the-middle, SQL injection |
| Repudiation | Can actions be denied? | Missing audit logs |
| Information Disclosure | Is sensitive data exposed? | Unencrypted backups, verbose errors |
| Denial of Service | Can the system be overloaded? | DDoS, resource exhaustion |
| Elevation of Privilege | Can a user gain more access? | Horizontal/vertical privilege escalation |

Choreo.ThreatModel lets you describe your architecture as code, define trust boundaries, and automatically generate STRIDE threats with severity scoring.


Example 1: Classic Three-Tier Web Application

Let's start with the simplest useful model: a user, a web API, and a database. This mirrors the classic pytm getting-started example.

alias Choreo.ThreatModel
alias Choreo.ThreatModel.Analysis

web_app =
  ThreatModel.new()
  |> ThreatModel.add_trust_boundary("internet", level: 0, label: "Internet")
  |> ThreatModel.add_trust_boundary("app", level: 2, label: "Application Zone")
  |> ThreatModel.add_trust_boundary("data", level: 3, label: "Data Layer")
  |> ThreatModel.add_external_entity(:user,
    label: "Customer",
    boundary: "internet",
    description: "End user browsing the site"
  )
  |> ThreatModel.add_process(:web_api,
    label: "Web API",
    boundary: "app",
    privilege: :user,
    description: "Handles HTTP requests"
  )
  |> ThreatModel.add_data_store(:postgres,
    label: "Postgres",
    boundary: "data",
    sensitivity: :confidential,
    description: "Primary application database"
  )
  |> ThreatModel.data_flow(:user, :web_api,
    label: "HTTPS login",
    encrypted: true
  )
  |> ThreatModel.data_flow(:web_api, :postgres,
    label: "SQL query"
  )

tabs = [
  {"Mermaid", Kino.Mermaid.new(ThreatModel.to_mermaid(web_app))},
  {"Graphviz", Kino.VizJS.render(ThreatModel.to_dot(web_app))}
]

Kino.Layout.tabs(tabs)

Notice the color coding:

  • Red dashed edges cross trust boundaries unencrypted.
  • Orange edges cross boundaries but are encrypted.
  • Green nodes live in higher-trust zones.
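
The edge colors above come down to two facts about each flow: whether its endpoints sit in different trust boundaries, and whether it is encrypted. Here is a minimal sketch of that decision. `EdgeStyleSketch` is a hypothetical helper and not Choreo's rendering code, and the `:plain` result for same-boundary flows is an assumption.

```elixir
defmodule EdgeStyleSketch do
  @moduledoc "Hypothetical illustration only; not part of Choreo."

  # Both endpoints share a boundary: no special styling (assumed).
  def style(boundary, boundary, _encrypted?), do: :plain

  # Crosses a boundary with encryption: orange.
  def style(_from_boundary, _to_boundary, true), do: :orange

  # Crosses a boundary without encryption: red and dashed.
  def style(_from_boundary, _to_boundary, _unencrypted), do: :red_dashed
end

IO.inspect(EdgeStyleSketch.style("internet", "app", true))  # :orange
IO.inspect(EdgeStyleSketch.style("app", "data", false))     # :red_dashed
IO.inspect(EdgeStyleSketch.style("app", "app", false))      # :plain
```

Applied to Example 1, the encrypted user to web_api flow would come out orange and the web_api to postgres flow red and dashed.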

Generating STRIDE Threats

threats = Analysis.stride_threats(web_app)

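# Atoms compare alphabetically, so rank severities explicitly before sorting.
# The atom names are assumed from the severity table later in this notebook.
severity_rank = %{critical: 4, high: 3, medium: 2, low: 1}

threats
|> Enum.sort_by(&Map.get(severity_rank, &1.severity, 0), :desc)
|> Enum.each(fn t ->
  target = if is_tuple(t.target), do: "#{elem(t.target, 0)} → #{elem(t.target, 1)}", else: t.target
  IO.puts("[#{String.upcase(to_string(t.severity))}] #{t.id}: #{t.category} @ #{target}")
  IO.puts("  #{t.description}")
  IO.puts("  Mitigation: #{t.mitigation}\n")
end)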

The model automatically flagged the unencrypted web_api → postgres flow as high-risk information disclosure because it crosses from the application zone into the data layer without encryption.

Validation

Analysis.validate(web_app)
|> Enum.each(fn {sev, msg} ->
  icon = if sev == :error, do: "❌", else: "⚠️"
  IO.puts("#{icon} #{msg}")
end)

Example 2: Microservices with an API Gateway

A more realistic architecture: an API Gateway fronts multiple services. Some flows are internal; some cross from the public internet into the VPC.

microservices =
  ThreatModel.new()
  |> ThreatModel.add_trust_boundary("internet", level: 0)
  |> ThreatModel.add_trust_boundary("dmz", level: 1)
  |> ThreatModel.add_trust_boundary("vpc", level: 2)
  |> ThreatModel.add_external_entity(:mobile_app, label: "Mobile App", boundary: "internet")
  |> ThreatModel.add_external_entity(:spa, label: "Web SPA", boundary: "internet")
  |> ThreatModel.add_process(:api_gateway,
    label: "API Gateway",
    boundary: "dmz",
    privilege: :none,
    description: "Rate limiting, routing, WAF"
  )
  |> ThreatModel.add_process(:auth_service,
    label: "Auth Service",
    boundary: "vpc",
    privilege: :admin,
    description: "Issues and validates JWTs"
  )
  |> ThreatModel.add_process(:order_service,
    label: "Order Service",
    boundary: "vpc",
    privilege: :user
  )
  |> ThreatModel.add_process(:payment_webhook,
    label: "Payment Webhook",
    boundary: "vpc",
    privilege: :none,
    description: "Receives async callbacks from Stripe"
  )
  |> ThreatModel.add_data_store(:orders_db,
    label: "Orders DB",
    boundary: "vpc",
    sensitivity: :confidential
  )
  |> ThreatModel.add_data_store(:redis_cache,
    label: "Redis",
    boundary: "vpc",
    sensitivity: :internal
  )
  |> ThreatModel.data_flow(:mobile_app, :api_gateway, label: "REST + mTLS", encrypted: true)
  |> ThreatModel.data_flow(:spa, :api_gateway, label: "HTTPS", encrypted: true)
  |> ThreatModel.data_flow(:api_gateway, :auth_service, label: "gRPC", encrypted: true)
  |> ThreatModel.data_flow(:api_gateway, :order_service, label: "gRPC", encrypted: true)
  |> ThreatModel.data_flow(:order_service, :orders_db, label: "SQL/TLS", encrypted: true)
  |> ThreatModel.data_flow(:order_service, :redis_cache, label: "Redis protocol")
  |> ThreatModel.data_flow(:payment_webhook, :order_service, label: "Event bus")

tabs = [
  {"Mermaid", Kino.Mermaid.new(ThreatModel.to_mermaid(microservices))},
  {"Graphviz", Kino.VizJS.render(ThreatModel.to_dot(microservices))}
]

Kino.Layout.tabs(tabs)

Cross-Boundary Analysis

Which flows cross a trust boundary without encryption?

Analysis.unencrypted_boundary_flows(microservices)
|> Enum.each(fn {from, to} ->
  IO.puts("🔓 Unencrypted boundary crossing: #{from} → #{to}")
end)

Exposed Data Stores

Which databases are reachable from an external entity?

Analysis.exposed_data_stores(microservices)
|> IO.inspect(label: "Exposed data stores")

High-Risk Processes

Which processes touch sensitive data?

Analysis.high_risk_processes(microservices)
|> IO.inspect(label: "High-risk processes")
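
To make the question concrete, here is a sketch that answers it on plain data copied from the model above. It assumes "sensitive" means :confidential or :restricted and looks only at direct flows into a data store. Choreo's own definition of a high-risk process may be broader.

```elixir
# Sensitivity of each data store, as declared in Example 2.
stores = %{orders_db: :confidential, redis_cache: :internal}

# The {from, to} pairs declared between processes and stores in Example 2.
flows = [
  {:api_gateway, :auth_service},
  {:api_gateway, :order_service},
  {:order_service, :orders_db},
  {:order_service, :redis_cache},
  {:payment_webhook, :order_service}
]

# Assumption: only these two levels count as sensitive.
sensitive = for {id, level} <- stores, level in [:confidential, :restricted], do: id

# Processes with a direct flow into a sensitive store, without duplicates.
risky = for {from, to} <- flows, to in sensitive, uniq: true, do: from

IO.inspect(risky, label: "touches sensitive data")
```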

Attack Paths

What are the complete paths from an external entry point to a data store?

Analysis.attack_paths(microservices)
|> Enum.each(fn path ->
  IO.puts("🎯 #{Enum.join(path, " → ")}")
end)
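
One way to picture attack paths is as a depth-first walk from each external entity to every reachable data store. The sketch below runs that walk over an adjacency map written by hand from the Example 2 flows. `AttackPathSketch` and its data layout are hypothetical, and Choreo's algorithm may differ.

```elixir
defmodule AttackPathSketch do
  @moduledoc "Hypothetical illustration only; not Choreo's implementation."

  # Returns every simple path from an entry node to any node in `stores`.
  def paths(graph, entries, stores) do
    Enum.flat_map(entries, fn entry -> walk(graph, entry, stores, [entry]) end)
  end

  # `visited` holds the current path in reverse order.
  defp walk(graph, current, stores, visited) do
    here = if current in stores, do: [Enum.reverse(visited)], else: []

    onward =
      graph
      |> Map.get(current, [])
      |> Enum.reject(&(&1 in visited))
      |> Enum.flat_map(fn next -> walk(graph, next, stores, [next | visited]) end)

    here ++ onward
  end
end

# Adjacency list built from the flows declared in Example 2.
graph = %{
  mobile_app: [:api_gateway],
  spa: [:api_gateway],
  api_gateway: [:auth_service, :order_service],
  order_service: [:orders_db, :redis_cache],
  payment_webhook: [:order_service]
}

paths = AttackPathSketch.paths(graph, [:mobile_app, :spa], [:orders_db, :redis_cache])

Enum.each(paths, fn path -> IO.puts(Enum.join(path, " → ")) end)
```

In this sketch payment_webhook is not an entry point because it is modeled as a process rather than an external entity, so only the four paths starting at mobile_app and spa are printed.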

Threat Summary

summary = Analysis.threat_summary(microservices)

IO.puts("Total threats: #{summary.total}")
IO.puts("\nBy severity:")
Enum.each(summary.by_severity, fn {sev, count} ->
  IO.puts("  #{sev}: #{count}")
end)
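
The by_severity breakdown is essentially a frequency count over the threat list. The sketch below reproduces that idea with Enum.frequencies_by/2 on a few hand-written maps. The sample threats and their category atoms are invented for illustration and are not output from Choreo.

```elixir
# Invented sample threats; real ones come from Analysis.stride_threats/2.
sample_threats = [
  %{id: "T-1", category: :tampering, severity: :high},
  %{id: "T-2", category: :spoofing, severity: :medium},
  %{id: "T-3", category: :information_disclosure, severity: :high}
]

by_severity = Enum.frequencies_by(sample_threats, & &1.severity)
by_category = Enum.frequencies_by(sample_threats, & &1.category)

IO.inspect(length(sample_threats), label: "total")
IO.inspect(by_severity, label: "by severity")
IO.inspect(by_category, label: "by category")
```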

Example 3: Multi-Tenant SaaS with Custom Compliance Rules

Real organizations have compliance requirements beyond STRIDE. The :rules option lets you inject custom threat generators via the Choreo.ThreatModel.Analysis.Rule behaviour.

defmodule GDPRRule do
  @behaviour Analysis.Rule

  @impl true
  def threats_for_element(_model, id, data) do
    sensitivity = data[:sensitivity]

    if sensitivity in [:confidential, :restricted] do
      [
        %{
          id: "GDPR-#{id}-1",
          category: :compliance,
          target: id,
          description: "#{data.label} stores personal data without documented retention policy.",
          severity: :high,
          mitigation: "Implement automated data purging and Right-to-be-Forgotten endpoints."
        }
      ]
    else
      []
    end
  end

  @impl true
  def threats_for_flow(_model, from, to, meta) do
    # Flag every unencrypted flow for DPA review (this simple rule does not check boundaries or data type)
    if meta[:encrypted] do
      []
    else
      [
        %{
          id: "GDPR-FLOW-#{from}-#{to}",
          category: :compliance,
          target: {from, to},
          description: "Unencrypted data flow #{from} → #{to} may require a Data Processing Agreement.",
          severity: :high,
          mitigation: "Review DPA coverage and SCCs for third-party processors."
        }
      ]
    end
  end
end

saas =
  ThreatModel.new()
  |> ThreatModel.add_trust_boundary("public", level: 0)
  |> ThreatModel.add_trust_boundary("tenant_isolation", level: 2)
  |> ThreatModel.add_external_entity(:tenant_admin, label: "Tenant Admin", boundary: "public")
  |> ThreatModel.add_process(:app_server,
    label: "App Server",
    boundary: "tenant_isolation",
    privilege: :user
  )
  |> ThreatModel.add_data_store(:tenant_db,
    label: "Tenant DB",
    boundary: "tenant_isolation",
    sensitivity: :confidential,
    retention: "90d"
  )
  |> ThreatModel.data_flow(:tenant_admin, :app_server, encrypted: true)
  |> ThreatModel.data_flow(:app_server, :tenant_db, encrypted: true)

# Base STRIDE only
base_threats = Analysis.stride_threats(saas)
IO.puts("Base STRIDE threats: #{length(base_threats)}")

# STRIDE + GDPR custom rules
all_threats = Analysis.stride_threats(saas, rules: [GDPRRule])
IO.puts("With GDPR rules: #{length(all_threats)}")

gdpr_only = Enum.filter(all_threats, &(&1.category == :compliance))
IO.puts("GDPR-specific threats:")
Enum.each(gdpr_only, fn t ->
  target = if is_tuple(t.target), do: "flow", else: t.target
  IO.puts("  #{t.id} @ #{target}: #{t.description}")
end)

Kino.VizJS.render(ThreatModel.to_dot(saas))
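
Conceptually, passing rules means that every rule module gets a chance to contribute threats for each element and each flow, and the results are concatenated. The sketch below shows that dispatch with a stand-in behaviour. `RuleSketch` and `PlaintextRule` are hypothetical, the real behaviour is Choreo.ThreatModel.Analysis.Rule, and Choreo's internals may work differently.

```elixir
defmodule RuleSketch do
  @moduledoc "Hypothetical stand-in for a rule behaviour; not Choreo's API."

  @callback threats_for_element(model :: term(), id :: atom(), data :: map()) :: [map()]
  @callback threats_for_flow(model :: term(), from :: atom(), to :: atom(), meta :: map()) ::
              [map()]

  # Ask every rule about every element and flow, then concatenate the results.
  def run(rules, elements, flows) do
    element_threats =
      for rule <- rules,
          {id, data} <- elements,
          threat <- rule.threats_for_element(nil, id, data),
          do: threat

    flow_threats =
      for rule <- rules,
          {from, to, meta} <- flows,
          threat <- rule.threats_for_flow(nil, from, to, meta),
          do: threat

    element_threats ++ flow_threats
  end
end

defmodule PlaintextRule do
  @behaviour RuleSketch

  @impl true
  def threats_for_element(_model, _id, _data), do: []

  @impl true
  def threats_for_flow(_model, from, to, meta) do
    if meta[:encrypted],
      do: [],
      else: [%{id: "PLAIN-#{from}-#{to}", target: {from, to}, severity: :high}]
  end
end

elements = [api: %{label: "API"}, db: %{label: "DB"}]
flows = [{:user, :api, %{encrypted: true}}, {:api, :db, %{}}]

found = RuleSketch.run([PlaintextRule], elements, flows)
IO.inspect(found)
```

Because each callback returns a list, a rule that has nothing to say about an element or flow returns an empty list and drops out of the result.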

Example 4: Sequence Diagrams

While Data Flow Diagrams illustrate security boundaries, Sequence Diagrams visualize interactions chronologically. Choreo supports both Mermaid.js and PlantUML formats.

tabs = [
  {"Mermaid", Kino.Mermaid.new(ThreatModel.to_sequence(microservices))},
  {"PlantUML", Kino.Markdown.new("```plantuml\n#{ThreatModel.to_plantuml(microservices)}\n```")}
]

Kino.Layout.tabs(tabs)

You can copy these payloads directly into PlantText, the Mermaid Live Editor, or a Confluence macro block.
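
To give a feel for the Mermaid payload, here is a sketch that turns an ordered list of flows into `sequenceDiagram` text. `SequenceSketch` is hypothetical, and the actual output of ThreatModel.to_sequence/2 may be laid out differently.

```elixir
defmodule SequenceSketch do
  @moduledoc "Hypothetical illustration only; not Choreo's renderer."

  # Each flow is {from, to, label}; list order is treated as time order.
  def to_mermaid(flows) do
    lines = Enum.map(flows, fn {from, to, label} -> "  #{from}->>#{to}: #{label}" end)
    Enum.join(["sequenceDiagram" | lines], "\n")
  end
end

flows = [
  {:spa, :api_gateway, "HTTPS"},
  {:api_gateway, :order_service, "gRPC"},
  {:order_service, :orders_db, "SQL/TLS"}
]

diagram = SequenceSketch.to_mermaid(flows)
IO.puts(diagram)
```

The printed text can be pasted into any Mermaid renderer as is.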


Deep Dive: Understanding Severity Scoring

Choreo assigns severity based on element properties and flow context. You don't have to guess — the scoring is derived from the model itself.

Severity rules of thumb:

| Factor | Impact on Severity |
| --- | --- |
| Crosses a trust boundary | Bumps up by one level |
| Unencrypted | Bumps up by one level |
| sensitivity: :restricted | Critical |
| sensitivity: :confidential | High |
| sensitivity: :internal | Medium |
| privilege: :admin | Higher impact if compromised |
| privilege: :user | Standard impact |
| privilege: :none | Lower impact |

# Experiment: change sensitivity and watch severity shift
restricted_model =
  ThreatModel.new()
  |> ThreatModel.add_trust_boundary("app", level: 2)
  |> ThreatModel.add_process(:api, boundary: "app", privilege: :admin)
  |> ThreatModel.add_data_store(:vault, boundary: "app", sensitivity: :restricted)
  |> ThreatModel.data_flow(:api, :vault)

Analysis.stride_threats(restricted_model)
|> Enum.filter(&(&1.target == :vault))
|> Enum.each(fn t ->
  IO.puts("#{t.category}: #{t.severity}")
end)
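
The sensitivity and "bump" rows of the severity rules of thumb can be read as a small scoring function: start from a base level set by sensitivity, then move up one level for each risk factor. The sketch below encodes only that reading. `SeveritySketch`, its option names, and the :low default are assumptions, privilege is left out, and Choreo's actual scoring may differ.

```elixir
defmodule SeveritySketch do
  @moduledoc "Hypothetical illustration only; Choreo's real scoring may differ."

  @levels [:low, :medium, :high, :critical]

  # Base level from the sensitivity rows; :low is an assumed default.
  defp base(:restricted), do: :critical
  defp base(:confidential), do: :high
  defp base(:internal), do: :medium
  defp base(_other), do: :low

  # Each risk factor that applies bumps the level by one, capped at :critical.
  def score(sensitivity, opts \\ []) do
    bumps = Enum.count([opts[:crosses_boundary], opts[:unencrypted]], & &1)
    index = Enum.find_index(@levels, &(&1 == base(sensitivity)))
    Enum.at(@levels, min(index + bumps, length(@levels) - 1))
  end
end

IO.inspect(SeveritySketch.score(:internal))                                            # :medium
IO.inspect(SeveritySketch.score(:internal, crosses_boundary: true))                    # :high
IO.inspect(SeveritySketch.score(:internal, crosses_boundary: true, unencrypted: true)) # :critical
IO.inspect(SeveritySketch.score(:restricted, unencrypted: true))                       # :critical
```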

Fixing Issues: Validation-Driven Hardening

Start with a sloppy model, then use validate/1 as a checklist to harden it.

sloppy =
  ThreatModel.new()
  |> ThreatModel.add_process(:api, label: "API")
  |> ThreatModel.add_data_store(:db, label: "DB")
  |> ThreatModel.data_flow(:api, :db)

IO.puts("Issues found:")

Analysis.validate(sloppy) |> Enum.each(fn {sev, msg} -> IO.puts(" <#{sev}> #{msg}") end)

Now fix the issues: add a trust boundary, assign every element to it, set privilege and sensitivity, and encrypt the flow.

hardened =
  ThreatModel.new()
  |> ThreatModel.add_trust_boundary("app", level: 2)
  |> ThreatModel.add_process(:api, label: "API", boundary: "app", privilege: :user)
  |> ThreatModel.add_data_store(:db, label: "DB", boundary: "app", sensitivity: :confidential)
  |> ThreatModel.data_flow(:api, :db, encrypted: true)

IO.puts("After hardening:")
Analysis.validate(hardened) |> IO.inspect()

Kino.VizJS.render(ThreatModel.to_dot(hardened))
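
The checklist idea can be sketched as a function that walks the elements and returns {severity, message} tuples, the same shape consumed by the Enum.each call earlier in this section. `ValidateSketch` is hypothetical. Which attributes Choreo actually checks, and whether a gap counts as an error or a warning, are assumptions here.

```elixir
defmodule ValidateSketch do
  @moduledoc "Hypothetical illustration only; Choreo's real checks may differ."

  # Assumed policy: a missing boundary is an error, a missing label a warning.
  @required [boundary: :error, label: :warning]

  # Returns one {severity, message} tuple per missing attribute.
  def validate(elements) do
    Enum.flat_map(elements, fn {id, data} ->
      for {key, sev} <- @required,
          not Map.has_key?(data, key),
          do: {sev, "#{id} has no #{key}"}
    end)
  end
end

sloppy = [api: %{label: "API"}, db: %{label: "DB"}]
hardened = [api: %{label: "API", boundary: "app"}, db: %{label: "DB", boundary: "app"}]

IO.inspect(ValidateSketch.validate(sloppy), label: "sloppy")
IO.inspect(ValidateSketch.validate(hardened), label: "hardened")
```

An empty list from the second call is the signal that nothing on the checklist remains open.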

Rendering for Stakeholders

Dark theme for presentations

Kino.VizJS.render(ThreatModel.to_dot(microservices, theme: :dark))

Example 5: Multigraph (Parallel Data Flows)

Sometimes multiple distinct data payloads cross boundaries between the same entities (e.g., an HTTPS request and a WebSocket notification channel).

parallel_flows =
  ThreatModel.new()
  |> ThreatModel.add_trust_boundary("internet", level: 0)
  |> ThreatModel.add_trust_boundary("app", level: 2)
  |> ThreatModel.add_external_entity(:user, boundary: "internet")
  |> ThreatModel.add_process(:api, boundary: "app")
  |> ThreatModel.data_flow(:user, :api, label: "Payload (HTTPS)", encrypted: true)
  |> ThreatModel.data_flow(:user, :api, label: "Signal (WS)", encrypted: false)

IO.puts("Number of data flows: #{length(ThreatModel.flows(parallel_flows))}")
Kino.VizJS.render(ThreatModel.to_dot(parallel_flows))
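
The reason a multigraph matters can be shown without Choreo at all. If flows were stored in a map keyed by their endpoints, the second flow would overwrite the first. The sketch below uses plain maps with illustrative field names and does not reflect Choreo's internal representation.

```elixir
# Two flows between the same pair of nodes, as in the example above.
flows = [
  %{from: :user, to: :api, label: "Payload (HTTPS)", encrypted: true},
  %{from: :user, to: :api, label: "Signal (WS)", encrypted: false}
]

# Keying by {from, to} keeps only the last flow for each pair.
as_map = Map.new(flows, fn flow -> {{flow.from, flow.to}, flow} end)

# A list (or a multigraph) keeps both, so each flow is analyzed on its own.
unencrypted = Enum.filter(flows, &(&1.encrypted == false))

IO.inspect(map_size(as_map), label: "flows kept when keyed by endpoints")
IO.inspect(length(flows), label: "flows kept in a list")
IO.inspect(Enum.map(unencrypted, & &1.label), label: "unencrypted")
```

With one edge per pair, either the encrypted or the unencrypted channel would disappear from the analysis, depending on insertion order.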

Summary

| Task | Function |
| --- | --- |
| Model architecture | ThreatModel.new/1, add_trust_boundary/3, add_external_entity/3, add_process/3, add_data_store/3, data_flow/4 |
| Auto-generate threats | Analysis.stride_threats/2 |
| Custom rules | Implement Analysis.Rule behaviour, pass via rules: [MyRule] |
| Find unencrypted flows | Analysis.unencrypted_boundary_flows/1 |
| Find exposed databases | Analysis.exposed_data_stores/1 |
| Find risky processes | Analysis.high_risk_processes/1 |
| Attack paths | Analysis.attack_paths/2 |
| Validate model | Analysis.validate/1 |
| Summarise threats | Analysis.threat_summary/1 |
| Render DFD | ThreatModel.to_dot/2 (themes: :default, :dark, :warm, :forest, :ocean) |
| Render sequence | ThreatModel.to_sequence/2 (Mermaid), ThreatModel.to_plantuml/2 |

Threat modeling as code means your security review is version-controlled, diffable, and repeatable. Every pull request can include an updated threat model alongside the code changes.