#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ DecodeProcess Agent 的工具集 工具均为纯 Python 函数(无 @tool 装饰器),LangChain `create_agent` 会从函数签名 + docstring 推断 schema。 所有写操作都通过 WorkflowContext 落到本地 JSON 文件;返回值统一为 {"success": bool, "message"/"error": str, ...} 让 LLM 能感知失败并修正。 ⚠️ 关于字段留空:「传空字符串」指的是**长度为 0 的字符串**(实际传 `tool_name=""`、 `value=""`),**不要**把两个引号字符当字面值写进字段(即 `tool_name='""'` —— 这是 长度为 2 的字符串,前端会把它当作真实内容渲染出来)。同理 `"-"` / `"N/A"` / `"无"` 这类占位字符串也不是"空"。**没东西可写 → 传真正的 ""**。 ⚠️ 关于 `actions` / `roles` / `purposes` 数组:每一项是**单一标签**,不要把多个候选用 `/` 或 `、` 拼到同一项里。错例:`actions=["拼接/识别/提取"]`;正例:`actions=["拼接", "识别", "提取"]`。系统 prompt 里推荐取值用 `/` 列出只是文档分隔形式,不是字段值语法。 """ from typing import Any, Dict, List, Optional from workflow_store import WorkflowContext, WorkflowStoreError def _ok(message: str, **extra: Any) -> Dict[str, Any]: out: Dict[str, Any] = {"success": True, "message": message} out.update(extra) return out def _err(error: str) -> Dict[str, Any]: return {"success": False, "error": error} def think_and_plan(thought: str, plan: str, action: str) -> str: """系统化思考与规划工具:在做工序拆解决策前,先梳理思路、整体计划与下一步行动。 本工具不修改任何状态,只负责打印记录,便于事后追溯 Agent 的推理过程。 在开始 add_step 之前应至少调用一次。 Args: thought: 当前思考内容,例如"这是一张化妆教程,整体可分为底妆/眼妆/唇妆三阶段"。 plan: 整体方案,**必须包含初步的 step name 列表**作为工序主体脉络(脉络锚)。 初步 name 是工作假设,只需粗略反映"动作+对象";后续 I/O 写齐后会回头集中浓缩。 例: 1. 生成基础人像底图 2. 局部重绘眼妆细节 3. 应用整体风格统一画面 action: 紧接着要执行的具体动作,例如"调用 add_step 添加 step_id=1 的生成步骤"。 Returns: 将上述三段拼接后的纯文本回显。 """ response = ( f"[think_and_plan]\n" f"thought: {thought}\n" f"plan: {plan}\n" f"action: {action}\n" ) print(response) return response def add_step( step_id: str, name: str, actions: List[str], roles: List[str], purposes: List[str], source_ref: str, tool_name: str, tool_method: str, ) -> Dict[str, Any]: """追加一个工序步骤的骨架(不含 inputs / outputs)。 必须在调用 add_step_input / add_step_output 之前先把骨架建好。 Args: step_id: 步骤 id,建议自然数字符串,如 "1"、"2"。在同一工序内必须唯一。 name: 步骤名称(6~20 字可读自然语言短语,"动作 + 对象(模态/类型) + 通用作用/目的"的浓缩)。 示例:「生成基础人像底图」、「局部重绘眼妆细节」、「应用整体风格统一画面」、 「生成人像底图为后续编辑提供锚定」。 **两阶段产物**:在 add_step 时可用 think_and_plan 里列出的**初步 name** 即可; 所有 step + I/O 写齐后,工作流要求在 finalize 前**集中**用 update_step 把 name 浓缩为 "actions + 主要 I/O 的 modality+category + roles + purposes 的最佳可读投影"。 禁止:只写孤立动词("生成")、写工具名("Midjourney")、写内容细节(角色身份/风格词/色彩/参数等)。 actions: **数组**。这一步执行的**纯动作**标签(剥离对象/目的后的动词)。 例:`["生成"]` / `["局部重绘", "风格化"]` / `["分析", "提取"]`。 1 项最常见;只在同一次工具调用确实合并了多个动作时填多项。 无法判断时传 `[]`。 roles: **数组**。这一步在整体工序里担任的**抽象节点类型**(pipeline 层级标签)。 例:`["底图构建"]` / `["细节加工", "风格调优"]` / `["素材采集"]`。 与 actions 区别:actions 是纯动词;roles 是 pipeline 节点类别。 **禁止**让 roles 只是 actions 的名词化(如 actions=["生成"]、roles=["生成"])。 无法判断时传 `[]`。 purposes: **数组**。这一步在追求的**质量维度 / 目标属性标签**。 写成短标签(2~6 字名词性短语),**不是长句**,**不是关系描述**。 例:`["角色一致性"]` / `["细节还原度", "风格一致性"]` / `["真实感"]` / `["可复用性"]` / `["多样性"]`。 常见维度:一致性 / 还原度 / 多样性 / 可控性 / 美观度 / 真实感 / 可复用性 / 完整性 / 效率。 **硬约束**: 1) 形态必须是「质量属性标签」,不是动词、不是流程类别、不是长句。 2) 必须能从输入推断;推不出 → `[]`。留空时 name 也要同步省略目的片段。 **`name` / `actions` / `roles` / `purposes` 四者同源**: `name` 是 `actions` + 主要 I/O 的 modality+category + `roles` + `purposes` 浓缩出的可读自然语言; 反过来,`actions` / `roles` / `purposes` 是 `name` 的结构化反向分解。 **结构化字段是完备真源;`name` 是它们的有损可读子集**。 写完一个 step 必须自检: - 向前对齐:name 的每个语义片段都能定位到某个结构化字段; - 向后对齐:actions/roles/purposes 的每一项都能在 name 中被同义/近义地体现。 source_ref: 该步骤的来源依据,可以引用图序号或正文片段,如 "图2 / 正文第3句"。 无据则不应新增该步骤;难以精确表达可留空字符串。 tool_name: 工具/产品名称(不含方法/参数)。例:「Midjourney」/「Nano Banana」/「manus」。 完全判断不出来时留空字符串或填 "文生图类工具" 等能力兜底。 tool_method: 在该工具里**实际可观察到**的具体方法/功能/参数。 只允许:图中可见的 UI 标签/按钮/参数/模型挡位,或正文里明确点名的功能名。 **禁止**脑补描述工具用途(如写 "视频分析与提示词提取"); 没观察到就传空字符串 `""`。 Returns: {"success": True, "step": {...}, "current_step_count": N};失败时返回 error。 """ try: step = WorkflowContext.add_step( step_id=step_id, name=name, actions=actions, roles=roles, purposes=purposes, source_ref=source_ref, tool_name=tool_name, tool_method=tool_method, ) except WorkflowStoreError as e: return _err(str(e)) snapshot = WorkflowContext.get() print(f"[add_step] step_id={step_id} name={name} ok(共{len(snapshot['steps'])}步)") return _ok( f"step_id={step_id} 已添加", step=step, current_step_count=len(snapshot["steps"]), ) def add_step_input( step_id: str, modality: str, category: str, source_id: str, value: str, ) -> Dict[str, Any]: """为指定步骤追加一项输入。 Args: step_id: 目标步骤 id,必须已通过 add_step 创建。 modality: 数据模态(受控)。从 "文本" / "图片" / "视频" / "音频" / "文件" 中选取。 category: 该输入在制作领域里的**专有类型**(领域内通用短语)。 例:`prompt` / `负向提示词` / `脚本` / `分镜脚本` / `人物参考图` / `场景参考图` / `底图` / `分镜图` / `运镜描述` / `知识库` / `配音` 等。 无法用领域专有名词归类时传 `""`。 source_id: 输入来源 id。规则: - 若为该工序的初始输入,使用 "init_input_",N 从 1 开始; - 若来自前一步的输出,必须填那一步的某个 outputs[*].id(如 "p1_output")。 value: 实例化后的具体内容。文本类填原文片段或凝练;图片类填可见物体的简洁描述。 Returns: {"success": True, "input": {...}};source_id 不可解析或 step 不存在时返回 error。 """ try: item = WorkflowContext.add_step_input( step_id=step_id, modality=modality, category=category, source_id=source_id, value=value, ) except WorkflowStoreError as e: return _err(str(e)) print(f"[add_step_input] step_id={step_id} source_id={source_id} ok") return _ok(f"step_id={step_id} 已追加输入", input=item) def add_step_output( step_id: str, output_id: str, modality: str, category: str, value: str, ) -> Dict[str, Any]: """为指定步骤追加一项输出。 Args: step_id: 目标步骤 id,必须已通过 add_step 创建。 output_id: 输出 id,命名约定: - 单一输出:使用 "p_output",例如 step_id=1 -> "p1_output"; - 多输出:使用 "p_output_",M 从 1 开始,例如 "p2_output_1"。 - 全局唯一,不能与任何已有输出 id 冲突。 modality: 数据模态(受控)。从 "文本" / "图片" / "视频" / "音频" / "文件" 中选取。 category: 输出在制作领域里的**专有类型**。 例:`人物参考图` / `场景参考图` / `分镜图` / `成品图` / `视频片段` / `妆容描述` / `脚本` / `prompt` / `提示词库`。 无法用领域专有名词归类时传 `""`。 value: 实例化后的具体内容。 Returns: {"success": True, "output": {...}};id 冲突或 step 不存在时返回 error。 """ try: item = WorkflowContext.add_step_output( step_id=step_id, output_id=output_id, modality=modality, category=category, value=value, ) except WorkflowStoreError as e: return _err(str(e)) print(f"[add_step_output] step_id={step_id} output_id={output_id} ok") return _ok(f"step_id={step_id} 已追加输出 {output_id}", output=item) def get_current_workflow() -> Dict[str, Any]: """读取当前已写入的工序(含所有已添加的 step / input / output)。 用于 Agent 自检:例如在追加引用某 output_id 的输入前,先确认其确实存在。 Returns: 当前工序的完整快照(深拷贝)。 """ try: wf = WorkflowContext.get() except WorkflowStoreError as e: return _err(str(e)) print(f"[get_current_workflow] steps={len(wf['steps'])} status={wf['status']}") return wf def update_step( step_id: str, name: Optional[str] = None, actions: Optional[List[str]] = None, roles: Optional[List[str]] = None, purposes: Optional[List[str]] = None, source_ref: Optional[str] = None, tool_name: Optional[str] = None, tool_method: Optional[str] = None, ) -> Dict[str, Any]: """部分更新某个步骤的元信息字段。step_id 本身不可改。 任何字段传 None(或省略)表示**保持原值**。 标量字段(name/source_ref/tool_name/tool_method):传 `""` 表示清空。 数组字段(actions/roles/purposes):传 `[]` 表示清空;传 `["a", "b"]` **整体替换**为该数组 (不是追加,是替换)。 典型用法: - I/O 阶段补全结构化字段:`update_step(step_id="2", actions=["局部重绘", "风格化"], purposes=["风格一致性"])` - finalize 前**集中浓缩 name**:所有 step + I/O 都写齐后,根据完整的 actions/roles/purposes + 主要 I/O 的 modality+category,调 `update_step(step_id="2", name="为底图局部重绘眼妆细节")` 把 name 重写成最佳浓缩可读自然语言(参见系统提示中"必须遵守的工作流" step 8)。 Args: step_id: 目标步骤 id,必须已存在。 name: 新的步骤名称;不改则 None。 actions: 新的动作数组(整体替换);不改则 None。 roles: 新的流程角色数组(整体替换);不改则 None。 purposes: 新的目的数组(整体替换);不改则 None。 source_ref: 新的来源依据;不改则 None;传 "" 清空。 tool_name: 新的工具名称;不改则 None;传 "" 清空。 tool_method: 新的工具方法;不改则 None;传 "" 清空。 Returns: {"success": True, "step": {更新后的完整 step}};失败时返回 error。 """ try: step = WorkflowContext.update_step( step_id=step_id, name=name, actions=actions, roles=roles, purposes=purposes, source_ref=source_ref, tool_name=tool_name, tool_method=tool_method, ) except WorkflowStoreError as e: return _err(str(e)) print(f"[update_step] step_id={step_id} ok") return _ok(f"step_id={step_id} 已更新", step=step) def update_step_input( step_id: str, input_index: int, modality: Optional[str] = None, category: Optional[str] = None, source_id: Optional[str] = None, value: Optional[str] = None, ) -> Dict[str, Any]: """部分更新某个步骤的某一项输入。 输入用 (step_id, input_index) 定位,input_index 是该步骤 inputs 数组的 0-based 下标。 若不确定下标,请先调用 get_current_workflow 查看当前状态。 若 source_id 也被修改了,新值会重新做依赖校验:必须是 init_input_,或本步骤之前 已经存在的某个 output_id。 Args: step_id: 目标步骤 id。 input_index: 输入项在 inputs 数组中的下标(从 0 开始)。 modality: 新的数据模态(文本/图片/视频/音频/文件);不改则 None。 category: 新的领域类型(如 `prompt` / `人物参考图`);不改则 None;传 "" 清空。 source_id: 新的输入来源 id;不改则 None。 value: 新的实例化内容;不改则 None。 Returns: {"success": True, "input": {更新后的输入项}};失败时返回 error。 """ try: item = WorkflowContext.update_step_input( step_id=step_id, input_index=input_index, modality=modality, category=category, source_id=source_id, value=value, ) except WorkflowStoreError as e: return _err(str(e)) print(f"[update_step_input] step_id={step_id} index={input_index} ok") return _ok(f"step_id={step_id} 第 {input_index} 项输入已更新", input=item) def update_step_output( step_id: str, output_id: str, modality: Optional[str] = None, category: Optional[str] = None, value: Optional[str] = None, ) -> Dict[str, Any]: """部分更新某个步骤的某一项输出。output_id 本身不可改(其它输入可能引用它)。 如果一定要换 output_id,请用 add_step_output 创建一个新的,再 update_step_input 把所有引用切到新 id,最后 delete_step_output 删旧的。 Args: step_id: 目标步骤 id。 output_id: 目标输出 id(用作定位 key,不会被修改)。 modality: 新的数据模态;不改则 None。 category: 新的领域类型;不改则 None;传 "" 清空。 value: 新的实例化内容;不改则 None。 Returns: {"success": True, "output": {更新后的输出项}};失败时返回 error。 """ try: item = WorkflowContext.update_step_output( step_id=step_id, output_id=output_id, modality=modality, category=category, value=value, ) except WorkflowStoreError as e: return _err(str(e)) print(f"[update_step_output] step_id={step_id} output_id={output_id} ok") return _ok(f"step_id={step_id} 输出 {output_id} 已更新", output=item) def delete_step(step_id: str) -> Dict[str, Any]: """删除整个步骤(包含其所有输入和输出)。 若该步骤的任何输出仍被后续步骤的输入引用,会拒绝并提示先解除依赖。 Args: step_id: 要删除的步骤 id。 Returns: {"success": True, "removed_step": {被删的完整 step 快照}};失败时返回 error。 """ try: removed = WorkflowContext.delete_step(step_id=step_id) except WorkflowStoreError as e: return _err(str(e)) print(f"[delete_step] step_id={step_id} ok") return _ok(f"step_id={step_id} 已删除", removed_step=removed) def delete_step_input(step_id: str, input_index: int) -> Dict[str, Any]: """删除某个步骤的某一项输入。输入没有反向引用,删除不会牵连其他位置。 Args: step_id: 目标步骤 id。 input_index: 输入项在 inputs 数组中的下标(从 0 开始)。注意:删除后后面项的下标会前移。 Returns: {"success": True, "removed_input": {被删的输入项}};失败时返回 error。 """ try: removed = WorkflowContext.delete_step_input( step_id=step_id, input_index=input_index ) except WorkflowStoreError as e: return _err(str(e)) print(f"[delete_step_input] step_id={step_id} index={input_index} ok") return _ok(f"step_id={step_id} 第 {input_index} 项输入已删除", removed_input=removed) def delete_step_output(step_id: str, output_id: str) -> Dict[str, Any]: """删除某个步骤的某一项输出。 若该输出仍被任何输入引用,会拒绝并提示先解除依赖(update_step_input 或 delete_step_input 把那些输入处理掉)。 Args: step_id: 目标步骤 id。 output_id: 要删除的输出 id。 Returns: {"success": True, "removed_output": {被删的输出项}};失败时返回 error。 """ try: removed = WorkflowContext.delete_step_output( step_id=step_id, output_id=output_id ) except WorkflowStoreError as e: return _err(str(e)) print(f"[delete_step_output] step_id={step_id} output_id={output_id} ok") return _ok( f"step_id={step_id} 输出 {output_id} 已删除", removed_output=removed ) def finalize_workflow(summary: str) -> Dict[str, Any]: """收尾:写入整体摘要并把 status 置为 "finalized",落盘最终 JSON。 所有步骤都已添加完毕后调用一次。调用后不能再继续追加步骤。 Args: summary: 一段不超过 200 字的摘要,概括整套工序的目标与产物。 Returns: {"success": True, "workflow": {...}};当前没有任何步骤时返回 error。 """ try: wf = WorkflowContext.finalize(summary=summary) except WorkflowStoreError as e: return _err(str(e)) print(f"[finalize_workflow] finalized(共{len(wf['steps'])}步)-> {WorkflowContext.output_path()}") return _ok("工序已 finalize", workflow=wf)