Knowledge Control Plane Guide

Copy Markdown View Source

cmdc_rag_arcana 不接管企业知识库数据库。生产平台应在 Phoenix/Ecto 中维护租户、权限、文档版本、导入任务和索引状态，再把 ready version 映射到 Arcana。

边界

| 组件 | 责任 |
| --- | --- |
| 企业 Phoenix app | Knowledge UI、租户、权限、审批、版本、状态、审计 |
| cmdc_rag_arcana | Tool / Plugin / status helper / ingestion adapter contract |
| Arcana | ingest/search/ask/pipeline/graph/maintenance 底层能力 |
| Python sidecar | OCR、版面解析、表格抽取、特殊 parser fallback |

Arcana dashboard 已经提供 Documents、Collections、Search、Ask、Evaluation、Maintenance、Graph 页面。生产企业平台不应直接把它当业务 Knowledge UI，推荐只作为 admin/dev 调试入口，并始终加认证和管理员权限。

Ecto Schema 草案

defmodule MyApp.Knowledge.Collection do
  @moduledoc """
  Platform-side record of a knowledge collection, scoped to a tenant.

  Besides the name and description it stores the governance attributes kept
  by the platform: sensitivity level, retention, ACL, the graph toggle and
  free-form metadata.
  """

  use Ecto.Schema
  import Ecto.Changeset

  # Attributes that must be present on every collection.
  @required_fields [:tenant_id, :name]

  # Attributes a caller may omit; all but :description have a schema default.
  @optional_fields [:description, :sensitivity_level, :retention, :acl, :graph_enabled, :metadata]

  # A collection name is unique within one tenant.
  @unique_key [:tenant_id, :name]

  @primary_key {:id, :binary_id, autogenerate: true}
  schema "knowledge_collections" do
    # Identity
    field :tenant_id, :string
    field :name, :string
    field :description, :string

    # Governance
    field :sensitivity_level, :string, default: "internal"
    field :retention, :map, default: %{}
    field :acl, :map, default: %{}

    # Indexing options and free-form extras
    field :graph_enabled, :boolean, default: false
    field :metadata, :map, default: %{}

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Builds a changeset for `collection` from `attrs`.

  Casts every schema field, requires `:tenant_id` and `:name`, and registers
  the unique constraint on the `{tenant_id, name}` pair.
  """
  def changeset(collection, attrs) do
    permitted = @required_fields ++ @optional_fields

    collection
    |> cast(attrs, permitted)
    |> validate_required(@required_fields)
    |> unique_constraint(@unique_key)
  end
end

defmodule MyApp.Knowledge.Document do
  @moduledoc """
  Platform-side record of a knowledge document inside a tenant's collection.

  `active_version_id` holds the id of the version currently selected for the
  document; the collection is referenced by its name.
  """

  use Ecto.Schema
  import Ecto.Changeset

  # Attributes that must be present on every document.
  @required_fields [:tenant_id, :collection, :title]

  # Attributes a caller may omit.
  @optional_fields [
    :source_uri,
    :owner_id,
    :department,
    :active_version_id,
    :sensitivity_level,
    :metadata
  ]

  @primary_key {:id, :binary_id, autogenerate: true}
  schema "knowledge_documents" do
    # Identity and placement
    field :tenant_id, :string
    field :collection, :string
    field :title, :string
    field :source_uri, :string

    # Ownership
    field :owner_id, :string
    field :department, :string

    # Version pointer, classification and free-form extras
    field :active_version_id, :binary_id
    field :sensitivity_level, :string, default: "internal"
    field :metadata, :map, default: %{}

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Builds a changeset for `document` from `attrs`.

  Casts every schema field and requires `:tenant_id`, `:collection` and
  `:title`.
  """
  def changeset(document, attrs) do
    permitted = @required_fields ++ @optional_fields

    document
    |> cast(attrs, permitted)
    |> validate_required(@required_fields)
  end
end

defmodule MyApp.Knowledge.DocumentVersion do
  @moduledoc """
  One version of a knowledge document, identified by its checksum.

  Tracks the indexing lifecycle through `:status` and records the id of the
  Arcana document associated with this version.
  """

  use Ecto.Schema
  import Ecto.Changeset

  # Lifecycle states a version can be in; new versions start as :draft.
  @statuses [:draft, :queued, :indexing, :ready, :failed, :archived]

  # Attributes that must be present on every version.
  @required_fields [:tenant_id, :document_id, :version, :checksum]

  # Attributes a caller may omit.
  @optional_fields [:source_uri, :arcana_document_id, :status, :stale_at, :parser, :metadata]

  # The same checksum may appear only once per tenant and document.
  @unique_key [:tenant_id, :document_id, :checksum]

  @primary_key {:id, :binary_id, autogenerate: true}
  schema "knowledge_document_versions" do
    # Identity
    field :tenant_id, :string
    field :document_id, :binary_id
    field :version, :string
    field :checksum, :string
    field :source_uri, :string

    # Index mapping and lifecycle
    field :arcana_document_id, :binary_id
    field :status, Ecto.Enum, values: @statuses, default: :draft
    field :stale_at, :utc_datetime_usec

    # Parser name and free-form extras
    field :parser, :string
    field :metadata, :map, default: %{}

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Builds a changeset for `version` from `attrs`.

  Casts every schema field, requires `:tenant_id`, `:document_id`, `:version`
  and `:checksum`, and registers the unique constraint on
  `{tenant_id, document_id, checksum}`.
  """
  def changeset(version, attrs) do
    permitted = @required_fields ++ @optional_fields

    version
    |> cast(attrs, permitted)
    |> validate_required(@required_fields)
    |> unique_constraint(@unique_key)
  end
end

defmodule MyApp.Knowledge.IngestionRun do
  @moduledoc """
  Record of one ingestion run for a document version.

  Stores the run kind, its status, the attempt counter, start/finish
  timestamps, an error string and progress/metadata maps.
  """

  use Ecto.Schema
  import Ecto.Changeset

  # States a run can be in; new runs start as :queued.
  @statuses [:queued, :running, :completed, :failed, :cancelled]

  # Attributes that must be present on every run.
  @required_fields [:tenant_id, :document_version_id]

  # Attributes a caller may omit.
  @optional_fields [
    :kind,
    :status,
    :attempt,
    :started_at,
    :finished_at,
    :error,
    :progress,
    :metadata
  ]

  @primary_key {:id, :binary_id, autogenerate: true}
  schema "knowledge_ingestion_runs" do
    # Identity
    field :tenant_id, :string
    field :document_version_id, :binary_id
    field :kind, :string, default: "ingest"

    # Execution state
    field :status, Ecto.Enum, values: @statuses, default: :queued
    field :attempt, :integer, default: 0
    field :started_at, :utc_datetime_usec
    field :finished_at, :utc_datetime_usec

    # Outcome details and free-form extras
    field :error, :string
    field :progress, :map, default: %{}
    field :metadata, :map, default: %{}

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Builds a changeset for `run` from `attrs`.

  Casts every schema field and requires `:tenant_id` and
  `:document_version_id`.
  """
  def changeset(run, attrs) do
    permitted = @required_fields ++ @optional_fields

    run
    |> cast(attrs, permitted)
    |> validate_required(@required_fields)
  end
end

SourceMapping 可以做独立表,也可以放在 DocumentVersion 上:

# Standalone mapping table: one row ties a platform document/version pair to
# the Arcana document id and source identifiers stored alongside it.
schema "knowledge_source_mappings" do
  field :tenant_id, :string
  # Platform-side ids of the document and of the version being mapped.
  field :document_id, :binary_id
  field :version_id, :binary_id
  field :collection, :string
  # NOTE(review): presumably the id of the document created in Arcana for this
  # version - confirm against the ingestion adapter.
  field :arcana_document_id, :binary_id
  field :source_id, :string
  field :source_uri, :string
  field :metadata, :map, default: %{}

  timestamps(type: :utc_datetime_usec)
end

Oban Worker Skeleton

defmodule MyApp.Workers.KnowledgeIngestionWorker do
  @moduledoc """
  Oban worker that ingests one `MyApp.Knowledge.DocumentVersion` through
  `CMDCRAGArcana.Ingestion`.

  Job args:

    * `"version_id"` - id of the document version to ingest (required)
    * `"file_path"` - passed to the job spec as `:file_path`, unchanged

  `MyApp.Knowledge.get_version!/1`, `get_document!/1`, `get_collection!/2` and
  `record_ingestion_progress/3` are context functions the host application is
  expected to provide.
  """

  use Oban.Worker, queue: :knowledge, max_attempts: 5

  alias CMDCRAGArcana.Ingestion
  alias CMDCRAGArcana.Ingestion.JobSpec

  @impl true
  def perform(%Oban.Job{args: %{"version_id" => version_id} = args}) do
    version = MyApp.Knowledge.get_version!(version_id)

    # DocumentVersion has neither a collection name nor a graph toggle: the
    # name is stored on Document and `graph_enabled` on Collection, so both
    # parents are loaded instead of reading missing keys off the version.
    document = MyApp.Knowledge.get_document!(version.document_id)
    collection = MyApp.Knowledge.get_collection!(version.tenant_id, document.collection)

    spec =
      JobSpec.new!(
        tenant_id: version.tenant_id,
        collection: document.collection,
        document_id: version.document_id,
        version_id: version.id,
        checksum: version.checksum,
        source_uri: version.source_uri,
        file_path: args["file_path"],
        graph?: collection.graph_enabled,
        preprocessor: MyApp.Knowledge.ParserSidecar
      )

    # NOTE(review): the value returned by Ingestion.run/2 becomes the job
    # result - confirm it is one of the shapes Oban accepts from perform/1.
    Ingestion.run(spec,
      repo: MyApp.Repo,
      progress: fn event, payload ->
        MyApp.Knowledge.record_ingestion_progress(version.id, event, payload)
      end
    )
  end
end

同一个 document_id + checksum 应幂等。重建索引时先生成新 version,待 status == :ready 后再切换 Document.active_version_id。旧 active version 在切换前继续服务线上查询。

Parser / OCR Artifact Contract

Arcana 内置 parser 主要覆盖 txt/md/pdf 文本抽取。企业合同、扫描件、设备手册、制度附件如果需要 OCR、版面解析、表格抽取，推荐由 Python sidecar 输出 CMDCRAGArcana.Ingestion.ParsedDocument:

# Example artifact a parser sidecar hands to the ingestion adapter.
%CMDCRAGArcana.Ingestion.ParsedDocument{
  text: "制度正文...",
  content_type: "application/pdf",
  checksum: "sha256:...",
  source_uri: "kb://policies/approval.pdf",
  # Per-page entries carrying the location data used for citations.
  pages: [
    %CMDCRAGArcana.Ingestion.ParsedPage{
      page_number: 3,
      text: "高风险操作需要审批",
      section: "审批制度",
      bbox: %{x: 10, y: 20, width: 300, height: 80},
      # NOTE(review): presumably character offsets into the document-level
      # `text` - confirm the unit and whether char_end is exclusive.
      char_start: 1200,
      char_end: 1234
    }
  ],
  # Tables extracted by the parser, each tagged with an id and its page.
  tables: [
    %CMDCRAGArcana.Ingestion.ParsedTable{
      id: "tbl-approval",
      page_number: 3,
      title: "审批矩阵",
      markdown: "| 风险 | 审批 |\n| L3 | 经理审批 |"
    }
  ],
  metadata: %{parser: "python-layout-v1"}
}

Ingestion.run/2 会把 artifact 归一化为:

  • Arcana ingest text
  • content_type
  • checksum
  • source_uri
  • pages
  • tables
  • source_map
  • parser_metadata

这些字段进入 Arcana document metadata,供企业 Knowledge UI 和 citation 渲染使用。

Citation Span

CMDCRAGArcana.Citation 的 span 字段用于页码、段落、表格和 bbox 级引用:

{
  "document_id": "doc-1",
  "source_uri": "kb://policies/approval.pdf",
  "span": {
    "page_number": 3,
    "section": "审批制度",
    "paragraph": "p-7",
    "table_id": "tbl-approval",
    "bbox": {"x": 10, "y": 20, "width": 300, "height": 80},
    "char_start": 1200,
    "char_end": 1234
  }
}

CitationAudit 默认仍会移除 citation text，但保留 span provenance，方便审计“引用了哪里”而不是泄露整段内容。

Status Backend

rag_ingest_status 通过 CMDCRAGArcana.Knowledge.StatusBackend 查询企业状态:

defmodule MyApp.Knowledge.RAGStatusBackend do
  @moduledoc """
  Example implementation of `CMDCRAGArcana.Knowledge.StatusBackend`.

  Echoes the lookup keys found in `opts` and fills the remaining fields with
  fixed sample values.
  """

  @behaviour CMDCRAGArcana.Knowledge.StatusBackend

  alias CMDCRAGArcana.Knowledge.IndexStatus

  # Returns `{:ok, %IndexStatus{}}`; the identifying fields are read from
  # `opts`, everything else is a hard-coded sample value.
  @impl true
  def get_status(opts) do
    index_status = %IndexStatus{
      tenant_id: opts[:tenant_id],
      collection: opts[:collection],
      document_id: opts[:document_id],
      version_id: opts[:version_id],
      active_version_id: "...",
      status: :ready,
      graph_status: :stale,
      chunk_count: 120,
      stale?: false
    }

    {:ok, index_status}
  end
end

Agent 配置:

# Agent user data; the :cmdc_rag_arcana key groups the RAG-specific settings.
user_data: %{
  tenant_id: "tenant-a",
  cmdc_rag_arcana: %{
    # NOTE(review): presumably the collections this agent may query - confirm.
    allowed_collections: ["policies"],
    # Module implementing the CMDCRAGArcana.Knowledge.StatusBackend behaviour.
    status_backend: MyApp.Knowledge.RAGStatusBackend
  }
}

Maintenance

企业平台应从后台任务调用 maintenance wrapper:

# Calls the reembed maintenance wrapper for the "policies" collection of
# "tenant-a". The progress callback takes the current and total counts and
# forwards them to the platform's own progress tracking.
CMDCRAGArcana.Maintenance.reembed(MyApp.Repo,
  tenant_id: "tenant-a",
  collection: "policies",
  progress: fn current, total ->
    MyApp.Knowledge.update_reembed_progress("policies", current, total)
  end
)

该 wrapper 会发:

  • [:cmdc_rag_arcana, :maintenance, :reembed, :start | :stop | :exception]

  • [:cmdc_rag_arcana, :maintenance, :reembed, :progress]
  • 可选 CMDC EventBus {:plugin_event, :rag_reembed_progress, payload}

progress payload 统一使用 CMDCRAGArcana.ProgressEvent 字段:

# Sample progress payload for a reembed run.
%{
  kind: :reembed,
  event: :progress,
  status: :running,
  tenant_id: "tenant-a",
  collection: "policies",
  current: 10,
  total: 100,
  # In this sample percent equals current / total * 100.
  percent: 10.0,
  # NOTE(review): presumably Unix epoch time in milliseconds - confirm.
  occurred_at_ms: 1_779_999_000_000
}

同一结构也用于 :rag_ingestion_progress 和后续 :rag_graph_progress