CMDCTest.Workflow.Assertions (cmdc_test v0.3.0)

Copy Markdown View Source

Workflow Runtime contract 断言 helper。

接收 CMDCOrchestrator.status/2 返回的 %{run, node_runs, events} 快照, 或 CMDCTest.Workflow.Fixtures.status_snapshot/1 生成的离线 fixture。

Summary

Functions

断言 Gateway/SSE 事件具备 Trace Viewer 必需字段。

断言 human_task 已进入等待或产生创建事件。

断言两个返回值命中同一 run id,用于 HTTP 重试幂等测试。

断言同一批节点没有重复执行。

断言节点 signal 路径。

断言 run 已成功完成。

断言存在指定类型事件。

Functions

assert_gateway_event_shape(event, opts \\ [])

@spec assert_gateway_event_shape(
  map(),
  keyword()
) :: map()

断言 Gateway/SSE 事件具备 Trace Viewer 必需字段。

assert_human_task_created(snapshot, opts \\ [])

@spec assert_human_task_created(
  map(),
  keyword()
) :: map()

断言 human_task 已进入等待或产生创建事件。

assert_idempotent_run(first, second)

@spec assert_idempotent_run(term(), term()) :: String.t()

断言两个返回值命中同一 run id,用于 HTTP 重试幂等测试。

assert_no_duplicate_node_attempts(snapshot, node_ids)

@spec assert_no_duplicate_node_attempts(map(), [String.t()]) :: map()

断言同一批节点没有重复执行。

assert_node_status(snapshot, node_id, expected_status)

@spec assert_node_status(map(), String.t(), atom() | String.t()) :: map()

断言指定节点状态。

assert_signal_path(snapshot, expected)

@spec assert_signal_path(map(), [{String.t(), String.t()}]) :: map()

断言节点 signal 路径。

assert_workflow_completed(snapshot)

@spec assert_workflow_completed(map()) :: map()

断言 run 已成功完成。

assert_workflow_event(snapshot, type, opts \\ [])

@spec assert_workflow_event(map(), String.t(), keyword()) :: map()

断言存在指定类型事件。