Knowledge Control Plane Guide

Copy Markdown View Source

cmdc_rag_arcana 不接管企业知识库数据库。生产平台应在 Phoenix/Ecto 中维护租户、权限、文档版本、导入任务和索引状态,再把 ready version 映射到 Arcana。

边界

| 组件 | 责任 |
| --- | --- |
| 企业 Phoenix app | Knowledge UI、租户、权限、审批、版本、状态、审计 |
| cmdc_rag_arcana | Tool / Plugin / status helper / ingestion adapter contract |
| Arcana | ingest/search/ask/pipeline/graph/maintenance 底层能力 |
| Python sidecar | OCR、版面解析、表格抽取、特殊 parser fallback |

Arcana dashboard 已经提供 Documents、Collections、Search、Ask、Evaluation、Maintenance、Graph 页面。生产企业平台不应直接把它当业务 Knowledge UI,推荐只作为 admin/dev 调试入口,并始终加认证和管理员权限。

Ecto Schema 草案

defmodule MyApp.Knowledge.Collection do
  @moduledoc """
  Ecto schema draft for rows of the `knowledge_collections` table.

  Each row carries a tenant id and a name, plus descriptive and policy fields
  (`description`, `sensitivity_level`, `retention`, `acl`, `graph_enabled`,
  `metadata`). The primary key is an autogenerated binary id.
  """

  use Ecto.Schema
  import Ecto.Changeset

  # Fields that must be present for the changeset to be valid.
  @required_fields [:tenant_id, :name]

  # Remaining fields that may be set from external attrs.
  @optional_fields [
    :description,
    :sensitivity_level,
    :retention,
    :acl,
    :graph_enabled,
    :metadata
  ]

  # Columns covered by the unique constraint registered in `changeset/2`.
  @unique_key [:tenant_id, :name]

  @primary_key {:id, :binary_id, autogenerate: true}
  schema "knowledge_collections" do
    field(:tenant_id, :string)
    field(:name, :string)
    field(:description, :string)
    field(:sensitivity_level, :string, default: "internal")
    field(:retention, :map, default: %{})
    field(:acl, :map, default: %{})
    field(:graph_enabled, :boolean, default: false)
    field(:metadata, :map, default: %{})

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Builds a changeset for `collection` from `attrs`.

  Casts the permitted fields, requires `:tenant_id` and `:name`, and registers
  a unique constraint over `[:tenant_id, :name]` so that a database uniqueness
  violation is reported as a changeset error.
  """
  def changeset(collection, attrs) do
    permitted = @required_fields ++ @optional_fields

    collection
    |> cast(attrs, permitted)
    |> validate_required(@required_fields)
    |> unique_constraint(@unique_key)
  end
end

defmodule MyApp.Knowledge.Document do
  @moduledoc """
  Ecto schema draft for rows of the `knowledge_documents` table.

  A document row records its tenant, the collection it is filed under, a
  title, and optional ownership/source fields. `active_version_id` holds the
  binary id of the version currently selected for the document.
  """

  use Ecto.Schema
  import Ecto.Changeset

  # Fields that must be present for the changeset to be valid.
  @required_fields [:tenant_id, :collection, :title]

  # Remaining fields that may be set from external attrs.
  @optional_fields [
    :source_uri,
    :owner_id,
    :department,
    :active_version_id,
    :sensitivity_level,
    :metadata
  ]

  @primary_key {:id, :binary_id, autogenerate: true}
  schema "knowledge_documents" do
    field(:tenant_id, :string)
    field(:collection, :string)
    field(:title, :string)
    field(:source_uri, :string)
    field(:owner_id, :string)
    field(:department, :string)
    field(:active_version_id, :binary_id)
    field(:sensitivity_level, :string, default: "internal")
    field(:metadata, :map, default: %{})

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Builds a changeset for `document` from `attrs`.

  Casts the permitted fields and requires `:tenant_id`, `:collection` and
  `:title`.
  """
  def changeset(document, attrs) do
    permitted = @required_fields ++ @optional_fields

    document
    |> cast(attrs, permitted)
    |> validate_required(@required_fields)
  end
end

defmodule MyApp.Knowledge.DocumentVersion do
  @moduledoc """
  Ecto schema draft for rows of the `knowledge_document_versions` table.

  A version row belongs to a document (`document_id`), is identified by a
  `version` label and a content `checksum`, and tracks its indexing lifecycle
  in `status` (one of `:draft`, `:queued`, `:indexing`, `:ready`, `:failed`,
  `:archived`; defaults to `:draft`).
  """

  use Ecto.Schema
  import Ecto.Changeset

  # Allowed values of the `status` enum, in declaration order.
  @statuses [:draft, :queued, :indexing, :ready, :failed, :archived]

  # Fields that must be present for the changeset to be valid.
  @required_fields [:tenant_id, :document_id, :version, :checksum]

  # Remaining fields that may be set from external attrs.
  @optional_fields [
    :source_uri,
    :arcana_document_id,
    :status,
    :stale_at,
    :parser,
    :metadata
  ]

  # Columns covered by the unique constraint registered in `changeset/2`.
  @unique_key [:tenant_id, :document_id, :checksum]

  @primary_key {:id, :binary_id, autogenerate: true}
  schema "knowledge_document_versions" do
    field(:tenant_id, :string)
    field(:document_id, :binary_id)
    field(:version, :string)
    field(:checksum, :string)
    field(:source_uri, :string)
    field(:arcana_document_id, :binary_id)
    field(:status, Ecto.Enum, values: @statuses, default: :draft)
    field(:stale_at, :utc_datetime_usec)
    field(:parser, :string)
    field(:metadata, :map, default: %{})

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Builds a changeset for `version` from `attrs`.

  Casts the permitted fields, requires `:tenant_id`, `:document_id`,
  `:version` and `:checksum`, and registers a unique constraint over
  `[:tenant_id, :document_id, :checksum]`.
  """
  def changeset(version, attrs) do
    permitted = @required_fields ++ @optional_fields

    version
    |> cast(attrs, permitted)
    |> validate_required(@required_fields)
    |> unique_constraint(@unique_key)
  end
end

defmodule MyApp.Knowledge.IngestionRun do
  @moduledoc """
  Ecto schema draft for rows of the `knowledge_ingestion_runs` table.

  A run row points at a document version (`document_version_id`) and records
  the run `kind` (defaults to `"ingest"`), its `status` (one of `:queued`,
  `:running`, `:completed`, `:failed`, `:cancelled`; defaults to `:queued`),
  an `attempt` counter, start/finish timestamps, an `error` string and
  free-form `progress` / `metadata` maps.
  """

  use Ecto.Schema
  import Ecto.Changeset

  # Allowed values of the `status` enum, in declaration order.
  @statuses [:queued, :running, :completed, :failed, :cancelled]

  # Fields that must be present for the changeset to be valid.
  @required_fields [:tenant_id, :document_version_id]

  # Remaining fields that may be set from external attrs.
  @optional_fields [
    :kind,
    :status,
    :attempt,
    :started_at,
    :finished_at,
    :error,
    :progress,
    :metadata
  ]

  @primary_key {:id, :binary_id, autogenerate: true}
  schema "knowledge_ingestion_runs" do
    field(:tenant_id, :string)
    field(:document_version_id, :binary_id)
    field(:kind, :string, default: "ingest")
    field(:status, Ecto.Enum, values: @statuses, default: :queued)
    field(:attempt, :integer, default: 0)
    field(:started_at, :utc_datetime_usec)
    field(:finished_at, :utc_datetime_usec)
    field(:error, :string)
    field(:progress, :map, default: %{})
    field(:metadata, :map, default: %{})

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Builds a changeset for `run` from `attrs`.

  Casts the permitted fields and requires `:tenant_id` and
  `:document_version_id`.
  """
  def changeset(run, attrs) do
    permitted = @required_fields ++ @optional_fields

    run
    |> cast(attrs, permitted)
    |> validate_required(@required_fields)
  end
end

SourceMapping 可以做独立表,也可以放在 DocumentVersion 上:

# Standalone-table variant of SourceMapping. Only the `schema` block is shown;
# it must live inside a module that does `use Ecto.Schema`.
schema "knowledge_source_mappings" do
  field :tenant_id, :string
  # Platform-side identifiers (binary ids, like the primary keys of the
  # Document / DocumentVersion drafts) — presumably references to those rows;
  # confirm against the migration.
  field :document_id, :binary_id
  field :version_id, :binary_id
  field :collection, :string
  # Identifier of the corresponding document on the Arcana side.
  field :arcana_document_id, :binary_id
  # NOTE(review): the difference between `source_id` and `source_uri` is not
  # visible here — document it where the mapping is written.
  field :source_id, :string
  field :source_uri, :string
  field :metadata, :map, default: %{}

  timestamps(type: :utc_datetime_usec)
end

Oban Worker Skeleton

defmodule MyApp.Workers.KnowledgeIngestionWorker do
  @moduledoc """
  Oban worker skeleton that ingests one document version through
  `CMDCRAGArcana.Ingestion`.

  Expected job args (string keys, as stored by Oban):

    * `"version_id"` - id of the `MyApp.Knowledge.DocumentVersion` to ingest
    * `"file_path"` - passed through to the job spec as `:file_path`

  The job runs on the `:knowledge` queue with at most 5 attempts.
  """

  use Oban.Worker, queue: :knowledge, max_attempts: 5

  alias CMDCRAGArcana.Ingestion
  alias CMDCRAGArcana.Ingestion.JobSpec
  alias MyApp.Knowledge.{Collection, Document}

  @doc """
  Builds a `JobSpec` for the version named in the job args and runs it.

  Raises if the version, its document, or the document's collection cannot be
  loaded. Returns whatever `Ingestion.run/2` returns.
  """
  @impl true
  def perform(%Oban.Job{args: args}) do
    version = MyApp.Knowledge.get_version!(args["version_id"])

    # The DocumentVersion schema has neither a `collection` nor a graph flag:
    # the collection name is stored on Document and `graph_enabled` on
    # Collection, so both rows are loaded explicitly instead of reading
    # non-existent fields off `version` (which would raise KeyError).
    document = MyApp.Repo.get!(Document, version.document_id)

    # Assumes `Document.collection` holds the `Collection.name` within the same
    # tenant (matching the unique [:tenant_id, :name] constraint) — TODO confirm.
    collection =
      MyApp.Repo.get_by!(Collection,
        tenant_id: document.tenant_id,
        name: document.collection
      )

    spec =
      JobSpec.new!(
        tenant_id: version.tenant_id,
        collection: document.collection,
        document_id: version.document_id,
        version_id: version.id,
        checksum: version.checksum,
        source_uri: version.source_uri,
        file_path: args["file_path"],
        graph?: collection.graph_enabled,
        preprocessor: MyApp.Knowledge.ParserSidecar
      )

    Ingestion.run(spec,
      repo: MyApp.Repo,
      # Forward every progress event to the platform's own bookkeeping.
      progress: fn event, payload ->
        MyApp.Knowledge.record_ingestion_progress(version.id, event, payload)
      end
    )
  end
end

同一租户下同一个 document_id + checksum 应幂等(对应 DocumentVersion 上 tenant_id + document_id + checksum 的唯一约束)。重建索引时先生成新 version,待 status == :ready 后再切换 Document.active_version_id。旧 active version 在切换前继续服务线上查询。

Status Backend

rag_ingest_status 通过 CMDCRAGArcana.Knowledge.StatusBackend 查询企业状态:

defmodule MyApp.Knowledge.RAGStatusBackend do
  @moduledoc """
  Example implementation of the `CMDCRAGArcana.Knowledge.StatusBackend`
  behaviour.

  This stub echoes the identifiers found in `opts` and fills every other field
  with fixed placeholder values; a real backend would look them up in the
  platform's own tables.
  """

  @behaviour CMDCRAGArcana.Knowledge.StatusBackend

  alias CMDCRAGArcana.Knowledge.IndexStatus

  @doc """
  Returns `{:ok, %IndexStatus{}}` for the document/version named in `opts`.

  Reads `:tenant_id`, `:collection`, `:document_id` and `:version_id` from
  `opts`; a missing key yields `nil` for that field.
  """
  @impl true
  def get_status(opts) do
    index_status = %IndexStatus{
      # Identifiers are copied straight from the caller's options.
      tenant_id: opts[:tenant_id],
      collection: opts[:collection],
      document_id: opts[:document_id],
      version_id: opts[:version_id],
      # Everything below is a hard-coded placeholder.
      active_version_id: "...",
      status: :ready,
      graph_status: :stale,
      chunk_count: 120,
      stale?: false
    }

    {:ok, index_status}
  end
end

Agent 配置:

# Agent configuration fragment: a tenant id plus settings nested under the
# `cmdc_rag_arcana` key.
user_data: %{
  tenant_id: "tenant-a",
  cmdc_rag_arcana: %{
    # Presumably the collections this agent is allowed to query — confirm
    # against the cmdc_rag_arcana plugin documentation.
    allowed_collections: ["policies"],
    # Module implementing the CMDCRAGArcana.Knowledge.StatusBackend behaviour.
    status_backend: MyApp.Knowledge.RAGStatusBackend
  }
}

Maintenance

企业平台应从后台任务调用 maintenance wrapper:

# Progress callback: receives the current and total counts reported by the
# wrapper and forwards them to the platform's own progress tracking.
report_progress = fn current, total ->
  MyApp.Knowledge.update_reembed_progress("policies", current, total)
end

# Re-embed the "policies" collection of tenant "tenant-a" through the
# maintenance wrapper, using the application's repo.
CMDCRAGArcana.Maintenance.reembed(MyApp.Repo,
  tenant_id: "tenant-a",
  collection: "policies",
  progress: report_progress
)

该 wrapper 会发:

  • [:cmdc_rag_arcana, :maintenance, :reembed, :start | :stop | :exception]

  • [:cmdc_rag_arcana, :maintenance, :reembed, :progress]
  • 可选 CMDC EventBus {:plugin_event, :rag_reembed_progress, payload}