cmdc_rag_arcana 不接管企业知识库数据库。生产平台应在 Phoenix/Ecto 中维护
租户、权限、文档版本、导入任务和索引状态,再把 ready version 映射到 Arcana。
## 边界
| 层 | 责任 |
|---|---|
| 企业 Phoenix app | Knowledge UI、租户、权限、审批、版本、状态、审计 |
| cmdc_rag_arcana | Tool / Plugin / status helper / ingestion adapter contract |
| Arcana | ingest/search/ask/pipeline/graph/maintenance 底层能力 |
| Python sidecar | OCR、版面解析、表格抽取、特殊 parser fallback |
Arcana dashboard 已经提供 Documents、Collections、Search、Ask、Evaluation、Maintenance、Graph 页面。生产企业平台不应直接把它当作业务 Knowledge UI,推荐只作为 admin/dev 调试入口,并始终加上认证和管理员权限校验。
## Ecto Schema 草案
defmodule MyApp.Knowledge.Collection do
  @moduledoc """
  A tenant-scoped knowledge collection owned by the enterprise app.

  Besides its name and description, a collection records a sensitivity level
  (default `"internal"`), free-form retention / ACL / metadata maps, and whether
  graph indexing is enabled for it (default `false`).
  """
  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}

  # Attributes accepted from external params, in cast order.
  @cast_fields ~w(
    tenant_id name description sensitivity_level
    retention acl graph_enabled metadata
  )a

  # Attributes that must be present after casting.
  @required_fields ~w(tenant_id name)a

  schema "knowledge_collections" do
    field :tenant_id, :string
    field :name, :string
    field :description, :string
    field :sensitivity_level, :string, default: "internal"
    field :retention, :map, default: %{}
    field :acl, :map, default: %{}
    field :graph_enabled, :boolean, default: false
    field :metadata, :map, default: %{}

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Casts `attrs` onto a collection (or an existing changeset) and validates it.

  `tenant_id` and `name` are required. A duplicate `(tenant_id, name)` pair is
  turned into a changeset error when the database's unique index rejects the
  write (assumes that index exists in the migration).
  """
  def changeset(collection_or_changeset, attrs) do
    collection_or_changeset
    |> cast(attrs, @cast_fields)
    |> validate_required(@required_fields)
    |> unique_constraint([:tenant_id, :name])
  end
end
defmodule MyApp.Knowledge.Document do
  @moduledoc """
  A logical document inside a knowledge collection.

  `collection` is stored as a plain string (presumably the collection's `name`,
  not a foreign key to `knowledge_collections.id` — confirm).
  `active_version_id` points at the document version that currently serves
  queries; it is switched only after a newer version has become ready.
  """
  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}

  # Attributes accepted from external params, in cast order.
  @cast_fields ~w(
    tenant_id collection title source_uri owner_id
    department active_version_id sensitivity_level metadata
  )a

  # Attributes that must be present after casting.
  @required_fields ~w(tenant_id collection title)a

  schema "knowledge_documents" do
    field :tenant_id, :string
    field :collection, :string
    field :title, :string
    field :source_uri, :string
    field :owner_id, :string
    field :department, :string
    field :active_version_id, :binary_id
    field :sensitivity_level, :string, default: "internal"
    field :metadata, :map, default: %{}

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Casts `attrs` onto a document (or an existing changeset) and validates it.

  `tenant_id`, `collection` and `title` are required.
  """
  def changeset(document_or_changeset, attrs) do
    document_or_changeset
    |> cast(attrs, @cast_fields)
    |> validate_required(@required_fields)
  end
end
defmodule MyApp.Knowledge.DocumentVersion do
  @moduledoc """
  One immutable content version of a knowledge document.

  `status` moves through `:draft`, `:queued`, `:indexing`, `:ready`, `:failed`
  and `:archived` (default `:draft`). `arcana_document_id` links the version to
  its document on the Arcana side once it has been ingested.
  """
  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}

  # Attributes accepted from external params, in cast order.
  @cast_fields ~w(
    tenant_id document_id version checksum source_uri
    arcana_document_id status stale_at parser metadata
  )a

  # Attributes that must be present after casting.
  @required_fields ~w(tenant_id document_id version checksum)a

  schema "knowledge_document_versions" do
    field :tenant_id, :string
    field :document_id, :binary_id
    field :version, :string
    field :checksum, :string
    field :source_uri, :string
    field :arcana_document_id, :binary_id

    field :status, Ecto.Enum,
      values: [:draft, :queued, :indexing, :ready, :failed, :archived],
      default: :draft

    field :stale_at, :utc_datetime_usec
    field :parser, :string
    field :metadata, :map, default: %{}

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Casts `attrs` onto a version (or an existing changeset) and validates it.

  `tenant_id`, `document_id`, `version` and `checksum` are required. The same
  `(tenant_id, document_id, checksum)` triple may exist only once. That makes
  re-submitting identical content for a document a changeset error instead of
  a second version, provided the matching unique index exists.
  """
  def changeset(version_or_changeset, attrs) do
    version_or_changeset
    |> cast(attrs, @cast_fields)
    |> validate_required(@required_fields)
    |> unique_constraint([:tenant_id, :document_id, :checksum])
  end
end
defmodule MyApp.Knowledge.IngestionRun do
  @moduledoc """
  A single ingestion run recorded for a document version.

  `kind` defaults to `"ingest"`. `status` is one of `:queued`, `:running`,
  `:completed`, `:failed` or `:cancelled` (default `:queued`). `progress` and
  `metadata` are free-form maps.
  """
  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}

  # Attributes accepted from external params, in cast order.
  @cast_fields ~w(
    tenant_id document_version_id kind status attempt
    started_at finished_at error progress metadata
  )a

  # Attributes that must be present after casting.
  @required_fields ~w(tenant_id document_version_id)a

  schema "knowledge_ingestion_runs" do
    field :tenant_id, :string
    field :document_version_id, :binary_id
    field :kind, :string, default: "ingest"

    field :status, Ecto.Enum,
      values: [:queued, :running, :completed, :failed, :cancelled],
      default: :queued

    field :attempt, :integer, default: 0
    field :started_at, :utc_datetime_usec
    field :finished_at, :utc_datetime_usec
    field :error, :string
    field :progress, :map, default: %{}
    field :metadata, :map, default: %{}

    timestamps(type: :utc_datetime_usec)
  end

  @doc """
  Casts `attrs` onto a run (or an existing changeset) and validates it.

  Only `tenant_id` and `document_version_id` are required.
  """
  def changeset(run_or_changeset, attrs) do
    run_or_changeset
    |> cast(attrs, @cast_fields)
    |> validate_required(@required_fields)
  end
end

SourceMapping 可以做独立表,也可以放在 DocumentVersion 上:
# Schema body for a standalone source-mapping table. The same fields can
# alternatively be stored directly on DocumentVersion instead of a separate table.
schema "knowledge_source_mappings" do
  field :tenant_id, :string
  # Enterprise-side ids (presumably knowledge_documents / knowledge_document_versions ids — confirm).
  field :document_id, :binary_id
  field :version_id, :binary_id
  # Collection the version is indexed into, stored as a plain string.
  field :collection, :string
  # Id of the corresponding document on the Arcana side.
  field :arcana_document_id, :binary_id
  # NOTE(review): source_id semantics are not shown here — presumably an id in the
  # originating source system; confirm.
  field :source_id, :string
  field :source_uri, :string
  field :metadata, :map, default: %{}
  timestamps(type: :utc_datetime_usec)
end

## Oban Worker Skeleton
defmodule MyApp.Workers.KnowledgeIngestionWorker do
  @moduledoc """
  Oban worker that indexes one `MyApp.Knowledge.DocumentVersion` into Arcana
  through `CMDCRAGArcana.Ingestion.run/2`.

  Job args (string keys, as Oban stores them as JSON):

    * `"version_id"` - id of the document version to index
    * `"file_path"` - path of the uploaded file, passed through to the job spec
  """
  use Oban.Worker, queue: :knowledge, max_attempts: 5

  alias CMDCRAGArcana.Ingestion
  alias CMDCRAGArcana.Ingestion.JobSpec
  alias MyApp.Knowledge.{Collection, Document}

  @impl true
  def perform(%Oban.Job{args: args}) do
    version = MyApp.Knowledge.get_version!(args["version_id"])

    # DocumentVersion has neither a `collection` nor a graph flag. The
    # collection name lives on the parent Document, and `graph_enabled` lives
    # on the Collection. Reading them off the version struct would raise
    # KeyError. Both lookups are scoped to the version's tenant.
    document =
      MyApp.Repo.get_by!(Document, id: version.document_id, tenant_id: version.tenant_id)

    collection =
      MyApp.Repo.get_by!(Collection, tenant_id: version.tenant_id, name: document.collection)

    spec =
      JobSpec.new!(
        tenant_id: version.tenant_id,
        collection: document.collection,
        document_id: version.document_id,
        version_id: version.id,
        checksum: version.checksum,
        source_uri: version.source_uri,
        file_path: args["file_path"],
        graph?: collection.graph_enabled,
        preprocessor: MyApp.Knowledge.ParserSidecar
      )

    # NOTE(review): the result is returned to Oban as-is; assumes Ingestion.run/2
    # returns an Oban-compatible value ({:ok, _} / {:error, _} / ...) — confirm.
    Ingestion.run(spec,
      repo: MyApp.Repo,
      progress: fn event, payload ->
        MyApp.Knowledge.record_ingestion_progress(version.id, event, payload)
      end
    )
  end
end

同一个 document_id + checksum 应幂等。重建索引时先生成新 version,待
status == :ready 后再切换 Document.active_version_id。旧 active version
在切换前继续服务线上查询。
## Status Backend
`rag_ingest_status` 通过 `CMDCRAGArcana.Knowledge.StatusBackend` 查询企业侧状态:
defmodule MyApp.Knowledge.RAGStatusBackend do
  @moduledoc """
  Example `CMDCRAGArcana.Knowledge.StatusBackend` implementation.

  This is a stub. It echoes the identifiers from `opts` and fills every other
  field with fixed sample values. A real backend is expected to look these up
  in the enterprise-side knowledge tables.
  """
  @behaviour CMDCRAGArcana.Knowledge.StatusBackend

  alias CMDCRAGArcana.Knowledge.IndexStatus

  @impl true
  def get_status(opts) do
    # Identifiers requested by the caller are passed straight through.
    tenant_id = opts[:tenant_id]
    collection = opts[:collection]
    document_id = opts[:document_id]
    version_id = opts[:version_id]

    # Sample values only — replace with real lookups.
    index_status = %IndexStatus{
      tenant_id: tenant_id,
      collection: collection,
      document_id: document_id,
      version_id: version_id,
      active_version_id: "...",
      status: :ready,
      graph_status: :stale,
      chunk_count: 120,
      stale?: false
    }

    {:ok, index_status}
  end
end

Agent 配置:
user_data: %{
  # Tenant this agent acts for (presumably scopes the plugin's lookups — confirm).
  tenant_id: "tenant-a",
  cmdc_rag_arcana: %{
    # Collections the plugin may use; presumably an allow-list for the RAG tools.
    allowed_collections: ["policies"],
    # Module implementing CMDCRAGArcana.Knowledge.StatusBackend; rag_ingest_status
    # queries it for the enterprise-side index status.
    status_backend: MyApp.Knowledge.RAGStatusBackend
  }
}

## Maintenance
企业平台应从后台任务调用 maintenance wrapper:
# Re-embed one collection of a tenant, reporting progress back to the app.
collection = "policies"

CMDCRAGArcana.Maintenance.reembed(MyApp.Repo,
  tenant_id: "tenant-a",
  collection: collection,
  # Invoked with (current, total), presumably counts of processed items.
  progress: fn current, total ->
    MyApp.Knowledge.update_reembed_progress(collection, current, total)
  end
)

该 wrapper 会发:
- `[:cmdc_rag_arcana, :maintenance, :reembed, :start | :stop | :exception]`
- `[:cmdc_rag_arcana, :maintenance, :reembed, :progress]`
- 可选 CMDC EventBus:`{:plugin_event, :rag_reembed_progress, payload}`