| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892 |
- """
- 飞书实时对话 Agent
- 通过飞书 WebSocket 监听消息,调用 Qwen LLM 生成回复,实现实时对话。
- 支持工具调用:浏览目录、读取文件、执行 bash 命令。
- 用法:
- python -m agent.tools.builtin.feishu.feishu_agent
- 环境变量:
- FEISHU_APP_ID / FEISHU_APP_SECRET: 飞书应用凭证
- QWEN_API_KEY: 通义千问 API Key
- QWEN_BASE_URL: 通义千问 API 地址(可选,默认阿里云)
- FEISHU_AGENT_MODEL: 模型名称(默认 qwen-plus)
- FEISHU_AGENT_SYSTEM_PROMPT: 自定义 system prompt(可选)
- """
- import os
- import sys
- import json
- import asyncio
- import logging
- import subprocess
- import threading
- import base64
- import zipfile
- from typing import Dict, List, Any, Optional
- from collections import defaultdict
- PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
- if PROJECT_ROOT not in sys.path:
- sys.path.insert(0, PROJECT_ROOT)
- from agent.tools.builtin.feishu.feishu_client import FeishuClient, FeishuMessageEvent, FeishuDomain, ReceiveIdType
- from agent.tools.builtin.feishu.chat import (
- FEISHU_APP_ID,
- FEISHU_APP_SECRET,
- get_contact_by_id,
- load_chat_history,
- save_chat_history,
- )
- from agent.llm.qwen import qwen_llm_call
- logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(name)s] %(levelname)s %(message)s')
- logger = logging.getLogger("FeishuAgent")
- # ===== 配置 =====
- MODEL = os.getenv("FEISHU_AGENT_MODEL", "qwen3.5-397b-a17b")
- MAX_HISTORY = 50
- MAX_TOOL_ROUNDS = 10 # 工具调用最大循环次数
- ALLOWED_CONTACTS = {"关涛"}
- DEFAULT_SYSTEM_PROMPT = """你是一个友好、有帮助的 AI 助手,正在通过飞书和用户对话。
- 你可以使用工具来浏览本地目录、读取文件内容、执行 bash 命令。
- 请用简洁清晰的中文回复。如果用户使用其他语言,请用对应语言回复。"""
- SYSTEM_PROMPT = os.getenv("FEISHU_AGENT_SYSTEM_PROMPT", DEFAULT_SYSTEM_PROMPT)
- # ===== 工具定义 =====
- TOOLS = [
- {
- "type": "function",
- "function": {
- "name": "list_directory",
- "description": "列出指定目录下的文件和子目录",
- "parameters": {
- "type": "object",
- "properties": {
- "path": {"type": "string", "description": "目录路径,默认当前目录"}
- },
- "required": []
- }
- }
- },
- {
- "type": "function",
- "function": {
- "name": "read_file",
- "description": "读取指定文件的内容",
- "parameters": {
- "type": "object",
- "properties": {
- "path": {"type": "string", "description": "文件路径"},
- "max_lines": {"type": "integer", "description": "最多读取行数,默认200"}
- },
- "required": ["path"]
- }
- }
- },
- {
- "type": "function",
- "function": {
- "name": "run_bash",
- "description": "执行 bash/shell 命令并返回输出",
- "parameters": {
- "type": "object",
- "properties": {
- "command": {"type": "string", "description": "要执行的命令"}
- },
- "required": ["command"]
- }
- }
- },
- {
- "type": "function",
- "function": {
- "name": "send_image",
- "description": "发送图片到飞书",
- "parameters": {
- "type": "object",
- "properties": {
- "path": {"type": "string", "description": "本地图片文件路径"}
- },
- "required": ["path"]
- }
- }
- },
- {
- "type": "function",
- "function": {
- "name": "send_file",
- "description": "发送文件到飞书",
- "parameters": {
- "type": "object",
- "properties": {
- "path": {"type": "string", "description": "本地文件路径"}
- },
- "required": ["path"]
- }
- }
- },
- {
- "type": "function",
- "function": {
- "name": "zip_and_send",
- "description": "将本地目录打包成 zip 并发送到飞书",
- "parameters": {
- "type": "object",
- "properties": {
- "path": {"type": "string", "description": "要打包的目录路径"}
- },
- "required": ["path"]
- }
- }
- },
- {
- "type": "function",
- "function": {
- "name": "run_background",
- "description": "在后台启动一个长期运行的命令(如服务器、agent),返回进程 ID。不会等待命令结束。",
- "parameters": {
- "type": "object",
- "properties": {
- "command": {"type": "string", "description": "要执行的命令"},
- "name": {"type": "string", "description": "给这个后台进程起个名字,方便后续管理"}
- },
- "required": ["command"]
- }
- }
- },
- {
- "type": "function",
- "function": {
- "name": "stop_process",
- "description": "停止一个后台运行的进程",
- "parameters": {
- "type": "object",
- "properties": {
- "pid": {"type": "integer", "description": "进程 ID"}
- },
- "required": ["pid"]
- }
- }
- },
- {
- "type": "function",
- "function": {
- "name": "list_processes",
- "description": "列出所有通过 run_background 启动的后台进程及其状态",
- "parameters": {
- "type": "object",
- "properties": {},
- "required": []
- }
- }
- },
- {
- "type": "function",
- "function": {
- "name": "get_process_output",
- "description": "获取后台进程的最新输出日志",
- "parameters": {
- "type": "object",
- "properties": {
- "pid": {"type": "integer", "description": "进程 ID"},
- "tail": {"type": "integer", "description": "获取最后 N 行,默认50"}
- },
- "required": ["pid"]
- }
- }
- },
- {
- "type": "function",
- "function": {
- "name": "send_input",
- "description": "向后台进程发送输入(如交互式命令)",
- "parameters": {
- "type": "object",
- "properties": {
- "pid": {"type": "integer", "description": "进程 ID"},
- "text": {"type": "string", "description": "要发送的文本"}
- },
- "required": ["pid", "text"]
- }
- }
- }
- ]
- # ===== 工具执行 =====
- # 全局变量:保存当前会话的 chat_id,供工具使用
- _current_chat_id = None
- # 后台进程管理
- _background_processes: Dict[int, Dict[str, Any]] = {} # pid → {proc, name, command, output_lines, started_at}
- def execute_tool(name: str, arguments: Dict[str, Any]) -> str:
- """执行工具调用,返回结果字符串"""
- try:
- if name == "list_directory":
- return _tool_list_directory(arguments.get("path", "."))
- elif name == "read_file":
- return _tool_read_file(arguments["path"], arguments.get("max_lines", 200))
- elif name == "run_bash":
- return _tool_run_bash(arguments["command"])
- elif name == "send_image":
- return _tool_send_image(arguments["path"])
- elif name == "send_file":
- return _tool_send_file(arguments["path"])
- elif name == "zip_and_send":
- return _tool_zip_and_send(arguments["path"])
- elif name == "run_background":
- return _tool_run_background(arguments["command"], arguments.get("name", ""))
- elif name == "stop_process":
- return _tool_stop_process(arguments["pid"])
- elif name == "list_processes":
- return _tool_list_processes()
- elif name == "get_process_output":
- return _tool_get_process_output(arguments["pid"], arguments.get("tail", 50))
- elif name == "send_input":
- return _tool_send_input(arguments["pid"], arguments["text"])
- else:
- return f"未知工具: {name}"
- except Exception as e:
- return f"工具执行出错: {type(e).__name__}: {e}"
- def _tool_list_directory(path: str) -> str:
- path = os.path.abspath(path)
- if not os.path.isdir(path):
- return f"目录不存在: {path}"
- entries = []
- for name in sorted(os.listdir(path)):
- full = os.path.join(path, name)
- suffix = "/" if os.path.isdir(full) else ""
- entries.append(f" {name}{suffix}")
- header = f"目录: {path} ({len(entries)} 项)\n"
- return header + "\n".join(entries) if entries else header + " (空目录)"
- def _tool_read_file(path: str, max_lines: int = 200) -> str:
- path = os.path.abspath(path)
- if not os.path.isfile(path):
- return f"文件不存在: {path}"
- try:
- with open(path, "r", encoding="utf-8", errors="replace") as f:
- lines = []
- for i, line in enumerate(f):
- if i >= max_lines:
- lines.append(f"\n... (截断,共读取 {max_lines} 行)")
- break
- lines.append(line.rstrip())
- return "\n".join(lines)
- except Exception as e:
- return f"读取失败: {e}"
- def _tool_run_bash(command: str) -> str:
- # 将项目 .venv 的 Scripts/bin 目录加到 PATH 最前面,确保 python 指向虚拟环境
- env = os.environ.copy()
- venv_scripts = os.path.join(PROJECT_ROOT, ".venv", "Scripts") # Windows
- if not os.path.isdir(venv_scripts):
- venv_scripts = os.path.join(PROJECT_ROOT, ".venv", "bin") # Linux/Mac
- env["PATH"] = venv_scripts + os.pathsep + env.get("PATH", "")
- env["VIRTUAL_ENV"] = os.path.join(PROJECT_ROOT, ".venv")
- env["PYTHONIOENCODING"] = "utf-8"
- try:
- proc = subprocess.Popen(
- command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- text=True, cwd=PROJECT_ROOT, env=env, encoding="utf-8", errors="replace",
- )
- try:
- stdout, stderr = proc.communicate(timeout=30)
- except subprocess.TimeoutExpired:
- # 超时:杀掉整个进程树(Windows 需要 taskkill)
- try:
- if sys.platform == "win32":
- subprocess.run(
- f"taskkill /F /T /PID {proc.pid}",
- shell=True, capture_output=True, timeout=5,
- )
- else:
- import signal
- os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
- except Exception:
- proc.kill()
- proc.wait(timeout=5)
- return "命令执行超时(30秒),已终止进程"
- output = ""
- if stdout:
- output += stdout
- if stderr:
- output += ("\n--- stderr ---\n" + stderr) if output else stderr
- if proc.returncode != 0:
- output += f"\n(exit code: {proc.returncode})"
- return output.strip() or "(无输出)"
- except Exception as e:
- return f"执行失败: {e}"
- def _tool_send_image(path: str) -> str:
- """发送图片到飞书"""
- from agent.tools.builtin.feishu.chat import FEISHU_APP_ID, FEISHU_APP_SECRET
- from agent.tools.builtin.feishu.feishu_client import FeishuClient, FeishuDomain, ReceiveIdType
- if not _current_chat_id:
- return "错误:无法获取当前会话 ID"
- path = os.path.abspath(path)
- if not os.path.isfile(path):
- return f"文件不存在: {path}"
- try:
- client = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET, domain=FeishuDomain.FEISHU)
- client.send_image(to=_current_chat_id, image=path)
- return f"图片已发送: {os.path.basename(path)}"
- except Exception as e:
- return f"发送图片失败: {e}"
- def _tool_send_file(path: str) -> str:
- """发送文件到飞书"""
- from agent.tools.builtin.feishu.chat import FEISHU_APP_ID, FEISHU_APP_SECRET
- from agent.tools.builtin.feishu.feishu_client import FeishuClient, FeishuDomain
- if not _current_chat_id:
- return "错误:无法获取当前会话 ID"
- path = os.path.abspath(path)
- if not os.path.isfile(path):
- return f"文件不存在: {path}"
- try:
- client = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET, domain=FeishuDomain.FEISHU)
- client.send_file(to=_current_chat_id, file=path, file_name=os.path.basename(path))
- return f"文件已发送: {os.path.basename(path)}"
- except Exception as e:
- return f"发送文件失败: {e}"
- def _tool_zip_and_send(path: str) -> str:
- """打包目录并发送到飞书"""
- from agent.tools.builtin.feishu.chat import FEISHU_APP_ID, FEISHU_APP_SECRET
- from agent.tools.builtin.feishu.feishu_client import FeishuClient, FeishuDomain
- if not _current_chat_id:
- return "错误:无法获取当前会话 ID"
- path = os.path.abspath(path)
- if not os.path.isdir(path):
- return f"目录不存在: {path}"
- try:
- # 创建临时 zip 文件
- zip_name = os.path.basename(path.rstrip(os.sep)) + ".zip"
- zip_path = os.path.join(PROJECT_ROOT, zip_name)
- with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
- for root, dirs, files in os.walk(path):
- for file in files:
- file_path = os.path.join(root, file)
- arcname = os.path.relpath(file_path, path)
- zipf.write(file_path, arcname)
- client = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET, domain=FeishuDomain.FEISHU)
- client.send_file(to=_current_chat_id, file=zip_path, file_name=zip_name)
- # 删除临时文件
- os.remove(zip_path)
- return f"目录已打包并发送: {zip_name}"
- except Exception as e:
- return f"打包发送失败: {e}"
- # ===== 后台进程管理 =====
- def _output_reader(proc: subprocess.Popen, pid: int):
- """后台线程:持续读取进程输出并存入缓冲区"""
- try:
- for line in iter(proc.stdout.readline, ''):
- if pid not in _background_processes:
- break
- _background_processes[pid]["output_lines"].append(line.rstrip())
- # 只保留最近 500 行
- if len(_background_processes[pid]["output_lines"]) > 500:
- _background_processes[pid]["output_lines"] = _background_processes[pid]["output_lines"][-500:]
- except Exception:
- pass
- # stderr 也读
- try:
- for line in iter(proc.stderr.readline, ''):
- if pid not in _background_processes:
- break
- _background_processes[pid]["output_lines"].append(f"[stderr] {line.rstrip()}")
- if len(_background_processes[pid]["output_lines"]) > 500:
- _background_processes[pid]["output_lines"] = _background_processes[pid]["output_lines"][-500:]
- except Exception:
- pass
- def _tool_run_background(command: str, name: str = "") -> str:
- """后台启动命令"""
- env = os.environ.copy()
- venv_scripts = os.path.join(PROJECT_ROOT, ".venv", "Scripts")
- if not os.path.isdir(venv_scripts):
- venv_scripts = os.path.join(PROJECT_ROOT, ".venv", "bin")
- env["PATH"] = venv_scripts + os.pathsep + env.get("PATH", "")
- env["VIRTUAL_ENV"] = os.path.join(PROJECT_ROOT, ".venv")
- env["PYTHONIOENCODING"] = "utf-8"
- env["PYTHONUNBUFFERED"] = "1" # 强制 Python 无缓冲输出
- env["PYTHONIOENCODING"] = "utf-8" # 强制 Python IO 编码为 utf-8
- try:
- proc = subprocess.Popen(
- command, shell=True,
- stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- text=True, cwd=PROJECT_ROOT, env=env, encoding="utf-8", errors="replace",
- bufsize=1, # 行缓冲
- )
- from datetime import datetime
- _background_processes[proc.pid] = {
- "proc": proc,
- "name": name or command[:40],
- "command": command,
- "output_lines": [],
- "started_at": datetime.now().strftime("%H:%M:%S"),
- }
- # 启动输出读取线程
- t = threading.Thread(target=_output_reader, args=(proc, proc.pid), daemon=True)
- t.start()
- return f"后台进程已启动: PID={proc.pid}, name={name or command[:40]}"
- except Exception as e:
- return f"启动失败: {e}"
- def _tool_stop_process(pid: int) -> str:
- """停止后台进程"""
- info = _background_processes.get(pid)
- if not info:
- return f"未找到 PID={pid} 的后台进程"
- proc = info["proc"]
- name = info["name"]
- try:
- if sys.platform == "win32":
- subprocess.run(f"taskkill /F /T /PID {pid}", shell=True, capture_output=True, timeout=5)
- else:
- import signal
- os.killpg(os.getpgid(pid), signal.SIGKILL)
- except Exception:
- proc.kill()
- try:
- proc.wait(timeout=5)
- except Exception:
- pass
- _background_processes.pop(pid, None)
- return f"已停止进程: PID={pid} ({name})"
- def _tool_list_processes() -> str:
- """列出所有后台进程"""
- if not _background_processes:
- return "当前没有后台进程"
- lines = []
- for pid, info in _background_processes.items():
- proc = info["proc"]
- status = "运行中" if proc.poll() is None else f"已退出(code={proc.returncode})"
- lines.append(f" PID={pid} | {status} | {info['name']} | 启动于 {info['started_at']}")
- return f"后台进程 ({len(lines)} 个):\n" + "\n".join(lines)
- def _tool_get_process_output(pid: int, tail: int = 50) -> str:
- """获取后台进程的最新输出"""
- info = _background_processes.get(pid)
- if not info:
- return f"未找到 PID={pid} 的后台进程"
- output_lines = info["output_lines"]
- proc = info["proc"]
- status = "运行中" if proc.poll() is None else f"已退出(code={proc.returncode})"
- if not output_lines:
- return f"PID={pid} ({info['name']}) [{status}]: 暂无输出"
- recent = output_lines[-tail:]
- header = f"PID={pid} ({info['name']}) [{status}] 最近 {len(recent)} 行:\n"
- return header + "\n".join(recent)
- def _tool_send_input(pid: int, text: str) -> str:
- """向后台进程发送输入(通过控制文件)"""
- info = _background_processes.get(pid)
- if not info:
- return f"未找到 PID={pid} 的后台进程"
- proc = info["proc"]
- if proc.poll() is not None:
- return f"进程已退出,无法发送输入"
- try:
- # 使用控制文件方式(更可靠)
- control_file = os.path.join(PROJECT_ROOT, ".agent_control")
- with open(control_file, "w", encoding="utf-8") as f:
- f.write(text.strip())
- return f"已向 PID={pid} 发送控制指令: {text.strip()}"
- except Exception as e:
- return f"发送输入失败: {e}"
- return f"发送输入失败: {e}"
- class FeishuAgent:
- """飞书实时对话 Agent"""
- def __init__(self):
- self.client = FeishuClient(
- app_id=FEISHU_APP_ID,
- app_secret=FEISHU_APP_SECRET,
- domain=FeishuDomain.FEISHU,
- )
- self.conversations: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
- self._history_loaded: set = set()
- self.loop = asyncio.new_event_loop()
- self._loop_thread = threading.Thread(target=self._run_loop, daemon=True)
- self._loop_thread.start()
- def _run_loop(self):
- asyncio.set_event_loop(self.loop)
- self.loop.run_forever()
- def _get_conversation_key(self, event: FeishuMessageEvent) -> str:
- if event.chat_type and event.chat_type.value == "p2p":
- return event.sender_open_id
- return event.chat_id
- def _build_messages(self, conv_key: str) -> List[Dict[str, Any]]:
- messages = [{"role": "system", "content": SYSTEM_PROMPT}]
- conv = self.conversations[conv_key]
- # 验证并清理 tool_calls:确保每个 assistant+tool_calls 后面都有对应的 tool response
- clean = []
- i = 0
- while i < len(conv):
- msg = conv[i]
- if msg.get("role") == "assistant" and msg.get("tool_calls"):
- # 收集这个 assistant 消息的所有 tool_call_id
- expected_ids = {tc.get("id") for tc in msg.get("tool_calls", [])}
- # 检查后续消息是否有对应的 tool response
- j = i + 1
- found_ids = set()
- while j < len(conv) and conv[j].get("role") == "tool":
- found_ids.add(conv[j].get("tool_call_id"))
- j += 1
- # 只有所有 tool_call_id 都有对应 response 才保留
- if expected_ids == found_ids:
- clean.append(msg)
- # 添加对应的 tool response
- for k in range(i + 1, j):
- clean.append(conv[k])
- i = j
- else:
- # 不完整,跳过这组
- i = j
- elif msg.get("role") == "tool":
- # 孤立的 tool response,跳过
- i += 1
- else:
- clean.append(msg)
- i += 1
- messages.extend(clean)
- return messages
- def _append_message(self, conv_key: str, role: str, content: Any, **extra):
- msg = {"role": role, "content": content, **extra}
- self.conversations[conv_key].append(msg)
- if len(self.conversations[conv_key]) > MAX_HISTORY:
- self.conversations[conv_key] = self.conversations[conv_key][-MAX_HISTORY:]
- def _extract_content(self, event: FeishuMessageEvent) -> Optional[Any]:
- """从飞书消息中提取内容(文本/图片/文件),返回 OpenAI 多模态格式"""
- if event.content_type == "text":
- try:
- parsed = json.loads(event.content)
- return parsed.get("text", event.content)
- except (json.JSONDecodeError, TypeError):
- return event.content
- elif event.content_type == "image":
- # 下载图片并转成 base64
- try:
- content_dict = json.loads(event.content)
- image_key = content_dict.get("image_key")
- if image_key and event.message_id:
- img_bytes = self.client.download_message_resource(
- message_id=event.message_id,
- file_key=image_key,
- resource_type="image"
- )
- b64_str = base64.b64encode(img_bytes).decode('utf-8')
- # 返回 OpenAI 多模态格式
- return [
- {"type": "text", "text": "[用户发送了一张图片]"},
- {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64_str}"}}
- ]
- except Exception as e:
- logger.error(f"下载图片失败: {e}")
- return "[图片下载失败]"
- elif event.content_type == "file":
- # 下载文件到本地
- try:
- content_dict = json.loads(event.content)
- file_key = content_dict.get("file_key")
- file_name = content_dict.get("file_name", "unknown_file")
- if file_key and event.message_id:
- file_bytes = self.client.download_message_resource(
- message_id=event.message_id,
- file_key=file_key,
- resource_type="file"
- )
- # 保存到项目根目录的 downloads 文件夹
- download_dir = os.path.join(PROJECT_ROOT, "downloads")
- os.makedirs(download_dir, exist_ok=True)
- save_path = os.path.join(download_dir, file_name)
- with open(save_path, "wb") as f:
- f.write(file_bytes)
- return f"[用户发送了文件: {file_name},已保存到 {save_path}]"
- except Exception as e:
- logger.error(f"下载文件失败: {e}")
- return "[文件下载失败]"
- return None
- def _load_history_from_disk(self, contact_name: str, conv_key: str):
- if conv_key in self._history_loaded:
- return
- self._history_loaded.add(conv_key)
- history = load_chat_history(contact_name)
- for msg in history[-MAX_HISTORY:]:
- role = msg.get("role", "user")
- content = msg.get("content", "")
- if isinstance(content, list):
- text_parts = [b.get("text", "") for b in content if isinstance(b, dict) and b.get("type") == "text"]
- content = "\n".join(text_parts)
- if isinstance(content, str) and content.strip():
- self.conversations[conv_key].append({"role": role, "content": content})
- def _save_to_disk(self, contact_name: str, conv_key: str):
- # 保存完整对话历史,包括工具调用和结果
- saveable = []
- for m in self.conversations[conv_key]:
- role = m.get("role")
- content = m.get("content")
- # 保存所有消息类型
- if role in ("user", "assistant", "tool"):
- msg = {"role": role}
- # 处理 content
- if isinstance(content, str):
- msg["content"] = content
- elif isinstance(content, list):
- # 多模态消息:提取文本部分保存
- text_parts = [b.get("text", "") for b in content if isinstance(b, dict) and b.get("type") == "text"]
- if text_parts:
- msg["content"] = "\n".join(text_parts)
- else:
- msg["content"] = "[多模态内容]"
- else:
- msg["content"] = str(content) if content else ""
- # 保存 tool_calls(如果有)
- if "tool_calls" in m:
- msg["tool_calls"] = m["tool_calls"]
- # 保存 tool_call_id(如果有)
- if "tool_call_id" in m:
- msg["tool_call_id"] = m["tool_call_id"]
- saveable.append(msg)
- save_chat_history(contact_name, saveable)
- def handle_message(self, event: FeishuMessageEvent):
- global _current_chat_id
- contact = get_contact_by_id(event.sender_open_id) or get_contact_by_id(event.chat_id)
- if not contact:
- logger.debug(f"忽略非白名单消息: {event.sender_open_id}")
- return
- sender_name = contact.get("name", "未知")
- if sender_name not in ALLOWED_CONTACTS:
- logger.debug(f"忽略非白名单联系人: {sender_name}")
- return
- user_content = self._extract_content(event)
- if not user_content:
- logger.info(f"[{sender_name}] 发送了不支持的消息类型,跳过")
- return
- # 记录消息类型
- if isinstance(user_content, str):
- logger.info(f"收到 [{sender_name}]: {user_content[:80]}")
- else:
- logger.info(f"收到 [{sender_name}]: [多模态消息]")
- conv_key = self._get_conversation_key(event)
- _current_chat_id = event.chat_id # 设置全局变量供工具使用
- self._load_history_from_disk(sender_name, conv_key)
- self._append_message(conv_key, "user", user_content)
- self._save_to_disk(sender_name, conv_key)
- future = asyncio.run_coroutine_threadsafe(
- self._generate_and_reply(conv_key, event, sender_name),
- self.loop,
- )
- future.add_done_callback(self._on_reply_done)
- async def _generate_and_reply(self, conv_key: str, event: FeishuMessageEvent, sender_name: str):
- """调用 Qwen LLM,支持多轮工具调用循环"""
- try:
- last_tool_calls = [] # 记录最近的工具调用,用于检测循环
- for round_i in range(MAX_TOOL_ROUNDS):
- messages = self._build_messages(conv_key)
- result = await qwen_llm_call(
- messages=messages,
- model=MODEL,
- tools=TOOLS,
- temperature=0.7,
- max_tokens=4096,
- )
- tool_calls = result.get("tool_calls")
- if not tool_calls:
- # 没有工具调用,直接回复
- reply_text = result.get("content", "").strip()
- if not reply_text:
- reply_text = "(抱歉,我暂时无法生成回复)"
- self._append_message(conv_key, "assistant", reply_text)
- self._save_to_disk(sender_name, conv_key)
- self.client.send_message(
- to=event.chat_id,
- text=reply_text,
- receive_id_type=ReceiveIdType.CHAT_ID,
- )
- tokens_in = result.get("prompt_tokens", 0)
- tokens_out = result.get("completion_tokens", 0)
- cost = result.get("cost", 0)
- logger.info(
- f"回复 [{sender_name}]: {reply_text[:80]}... "
- f"(tokens: {tokens_in}+{tokens_out}, cost: ¥{cost:.4f})"
- )
- return
- # 检测循环:如果连续 2 次调用相同工具(仅比较工具名),强制退出
- current_call_names = [tc.get("function", {}).get("name") for tc in tool_calls]
- last_tool_calls.append(current_call_names)
- if len(last_tool_calls) >= 2 and last_tool_calls[-1] == last_tool_calls[-2]:
- logger.warning(f"[{sender_name}] 检测到工具调用循环: {current_call_names}")
- self._append_message(conv_key, "assistant", "(检测到重复调用,已停止)")
- self._save_to_disk(sender_name, conv_key)
- self.client.send_message(
- to=event.chat_id,
- text="⚠️ 检测到重复调用相同工具,已停止。请换一个方式或直接告诉我结果。",
- receive_id_type=ReceiveIdType.CHAT_ID,
- )
- return
- # 有工具调用:追加 assistant 消息(含 tool_calls),然后执行
- assistant_msg = {"role": "assistant", "content": result.get("content", "") or ""}
- assistant_msg["tool_calls"] = tool_calls
- self.conversations[conv_key].append(assistant_msg)
- for tc in tool_calls:
- func = tc.get("function", {})
- tool_name = func.get("name", "")
- try:
- tool_args = json.loads(func.get("arguments", "{}"))
- except json.JSONDecodeError:
- tool_args = {}
- logger.info(f"[{sender_name}] 调用工具: {tool_name}({tool_args})")
- # 在线程池中执行工具,避免阻塞 event loop
- loop = asyncio.get_event_loop()
- tool_result = await loop.run_in_executor(
- None, execute_tool, tool_name, tool_args
- )
- # 截断过长的工具结果
- if len(tool_result) > 4000:
- tool_result = tool_result[:4000] + "\n... (结果已截断)"
- self.conversations[conv_key].append({
- "role": "tool",
- "tool_call_id": tc.get("id", ""),
- "content": tool_result,
- })
- # 超过最大轮次
- self._append_message(conv_key, "assistant", "(工具调用轮次过多,已停止)")
- self._save_to_disk(sender_name, conv_key)
- self.client.send_message(
- to=event.chat_id,
- text="⚠️ 工具调用轮次过多,已停止",
- receive_id_type=ReceiveIdType.CHAT_ID,
- )
- except Exception as e:
- logger.error(f"生成回复失败: {e}", exc_info=True)
- try:
- self.client.send_message(
- to=event.chat_id,
- text=f"⚠️ 生成回复时出错: {type(e).__name__}",
- receive_id_type=ReceiveIdType.CHAT_ID,
- )
- except Exception:
- logger.error("发送错误消息也失败了", exc_info=True)
- def _on_reply_done(self, future):
- exc = future.exception()
- if exc:
- logger.error(f"回复任务异常: {exc}", exc_info=exc)
- def start(self):
- logger.info(f"启动飞书对话 Agent (model={MODEL})")
- logger.info("等待飞书消息... 按 Ctrl+C 退出")
- try:
- self.client.start_websocket(
- on_message=self.handle_message,
- blocking=True,
- )
- except KeyboardInterrupt:
- logger.info("Agent 已停止")
- finally:
- self.loop.call_soon_threadsafe(self.loop.stop)
- if __name__ == "__main__":
- agent = FeishuAgent()
- agent.start()
|