| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
"""
Content-finder agent.

Runs an LLM agent configured by ``content_finder.prompt`` (next to this file),
restricted to the four Douyin tools imported below.

Usage example:
python run.py
"""
import asyncio
import logging
import sys
import os
from pathlib import Path
# Put the directory three levels up from this file on sys.path — presumably the
# repository root containing the `agent` package (TODO confirm). This must run
# before the `agent` imports below.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from dotenv import load_dotenv
# Load variables from a .env file into the environment before importing project
# modules, in case they read configuration at import time; main() also reads
# OPEN_ROUTER_API_KEY / MODEL / MAX_ITERATIONS / TRACE_DIR via os.getenv.
load_dotenv()
from agent import (
    AgentRunner,
    RunConfig,
    FileSystemTraceStore,
    Trace,
    Message,
)
from agent.llm import create_openrouter_llm_call
from agent.llm.prompts import SimplePrompt
# Import the tools so that they get registered (side-effect import). The names
# themselves are not referenced in this file; main() selects tools by name.
from tools import (
    douyin_search,
    douyin_user_videos,
    get_content_fans_portrait,
    get_account_fans_portrait,
)
# Logging: write to .cache/agent.log next to this script and to the console.
log_dir = Path(__file__).parent / '.cache'
log_dir.mkdir(exist_ok=True)
# Explicit UTF-8: this script logs Chinese text, and FileHandler otherwise uses
# the locale's default encoding, which on non-UTF-8 locales (e.g. Windows
# cp1252) cannot encode it and makes logging emit "--- Logging error ---".
log_handlers = [
    logging.FileHandler(log_dir / 'agent.log', encoding='utf-8'),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=log_handlers,
)
logger = logging.getLogger(__name__)
# Regex for a numbered title line in douyin_search text output, e.g. "1. <desc>".
_SEARCH_TITLE_PATTERN = r"^\d+\."
# (statistics key, regex) pairs parsed from a search item's "数据:" line.
# NOTE(review): a count such as "1.2万" would only capture its leading digits —
# confirm the tool always prints plain (comma-grouped) integers.
_SEARCH_STAT_PATTERNS = (
    ("digg_count", r"点赞\s+([\d,]+)"),
    ("comment_count", r"评论\s+([\d,]+)"),
    ("share_count", r"分享\s+([\d,]+)"),
)
# Portrait tool name -> source label printed in the summary.
_PORTRAIT_SOURCES = {
    "get_content_fans_portrait": "内容点赞画像",
    "get_account_fans_portrait": "账号粉丝画像",
}


def _parse_search_text(result_text):
    """Parse one ``douyin_search`` text result into a list of item dicts.

    Items end at a blank line or at the next numbered title line. Only items
    that have an ``ID:`` line are returned. An item may contain ``aweme_id``,
    ``desc`` (first 100 chars of the title), ``author`` (``nickname`` and/or
    ``sec_uid``) and ``statistics`` (``digg_count`` / ``comment_count`` /
    ``share_count``).
    """
    import re

    items = []
    current = {}
    for raw_line in result_text.split("\n"):
        line = raw_line.strip()
        is_title = re.match(_SEARCH_TITLE_PATTERN, line) is not None
        if not line or is_title:
            # Close the previous item. A new title also starts a new item, so
            # output without blank separators does not merge/overwrite items.
            if current.get("aweme_id"):
                items.append(current)
            current = {}
            if not line:
                continue
        if is_title:
            current["desc"] = line.split(".", 1)[1].strip()[:100]
        elif line.startswith("ID:"):
            current["aweme_id"] = line[len("ID:"):].strip()
        elif line.startswith("作者:"):
            # setdefault keeps a sec_uid that may have been parsed first.
            current.setdefault("author", {})["nickname"] = line[len("作者:"):].strip()
        elif line.startswith("sec_uid:"):
            current.setdefault("author", {})["sec_uid"] = line[len("sec_uid:"):].strip()
        elif line.startswith("数据:"):
            stats_text = line[len("数据:"):].strip()
            stats = {}
            for key, pattern in _SEARCH_STAT_PATTERNS:
                match = re.search(pattern, stats_text)
                # Guard against a match of only commas (e.g. "点赞 ,").
                digits = match.group(1).replace(",", "") if match else ""
                if digits:
                    stats[key] = int(digits)
            current["statistics"] = stats
    if current.get("aweme_id"):
        items.append(current)
    return items


def _parse_portrait_text(result_text):
    """Extract ``(target_id, age_50_plus, tgi)`` from a fans-portrait text result.

    ``target_id`` is the first whitespace-delimited token after "内容 " or, if
    that marker is absent, after "账号 ". ``age_50_plus`` and ``tgi`` are the
    raw strings of the "50-:" row in the age distribution, e.g.
    "50-: 48.35% (偏好度: 210.05)" -> ("48.35", "210.05"); ``tgi`` may be None.
    Returns None when the ID or the 50+ share is missing.
    """
    target_id = None
    for marker in ("内容 ", "账号 "):
        if marker in result_text:
            # split() (any whitespace) so an ID followed by a newline is clean.
            tokens = result_text.split(marker, 1)[1].split()
            target_id = tokens[0] if tokens else None
            break

    age_50_plus = None
    tgi = None
    if "【年龄】分布" in result_text:
        for line in result_text.split("\n"):
            if "50-:" not in line:
                continue
            row = line.split("50-:", 1)[1].strip()
            if "%" in row:
                age_50_plus = row.split("%")[0].strip()
            if "偏好度:" in row:
                tgi = row.split("偏好度:")[1].replace(")", "").strip()
            break

    if target_id and age_50_plus:
        return target_id, age_50_plus, tgi
    return None


def _collect_trace_data(messages_dir):
    """Scan a trace's saved message files; return ``(search_results, portrait_data)``.

    ``search_results`` lists search items de-duplicated by ``aweme_id`` (first
    occurrence wins). ``portrait_data`` maps a content ID or account ID to
    ``{"age_50_plus", "tgi", "source"}``; later results overwrite earlier ones.
    Unreadable or malformed message files are skipped with a warning.
    """
    import json

    search_results = []
    seen_ids = set()
    portrait_data = {}
    # NOTE(review): lexicographic file order; assumes message file names sort
    # chronologically (e.g. zero-padded sequence numbers) — confirm.
    for msg_file in sorted(messages_dir.glob("*.json")):
        try:
            with open(msg_file, "r", encoding="utf-8") as f:
                msg = json.load(f)
        except (OSError, ValueError) as e:  # ValueError covers JSON and UTF-8 decode errors
            logger.warning(f"跳过无法解析的消息文件 {msg_file}: {e}")
            continue
        if not isinstance(msg, dict) or msg.get("role") != "tool":
            continue
        content = msg.get("content")
        if not isinstance(content, dict):
            continue
        tool_name = content.get("tool_name", "")
        result_text = content.get("result", "")
        if not isinstance(result_text, str):
            continue

        if tool_name == "douyin_search":
            for item in _parse_search_text(result_text):
                if item["aweme_id"] not in seen_ids:
                    seen_ids.add(item["aweme_id"])
                    search_results.append(item)
        elif tool_name in _PORTRAIT_SOURCES:
            parsed = _parse_portrait_text(result_text)
            if parsed is not None:
                target_id, age_50_plus, tgi = parsed
                portrait_data[target_id] = {
                    "age_50_plus": age_50_plus,
                    "tgi": tgi,
                    "source": _PORTRAIT_SOURCES[tool_name],
                }
    return search_results, portrait_data


def _match_results(search_results, portrait_data, min_ratio):
    """Attach portraits to search results; keep those with 50+ share >= ``min_ratio``.

    The content portrait (keyed by ``aweme_id``) is preferred over the author's
    account portrait (keyed by ``sec_uid``). Results are sorted by 50+ share,
    highest first (stable for ties).
    """
    scored = []
    for result in search_results:
        author_id = (result.get("author") or {}).get("sec_uid", "")
        portrait = portrait_data.get(result["aweme_id"]) or portrait_data.get(author_id)
        if not portrait or not portrait.get("age_50_plus"):
            continue
        try:
            ratio = float(portrait["age_50_plus"])
        except ValueError:
            continue
        if ratio >= min_ratio:
            scored.append((ratio, {**result, "portrait": portrait}))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [item for _, item in scored]


def _print_fallback_summary(search_results, portrait_data, matched, min_ratio, top_n):
    """Print the fallback report: counts, then up to ``top_n`` matched videos."""
    threshold = f"{min_ratio:g}"
    print("\n" + "="*60)
    print("📊 任务执行摘要(兜底输出)")
    print("="*60)
    print(f"\n搜索情况:找到 {len(search_results)} 条候选内容")
    print(f"画像获取:获取了 {len(portrait_data)} 条画像数据")
    print(f"\n符合要求:{len(matched)} 条内容(50岁以上占比>={threshold}%)")
    print("\n" + "="*60)
    print("🎯 推荐结果")
    print("="*60)
    shown = matched[:max(top_n, 0)]
    for i, result in enumerate(shown, 1):
        # Fields may be missing when the tool output was incomplete.
        author = result.get("author") or {}
        stats = result.get("statistics") or {}
        portrait = result["portrait"]
        print(f"\n{i}. {result.get('desc', '')}")
        print(f" 链接: https://www.douyin.com/video/{result['aweme_id']}")
        print(f" 作者: {author.get('nickname', '未知')}")
        print(f" 热度: 👍 {stats.get('digg_count', 0):,} | 💬 {stats.get('comment_count', 0):,} | 🔄 {stats.get('share_count', 0):,}")
        print(f" 画像: 50岁以上 {portrait['age_50_plus']}% (tgi: {portrait['tgi']}) - {portrait['source']}")
    print("\n" + "="*60)
    print(f"✅ 已为您找到 {len(shown)} 条推荐视频")
    print("="*60)


async def generate_fallback_output(
    store: "FileSystemTraceStore",
    trace_id: str,
    *,
    min_age_50_plus_ratio: float = 20.0,
    top_n: int = 10,
):
    """Print a best-effort recommendation summary rebuilt from a trace's messages.

    Used when the agent finished without producing its own final output. Reads
    every ``*.json`` file under ``<store.base_path>/<trace_id>/messages`` and
    re-parses the text results of ``douyin_search`` and the two fans-portrait
    tools. Videos whose 50+ age share is at least ``min_age_50_plus_ratio``
    percent are printed, best first, up to ``top_n`` of them.

    Args:
        store: trace store; only its ``base_path`` attribute is read.
        trace_id: id of the trace to summarise.
        min_age_50_plus_ratio: minimum 50+ share (percent) to recommend a video.
        top_n: maximum number of recommendations printed.

    Unexpected errors are logged and reported on stdout instead of propagating.
    """
    try:
        messages_dir = Path(store.base_path) / trace_id / "messages"
        if not messages_dir.exists():
            print("无法生成摘要:找不到消息目录")
            return
        search_results, portrait_data = _collect_trace_data(messages_dir)
        matched = _match_results(search_results, portrait_data, min_age_50_plus_ratio)
        _print_fallback_summary(
            search_results, portrait_data, matched, min_age_50_plus_ratio, top_n
        )
    except Exception as e:
        # Best-effort summary: a parsing problem must not abort the caller's run.
        logger.error(f"生成兜底输出失败: {e}", exc_info=True)
        print(f"\n生成摘要失败: {e}")
# Substrings that mark an assistant message as the agent's final recommendation output.
_FINAL_OUTPUT_MARKERS = ("推荐结果", "推荐内容", "🎯")


def _has_final_marker(text):
    """Return True if *text* contains any of the final-output markers."""
    return any(marker in text for marker in _FINAL_OUTPUT_MARKERS)


def _print_assistant_message(content):
    """Print one assistant message and report whether it carried the final output.

    ``content`` is either a plain string (printed as-is) or a dict with optional
    ``text`` and ``tool_calls`` keys. Long final-result text is printed in full.
    Other text longer than 100 characters that accompanies tool calls is
    truncated. Each tool call except ``goal`` is listed by name.

    Returns True when the message text contains a final-output marker.
    """
    if isinstance(content, str):
        if content:
            print(f"\n{content}")
        # Plain-string replies can carry the final result too; without this
        # check a redundant fallback summary would be printed after them.
        return _has_final_marker(content)
    if not isinstance(content, dict):
        return False

    text = content.get("text", "")
    tool_calls = content.get("tool_calls")
    is_final = bool(text) and _has_final_marker(text)
    if text:
        if is_final and len(text) > 500:
            print(f"\n{text}")
        elif tool_calls and len(text) > 100:
            print(f"[思考] {text[:100]}...")
        else:
            print(f"\n{text}")
    for tc in tool_calls or []:
        # "function" may be missing or null. The goal tool is skipped to reduce noise.
        tool_name = (tc.get("function") or {}).get("name", "unknown")
        if tool_name != "goal":
            print(f"[工具] {tool_name}")
    return is_final


async def main():
    """Run the content-finder agent and stream its progress to stdout.

    Builds the initial messages from ``content_finder.prompt`` next to this
    script. Runs ``AgentRunner`` restricted to the four Douyin tools, and
    prints assistant text, tool calls and tool results as they arrive. If the
    trace completes without any assistant message containing a final-output
    marker, a fallback summary is rebuilt via ``generate_fallback_output``.

    Environment variables:
        OPEN_ROUTER_API_KEY: required.
        MODEL: model id; defaults to ``anthropic/claude-<prompt model>`` where
            the prompt's ``model`` defaults to ``sonnet-4.6``.
        MAX_ITERATIONS: agent iteration cap (default 30).
        TRACE_DIR: trace store directory (default ``.cache/traces`` next to
            this script).

    Raises:
        ValueError: if ``OPEN_ROUTER_API_KEY`` is not set.

    Exits the process with status 1 if the agent run raises an exception.
    """
    print("\n" + "=" * 60)
    print("内容寻找 Agent")
    print("=" * 60)
    print("开始执行...\n")

    script_dir = Path(__file__).parent

    # Build the initial conversation from the prompt file.
    prompt = SimplePrompt(script_dir / "content_finder.prompt")
    messages = prompt.build_messages()

    # Fail fast when the key is missing. The key is not passed on here;
    # presumably create_openrouter_llm_call reads it from the environment (TODO confirm).
    if not os.getenv("OPEN_ROUTER_API_KEY"):
        raise ValueError("OPEN_ROUTER_API_KEY 未设置,请在 .env 文件中配置")

    model = os.getenv("MODEL", f"anthropic/claude-{prompt.config.get('model', 'sonnet-4.6')}")
    temperature = float(prompt.config.get("temperature", 0.3))
    max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))
    # Default next to this script (like the prompt, skills and log dirs) so the
    # trace location does not depend on the current working directory.
    trace_dir = os.getenv("TRACE_DIR", str(script_dir / ".cache" / "traces"))
    skills_dir = str(script_dir / "skills")
    Path(trace_dir).mkdir(parents=True, exist_ok=True)
    store = FileSystemTraceStore(base_path=trace_dir)

    runner = AgentRunner(
        llm_call=create_openrouter_llm_call(model=model),
        trace_store=store,
        skills_dir=skills_dir,
    )
    config = RunConfig(
        model=model,
        temperature=temperature,
        max_iterations=max_iterations,
        # Restrict the agent to the four Douyin tools.
        tools=[
            "douyin_search",
            "douyin_user_videos",
            "get_content_fans_portrait",
            "get_account_fans_portrait",
        ],
        # Larger output budget so long final answers are not truncated.
        extra_llm_params={"max_tokens": 8192},
    )

    has_final_output = False
    try:
        async for item in runner.run(messages=messages, config=config):
            if isinstance(item, Trace):
                if item.status == "completed":
                    print(f"\n[完成] trace_id={item.trace_id}")
                    # The agent never printed its recommendations: rebuild them from the trace.
                    if not has_final_output:
                        print("\n⚠️ 检测到任务未完整输出,正在生成摘要...")
                        await generate_fallback_output(store, item.trace_id)
                elif item.status == "failed":
                    print(f"\n[失败] {item.error_message}")
            elif isinstance(item, Message):
                if item.role == "assistant":
                    if _print_assistant_message(item.content):
                        has_final_output = True
                elif item.role == "tool" and isinstance(item.content, dict):
                    print(f"[结果] {item.content.get('tool_name', 'unknown')} ✓")
    except KeyboardInterrupt:
        # On Python 3.11+ asyncio.run() converts Ctrl-C into cancellation of the
        # main task, so this handler may not fire there; the script entry point
        # should also handle KeyboardInterrupt.
        print("\n用户中断")
    except Exception as e:
        logger.error(f"执行失败: {e}", exc_info=True)
        print(f"\n执行失败: {e}")
        sys.exit(1)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # On Python 3.11+ asyncio.run() cancels the main task on Ctrl-C and
        # re-raises KeyboardInterrupt here rather than inside main(); report it
        # like main() does instead of dumping a traceback. 130 = 128 + SIGINT.
        print("\n用户中断")
        sys.exit(130)
|