""" 内容寻找 Agent 使用示例: python run.py """ import asyncio import logging import sys import os from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from dotenv import load_dotenv load_dotenv() from agent import ( AgentRunner, RunConfig, FileSystemTraceStore, Trace, Message, ) from agent.llm import create_openrouter_llm_call from agent.llm.prompts import SimplePrompt # 导入工具(确保工具被注册) from tools import ( douyin_search, douyin_user_videos, get_content_fans_portrait, get_account_fans_portrait, ) # 配置日志 log_dir = Path(__file__).parent / '.cache' log_dir.mkdir(exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_dir / 'agent.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) async def generate_fallback_output(store: FileSystemTraceStore, trace_id: str): """ 当任务未正常输出时,从 trace 中提取数据并生成兜底输出 """ try: # 读取所有消息 messages_dir = Path(store.base_path) / trace_id / "messages" if not messages_dir.exists(): print("无法生成摘要:找不到消息目录") return # 提取搜索结果和画像数据 search_results = [] portrait_data = {} import json import re for msg_file in sorted(messages_dir.glob("*.json")): with open(msg_file, 'r', encoding='utf-8') as f: msg = json.load(f) # 提取搜索结果(从文本结果中解析) if msg.get("role") == "tool" and msg.get("content", {}).get("tool_name") == "douyin_search": result_text = msg.get("content", {}).get("result", "") # 解析每条搜索结果 lines = result_text.split("\n") current_item = {} for line in lines: line = line.strip() if not line: if current_item.get("aweme_id"): if current_item["aweme_id"] not in [r["aweme_id"] for r in search_results]: search_results.append(current_item) current_item = {} continue # 解析标题行(以数字开头) if re.match(r'^\d+\.', line): current_item["desc"] = line.split(".", 1)[1].strip()[:100] # 解析 ID elif line.startswith("ID:"): current_item["aweme_id"] = line.split("ID:")[1].strip() # 解析作者 elif line.startswith("作者:"): author_name = line.split("作者:")[1].strip() current_item["author"] = {"nickname": author_name} # 解析 sec_uid elif line.startswith("sec_uid:"): sec_uid = line.split("sec_uid:")[1].strip() if "author" not in current_item: current_item["author"] = {} current_item["author"]["sec_uid"] = sec_uid # 解析数据 elif line.startswith("数据:"): stats_text = line.split("数据:")[1].strip() stats = {} # 解析点赞数 if "点赞" in stats_text: digg_match = re.search(r'点赞\s+([\d,]+)', stats_text) if digg_match: stats["digg_count"] = int(digg_match.group(1).replace(",", "")) # 解析评论数 if "评论" in stats_text: comment_match = re.search(r'评论\s+([\d,]+)', stats_text) if comment_match: stats["comment_count"] = int(comment_match.group(1).replace(",", "")) # 解析分享数 if "分享" in stats_text: share_match = re.search(r'分享\s+([\d,]+)', stats_text) if share_match: stats["share_count"] = int(share_match.group(1).replace(",", "")) current_item["statistics"] = stats # 添加最后一条 if current_item.get("aweme_id"): if current_item["aweme_id"] not in [r["aweme_id"] for r in search_results]: search_results.append(current_item) # 提取画像数据 elif msg.get("role") == "tool": tool_name = msg.get("content", {}).get("tool_name", "") result_text = msg.get("content", {}).get("result", "") if tool_name in ["get_content_fans_portrait", "get_account_fans_portrait"]: # 解析画像数据 content_id = None age_50_plus = None tgi = None # 从结果文本中提取 ID if "内容 " in result_text: parts = result_text.split("内容 ")[1].split(" ")[0] content_id = parts elif "账号 " in result_text: parts = result_text.split("账号 ")[1].split(" ")[0] content_id = parts # 提取50岁以上数据(格式:50-: 48.35% (偏好度: 210.05)) if "【年龄】分布" in result_text: lines = result_text.split("\n") for line in lines: if "50-:" in line: # 解析: 50-: 48.35% (偏好度: 210.05) parts = line.split("50-:")[1].strip() if "%" in parts: age_50_plus = parts.split("%")[0].strip() if "偏好度:" in parts: tgi_part = parts.split("偏好度:")[1].strip() tgi = tgi_part.replace(")", "").strip() break if content_id and age_50_plus: portrait_data[content_id] = { "age_50_plus": age_50_plus, "tgi": tgi, "source": "内容点赞画像" if tool_name == "get_content_fans_portrait" else "账号粉丝画像" } # 生成输出 print("\n" + "="*60) print("📊 任务执行摘要(兜底输出)") print("="*60) print(f"\n搜索情况:找到 {len(search_results)} 条候选内容") print(f"画像获取:获取了 {len(portrait_data)} 条画像数据") # 筛选有画像且符合要求的内容 matched_results = [] for result in search_results: aweme_id = result["aweme_id"] author_id = result["author"].get("sec_uid", "") # 查找画像数据(优先内容画像,其次账号画像) portrait = portrait_data.get(aweme_id) or portrait_data.get(author_id) if portrait and portrait.get("age_50_plus"): try: age_ratio = float(portrait["age_50_plus"]) if age_ratio >= 20: # 50岁以上占比>=20% matched_results.append({ **result, "portrait": portrait }) except: pass # 按50岁以上占比排序 matched_results.sort(key=lambda x: float(x["portrait"]["age_50_plus"]), reverse=True) # 输出推荐结果 print(f"\n符合要求:{len(matched_results)} 条内容(50岁以上占比>=20%)") print("\n" + "="*60) print("🎯 推荐结果") print("="*60) for i, result in enumerate(matched_results[:10], 1): aweme_id = result["aweme_id"] desc = result["desc"] author = result["author"] stats = result["statistics"] portrait = result["portrait"] print(f"\n{i}. {desc}") print(f" 链接: https://www.douyin.com/video/{aweme_id}") print(f" 作者: {author.get('nickname', '未知')}") print(f" 热度: 👍 {stats.get('digg_count', 0):,} | 💬 {stats.get('comment_count', 0):,} | 🔄 {stats.get('share_count', 0):,}") print(f" 画像: 50岁以上 {portrait['age_50_plus']}% (tgi: {portrait['tgi']}) - {portrait['source']}") print("\n" + "="*60) print(f"✅ 已为您找到 {min(len(matched_results), 10)} 条推荐视频") print("="*60) except Exception as e: logger.error(f"生成兜底输出失败: {e}", exc_info=True) print(f"\n生成摘要失败: {e}") async def main(): print("\n" + "=" * 60) print("内容寻找 Agent") print("=" * 60) print("开始执行...\n") # 加载 prompt prompt_path = Path(__file__).parent / "content_finder.prompt" prompt = SimplePrompt(prompt_path) # 构建消息 messages = prompt.build_messages() # 初始化 api_key = os.getenv("OPEN_ROUTER_API_KEY") if not api_key: raise ValueError("OPEN_ROUTER_API_KEY 未设置,请在 .env 文件中配置") model = os.getenv("MODEL", f"anthropic/claude-{prompt.config.get('model', 'sonnet-4.6')}") temperature = float(prompt.config.get("temperature", 0.3)) max_iterations = int(os.getenv("MAX_ITERATIONS", "30")) trace_dir = os.getenv("TRACE_DIR", ".cache/traces") skills_dir = str(Path(__file__).parent / "skills") Path(trace_dir).mkdir(parents=True, exist_ok=True) store = FileSystemTraceStore(base_path=trace_dir) # 限制工具范围:只使用抖音相关的4个工具 allowed_tools = [ "douyin_search", "douyin_user_videos", "get_content_fans_portrait", "get_account_fans_portrait", ] runner = AgentRunner( llm_call=create_openrouter_llm_call(model=model), trace_store=store, skills_dir=skills_dir, ) config = RunConfig( model=model, temperature=temperature, max_iterations=max_iterations, tools=allowed_tools, # 限制工具范围 extra_llm_params={"max_tokens": 8192}, # 增加输出 token 限制,避免被截断 ) # 执行 trace_id = None has_final_output = False try: async for item in runner.run(messages=messages, config=config): if isinstance(item, Trace): trace_id = item.trace_id if item.status == "completed": print(f"\n[完成] trace_id={item.trace_id}") # 检查是否有最终输出 if not has_final_output: print("\n⚠️ 检测到任务未完整输出,正在生成摘要...") await generate_fallback_output(store, item.trace_id) elif item.status == "failed": print(f"\n[失败] {item.error_message}") elif isinstance(item, Message): if item.role == "assistant": content = item.content if isinstance(content, dict): text = content.get("text", "") tool_calls = content.get("tool_calls") # 输出文本内容 if text: # 检测是否包含最终推荐结果 if "推荐结果" in text or "推荐内容" in text or "🎯" in text: has_final_output = True # 如果文本很长(>500字符)且包含推荐结果标记,输出完整内容 if len(text) > 500 and ("推荐结果" in text or "推荐内容" in text or "🎯" in text): print(f"\n{text}") # 如果有工具调用且文本较短,只输出摘要 elif tool_calls and len(text) > 100: print(f"[思考] {text[:100]}...") # 其他情况输出完整文本 else: print(f"\n{text}") # 输出工具调用信息 if tool_calls: for tc in tool_calls: tool_name = tc.get("function", {}).get("name", "unknown") # 跳过 goal 工具的输出,减少噪音 if tool_name != "goal": print(f"[工具] {tool_name}") elif isinstance(content, str) and content: print(f"\n{content}") elif item.role == "tool": content = item.content if isinstance(content, dict): tool_name = content.get("tool_name", "unknown") print(f"[结果] {tool_name} ✓") except KeyboardInterrupt: print("\n用户中断") except Exception as e: logger.error(f"执行失败: {e}", exc_info=True) print(f"\n执行失败: {e}") sys.exit(1) if __name__ == "__main__": asyncio.run(main())