cmdc_rag_arcana 不接管企业知识库数据库。生产平台应在 Phoenix/Ecto 中维护
租户、权限、文档版本、导入任务和索引状态,再把 ready version 映射到 Arcana。
边界
| 层 | 责任 |
|---|---|
| 企业 Phoenix app | Knowledge UI、租户、权限、审批、版本、状态、审计 |
| cmdc_rag_arcana | Tool / Plugin / status helper / ingestion adapter contract |
| Arcana | ingest/search/ask/pipeline/graph/maintenance 底层能力 |
| 企业解析/离线导入 | OCR、版面解析、表格抽取、特殊 parser fallback artifact |
Arcana dashboard 已经提供 Documents、Collections、Search、Ask、Evaluation、Maintenance、Graph 页面。生产企业平台不应直接把它当业务 Knowledge UI,推荐只作为 admin/dev 调试入口,并始终加认证和管理员权限。
Ecto Schema 草案
defmodule MyApp.Knowledge.Collection do
  @moduledoc """
  Ecto schema for a row in `knowledge_collections`.

  Stores the host application's view of a knowledge collection: owning tenant,
  name, description, sensitivity level, retention and ACL maps, a graph flag
  and free-form metadata.
  """

  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}

  # Fields that must be present for the changeset to be valid.
  @required_fields ~w(tenant_id name)a
  # Remaining fields accepted from `attrs`.
  @optional_fields ~w(description sensitivity_level retention acl graph_enabled metadata)a

  schema "knowledge_collections" do
    field :tenant_id, :string
    field :name, :string
    field :description, :string
    field :sensitivity_level, :string, default: "internal"
    field :retention, :map, default: %{}
    field :acl, :map, default: %{}
    field :graph_enabled, :boolean, default: false
    field :metadata, :map, default: %{}

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Casts `attrs` onto `collection` and validates the result.

  Requires `:tenant_id` and `:name`, and registers a unique constraint on
  `[:tenant_id, :name]` so a violation of the matching database index is
  reported as a changeset error instead of raising.
  """
  def changeset(collection, attrs) do
    permitted = @required_fields ++ @optional_fields

    collection
    |> cast(attrs, permitted)
    |> validate_required(@required_fields)
    |> unique_constraint([:tenant_id, :name])
  end
end
defmodule MyApp.Knowledge.Document do
  @moduledoc """
  Ecto schema for a row in `knowledge_documents`.

  Records the tenant, the collection name, title, source URI, owner,
  department, the id of the currently active version, sensitivity level and
  free-form metadata of a knowledge document.
  """

  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}

  # Fields that must be present for the changeset to be valid.
  @required_fields ~w(tenant_id collection title)a
  # Remaining fields accepted from `attrs`.
  @optional_fields ~w(source_uri owner_id department active_version_id sensitivity_level metadata)a

  schema "knowledge_documents" do
    field :tenant_id, :string
    field :collection, :string
    field :title, :string
    field :source_uri, :string
    field :owner_id, :string
    field :department, :string
    field :active_version_id, :binary_id
    field :sensitivity_level, :string, default: "internal"
    field :metadata, :map, default: %{}

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Casts `attrs` onto `document` and requires `:tenant_id`, `:collection` and
  `:title`.
  """
  def changeset(document, attrs) do
    permitted = @required_fields ++ @optional_fields

    document
    |> cast(attrs, permitted)
    |> validate_required(@required_fields)
  end
end
defmodule MyApp.Knowledge.DocumentVersion do
  @moduledoc """
  Ecto schema for a row in `knowledge_document_versions`.

  Each row describes one version of a document: its checksum, source URI, the
  id of the corresponding Arcana document, an indexing status, an optional
  staleness timestamp, the parser name and free-form metadata.
  """

  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}

  # Allowed values of the `:status` enum; new rows start as `:draft`.
  @statuses [:draft, :queued, :indexing, :ready, :failed, :archived]

  # Fields that must be present for the changeset to be valid.
  @required_fields ~w(tenant_id document_id version checksum)a
  # Remaining fields accepted from `attrs`.
  @optional_fields ~w(source_uri arcana_document_id status stale_at parser metadata)a

  schema "knowledge_document_versions" do
    field :tenant_id, :string
    field :document_id, :binary_id
    field :version, :string
    field :checksum, :string
    field :source_uri, :string
    field :arcana_document_id, :binary_id
    field :status, Ecto.Enum, values: @statuses, default: :draft
    field :stale_at, :utc_datetime_usec
    field :parser, :string
    field :metadata, :map, default: %{}

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Casts `attrs` onto `version` and validates the result.

  Requires `:tenant_id`, `:document_id`, `:version` and `:checksum`, and
  registers a unique constraint on `[:tenant_id, :document_id, :checksum]` so
  a violation of the matching database index becomes a changeset error.
  """
  def changeset(version, attrs) do
    permitted = @required_fields ++ @optional_fields

    version
    |> cast(attrs, permitted)
    |> validate_required(@required_fields)
    |> unique_constraint([:tenant_id, :document_id, :checksum])
  end
end
defmodule MyApp.Knowledge.IngestionRun do
  @moduledoc """
  Ecto schema for a row in `knowledge_ingestion_runs`.

  Tracks one run against a document version: its kind, status, attempt
  counter, start/finish timestamps, error text, a progress map and free-form
  metadata.
  """

  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}

  # Allowed values of the `:status` enum; new rows start as `:queued`.
  @statuses [:queued, :running, :completed, :failed, :cancelled]

  # Fields that must be present for the changeset to be valid.
  @required_fields ~w(tenant_id document_version_id)a
  # Remaining fields accepted from `attrs`.
  @optional_fields ~w(kind status attempt started_at finished_at error progress metadata)a

  schema "knowledge_ingestion_runs" do
    field :tenant_id, :string
    field :document_version_id, :binary_id
    field :kind, :string, default: "ingest"
    field :status, Ecto.Enum, values: @statuses, default: :queued
    field :attempt, :integer, default: 0
    field :started_at, :utc_datetime_usec
    field :finished_at, :utc_datetime_usec
    field :error, :string
    field :progress, :map, default: %{}
    field :metadata, :map, default: %{}

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Casts `attrs` onto `run` and requires `:tenant_id` and
  `:document_version_id`.
  """
  def changeset(run, attrs) do
    permitted = @required_fields ++ @optional_fields

    run
    |> cast(attrs, permitted)
    |> validate_required(@required_fields)
  end
end

SourceMapping 可以做独立表,也可以放在 DocumentVersion 上:
schema "knowledge_source_mappings" do
field :tenant_id, :string
field :document_id, :binary_id
field :version_id, :binary_id
field :collection, :string
field :arcana_document_id, :binary_id
field :source_id, :string
field :source_uri, :string
field :metadata, :map, default: %{}
timestamps(type: :utc_datetime_usec)
end

Oban Worker Skeleton
defmodule MyApp.Workers.KnowledgeIngestionWorker do
  @moduledoc """
  Oban worker skeleton that ingests one document version through
  `CMDCRAGArcana.Ingestion.run/2`.

  Expected job args (string keys):

    * `"version_id"` - id passed to `MyApp.Knowledge.get_version!/1`
    * `"file_path"`  - forwarded to the job spec as `:file_path`

  Jobs run on the `:knowledge` queue with at most 5 attempts.
  """

  use Oban.Worker, queue: :knowledge, max_attempts: 5

  alias CMDCRAGArcana.Ingestion
  alias CMDCRAGArcana.Ingestion.JobSpec

  @impl true
  def perform(%Oban.Job{args: args}) do
    version = MyApp.Knowledge.get_version!(args["version_id"])

    # The DocumentVersion schema has no `collection` or graph flag: the
    # collection name is stored on Document and `graph_enabled` on Collection,
    # so both parents are loaded here. `get_document!/1` and `get_collection!/2`
    # are host-app context functions to be provided alongside `get_version!/1`.
    document = MyApp.Knowledge.get_document!(version.document_id)
    collection = MyApp.Knowledge.get_collection!(version.tenant_id, document.collection)

    spec =
      JobSpec.new!(
        tenant_id: version.tenant_id,
        collection: document.collection,
        document_id: version.document_id,
        version_id: version.id,
        checksum: version.checksum,
        source_uri: version.source_uri,
        file_path: args["file_path"],
        graph?: collection.graph_enabled,
        preprocessor: MyApp.Knowledge.Parser
      )

    # NOTE(review): the result of Ingestion.run/2 becomes the Oban job result;
    # confirm it is one of the return values Oban.Worker.perform/1 accepts.
    Ingestion.run(spec,
      repo: MyApp.Repo,
      progress: fn event, payload ->
        MyApp.Knowledge.record_ingestion_progress(version.id, event, payload)
      end
    )
  end
end

同一个 document_id + checksum 应幂等。重建索引时先生成新 version,待
status == :ready 后再切换 Document.active_version_id。旧 active version
在切换前继续服务线上查询。
Parser / OCR Artifact Contract
Arcana 内置 parser 主要覆盖 txt/md/pdf 文本抽取。企业合同、扫描件、设备手册、
制度附件如果需要 OCR、版面解析、表格抽取,推荐由企业解析服务、离线导入流程
或后续 Elixir 解析能力输出 CMDCRAGArcana.Ingestion.ParsedDocument。Python
不进入 RAG runtime,只保留给后续蒸馏、训练和离线模型实验。
%CMDCRAGArcana.Ingestion.ParsedDocument{
text: "制度正文...",
content_type: "application/pdf",
checksum: "sha256:...",
source_uri: "kb://policies/approval.pdf",
pages: [
%CMDCRAGArcana.Ingestion.ParsedPage{
page_number: 3,
text: "高风险操作需要审批",
section: "审批制度",
bbox: %{x: 10, y: 20, width: 300, height: 80},
char_start: 1200,
char_end: 1234
}
],
tables: [
%CMDCRAGArcana.Ingestion.ParsedTable{
id: "tbl-approval",
page_number: 3,
title: "审批矩阵",
markdown: "| 风险 | 审批 |\n| L3 | 经理审批 |"
}
],
metadata: %{parser: "python-layout-v1"}
}

Ingestion.run/2 会把 artifact 归一化为:
- Arcana ingest text
- content_type
- checksum
- source_uri
- pages
- tables
- source_map
- parser_metadata

这些字段进入 Arcana document metadata,供企业 Knowledge UI 和 citation 渲染使用。
Citation Span
CMDCRAGArcana.Citation 的 span 字段用于页码、段落、表格和 bbox 级引用:
{
"document_id": "doc-1",
"source_uri": "kb://policies/approval.pdf",
"span": {
"page_number": 3,
"section": "审批制度",
"paragraph": "p-7",
"table_id": "tbl-approval",
"bbox": {"x": 10, "y": 20, "width": 300, "height": 80},
"char_start": 1200,
"char_end": 1234
}
}

CitationAudit 默认仍会移除 citation text,但保留 span provenance,方便审计
“引用了哪里”而不是泄露整段内容。
Status Backend
rag_ingest_status 通过 CMDCRAGArcana.Knowledge.StatusBackend 查询企业状态:
defmodule MyApp.Knowledge.RAGStatusBackend do
  @moduledoc """
  Example implementation of the `CMDCRAGArcana.Knowledge.StatusBackend`
  behaviour.

  Copies the identifiers found in `opts` into the returned status and fills
  every other field with a fixed sample value.
  """

  @behaviour CMDCRAGArcana.Knowledge.StatusBackend

  alias CMDCRAGArcana.Knowledge.IndexStatus

  @impl true
  def get_status(opts) do
    index_status = %IndexStatus{
      # Identifiers are read from the caller-supplied options.
      tenant_id: Access.get(opts, :tenant_id),
      collection: Access.get(opts, :collection),
      document_id: Access.get(opts, :document_id),
      version_id: Access.get(opts, :version_id),
      # Everything below is placeholder data for the example.
      active_version_id: "...",
      status: :ready,
      graph_status: :stale,
      chunk_count: 120,
      stale?: false
    }

    {:ok, index_status}
  end
end

Agent 配置:
user_data: %{
tenant_id: "tenant-a",
cmdc_rag_arcana: %{
allowed_collections: ["policies"],
status_backend: MyApp.Knowledge.RAGStatusBackend
}
}

Maintenance
企业平台应从后台任务调用 maintenance wrapper:
CMDCRAGArcana.Maintenance.reembed(MyApp.Repo,
tenant_id: "tenant-a",
collection: "policies",
progress: fn current, total ->
MyApp.Knowledge.update_reembed_progress("policies", current, total)
end
)

该 wrapper 会发:
- [:cmdc_rag_arcana, :maintenance, :reembed, :start | :stop | :exception]
- [:cmdc_rag_arcana, :maintenance, :reembed, :progress]
- 可选 CMDC EventBus {:plugin_event, :rag_reembed_progress, payload}

progress payload 统一使用 CMDCRAGArcana.ProgressEvent 字段:
%{
kind: :reembed,
event: :progress,
status: :running,
tenant_id: "tenant-a",
collection: "policies",
current: 10,
total: 100,
percent: 10.0,
occurred_at_ms: 1_779_999_000_000
}

同一结构也用于 :rag_ingestion_progress 和后续 :rag_graph_progress。