jihuaqiang 16 часов назад
Родитель
Сommit
edb3a191a0

+ 10 - 0
agent/tools/models.py

@@ -6,6 +6,7 @@ Tool Models - 工具系统核心数据模型
 2. ToolContext: 工具执行上下文(依赖注入)
 """
 
+import json
 from dataclasses import dataclass, field
 from typing import Any, Dict, List, Optional, Protocol
 
@@ -44,6 +45,9 @@ class ToolResult:
 	# Token追踪(用于工具内部LLM调用)
 	tool_usage: Optional[Dict[str, Any]] = None  # 格式:{"model": "...", "prompt_tokens": 100, "completion_tokens": 50, "cost": 0.0}
 
+	# 默认 metadata 不会进入对话;对批量画像等工具设为 True,将 metadata 序列化后追加给 LLM
+	include_metadata_in_llm: bool = False
+
 	def to_llm_message(self, first_time: bool = True) -> str:
 		"""
 		转换为给 LLM 的消息
@@ -80,6 +84,12 @@ class ToolResult:
 		if self.attachments:
 			parts.append(f"\nAttachments: {', '.join(self.attachments)}")
 
+		if self.include_metadata_in_llm and self.metadata:
+			parts.append(
+				"## metadata (JSON)\n"
+				+ json.dumps(self.metadata, ensure_ascii=False)
+			)
+
 		return "\n\n".join(parts)
 
 

+ 7 - 6
examples/content_finder/content_finder.md

@@ -20,8 +20,9 @@ $system$
 - 抖音视频搜索(Tikhub):`douyin_search_tikhub`
 - 订阅账号作品搜索:`douyin_user_videos`
 - 数据库作者检索(按搜索词找历史优质作者):`find_authors_from_db`
-- 作品画像获取:`get_content_fans_portrait`
-- 作者画像获取:`get_account_fans_portrait`
+- 批量画像(**筛选阶段优先**:一次调用拉齐多条内容画像并按规则账号兜底):`batch_fetch_portraits`(参数 `candidates_json` 为 JSON 数组字符串)。工具返回的同一条 `tool` 消息正文末尾会附带 `## metadata (JSON)`(含 `results`);同时会写入 `{OUTPUT_DIR}/{trace_id}/batch_portraits.json`,上下文丢失时用 `read_file` 读取该文件即可恢复结构化结果。
+- 作品画像(单条):`get_content_fans_portrait`
+- 作者画像(单条兜底):`get_account_fans_portrait`
 - 过程记录:`think_and_plan`
 - 存储结果至数据库:`store_results_mysql`
 - 创建aigc计划:`create_crawler_plan_by_douyin_content_id`、`create_crawler_plan_by_douyin_account_id`
@@ -91,8 +92,8 @@ $system$
 ```
 
 ### 筛选阶段必须按照 `content_filtering_strategy` 的步骤进行,对于**case出发**的搜索结果,满足6分即可输出不需要查看画像;其他结果按顺序查看画像
-1. 先调用 `get_content_fans_portrait`,检查 `metadata.has_portrait`
-2. 若 `has_portrait=False`,如果是 `douyin_search` 或 `douyin_search_tikhub` 获取到的视频,再调用 `get_account_fans_portrait` 兜底,如果是`douyin_user_videos`则不需要再次调用`get_account_fans_portrait`
+1. **优先**对本轮待画像的候选列表调用一次 `batch_fetch_portraits`:在 `candidates_json` 中传入数组,每项含 `aweme_id`、可选 `author_sec_uid`;来自 `douyin_user_videos` 的条目设 `try_account_fallback: false`,来自 `douyin_search` / `douyin_search_tikhub` 的条目设 `true`(默认)。根据返回的 `metadata.results` 逐条读取 `content.has_portrait` / `account.has_portrait` 与 `portrait_data`(逻辑同原单条工具)
+2. 仅当批量不适用(例如单条补拉)时,再使用 `get_content_fans_portrait`,检查 `metadata.has_portrait`;若 `has_portrait=False` 且来源为搜索类视频,再调用 `get_account_fans_portrait` 兜底;`douyin_user_videos` 来源不调用账号兜底
 补充:`douyin_search` 失败后再调用 `douyin_search_tikhub` 作为兜底。
 3. **不允许跳过画像获取直接输出**
 
@@ -111,8 +112,8 @@ $system$
 
 ### 2.筛选阶段是否按规则执行
 - 对于所有`基于case出发策略`的搜索结果,优先进行 **基础筛选**和**基于case出发策略筛选**,满足6分条件直接进入最终输出池,不需要调用画像数据。
-- 其他策略获取的视频或达不到6分的视频,是否都已调用过 `get_content_fans_portrait(aweme_id)`
-- 对其中 `metadata.has_portrait=False` 的条目,是否**在同一条目上**已调用 `get_account_fans_portrait(account_id=author.sec_uid)` 作为兜底?
+- 其他策略获取的视频或达不到6分的视频,是否已通过 **`batch_fetch_portraits`**(或等价的逐条 `get_content_fans_portrait`)获取内容画像
+- 对其中内容无有效画像、且来源为搜索类的条目,是否在批量结果中已包含账号兜底尝试,或已逐条调用 `get_account_fans_portrait`?(`douyin_user_videos` 来源不要求账号兜底。)
 - **禁止**:仅因内容侧无画像就跳过账号画像、直接把 `portrait_data` 当空或来源标为 `none` 而未尝试账号接口(除非两次调用均失败且已在理由中说明)。
 
 ### 3.输出、校验、入库顺序是否正确

+ 3 - 1
examples/content_finder/core.py

@@ -79,6 +79,7 @@ from tools import (
     douyin_user_videos,
     get_content_fans_portrait,
     get_account_fans_portrait,
+    batch_fetch_portraits,
     create_crawler_plan_by_douyin_content_id,
     create_crawler_plan_by_douyin_account_id,
     store_results_mysql,
@@ -91,7 +92,7 @@ from tools import (
 logger = logging.getLogger(__name__)
 
 # 默认搜索词
-DEFAULT_QUERY = "体制内,近亲繁殖"
+DEFAULT_QUERY = "婆媳矛盾,反转式"
 DEFAULT_DEMAND_ID = 1
 
 
@@ -174,6 +175,7 @@ async def run_agent(
         "douyin_user_videos",
         "get_content_fans_portrait",
         "get_account_fans_portrait",
+        "batch_fetch_portraits",
         "find_authors_from_db",
         "store_results_mysql",
         "create_crawler_plan_by_douyin_content_id",

+ 1 - 0
examples/content_finder/render_log_html.py

@@ -68,6 +68,7 @@ TOOL_DESCRIPTION_MAP: dict[str, str] = {
     "douyin_user_videos": "通过账号/作者 sec_uid 获取其历史作品列表。",
     "get_content_fans_portrait": "获取视频点赞用户画像(热点宝),判断 metadata.has_portrait。",
     "get_account_fans_portrait": "获取作者粉丝画像(热点宝),用于内容画像缺失兜底。",
+    "batch_fetch_portraits": "批量获取多条内容画像并按规则账号兜底,一次工具调用返回 metadata.results。",
     "store_results_mysql": "将 output.json 写入 MySQL(作者表与内容表)。",
     "create_crawler_plan_by_douyin_content_id": "为入选视频生成 AIGC 爬取计划。",
     "create_crawler_plan_by_douyin_account_id": "为入选账号生成 AIGC 爬取计划。",

+ 1 - 0
examples/content_finder/server.py

@@ -244,6 +244,7 @@ async def create_task(request: TaskRequest):
                     "douyin_user_videos",
                     "get_content_fans_portrait",
                     "get_account_fans_portrait",
+                    "batch_fetch_portraits",
                     "store_results_mysql",
                     "exec_summary",
                 ]

+ 7 - 7
examples/content_finder/skills/content_filtering_strategy.md

@@ -53,19 +53,19 @@ description: 内容筛选方法论
 
 ### 阶段3:画像匹配筛选
 **分批处理**:先处理前 10 条候选内容,筛选后 >= M 则停止,不足再继续下一批。  
-**并行限制**:每次最多并行调用 3 个画像工具
-**停止条件**:已获取画像数量 >= M × 1.5 时,立即停止,进入下一阶段。
-不要无限循环获取画像,避免陷入"一直获取画像"的状态
+**批量画像(推荐)**:每一批内先整理本批待画像条目,**一次**调用 `batch_fetch_portraits(candidates_json=...)`(工具内顺序请求接口),用返回的 `metadata.results` 完成本批评估;避免对每条视频各打一轮「内容画像工具 + 可能账号画像工具」导致对话消息暴涨。单批条数不超过工具上限(30),超过则拆成多批多次 `batch_fetch_portraits`
+**停止条件**:已获取画像数量 >= M × 1.5 时,立即停止,进入下一阶段。  
+不要无限循环获取画像,避免陷入"一直获取画像"的状态
 
 #### 画像获取优先级
 
 **优先级 1:内容点赞用户画像**
-- 调用 `get_content_fans_portrait(content_id=aweme_id)`
-- `metadata.has_portrait=True` → 从 `metadata.portrait_data` 评估,标注来源 `content_like`
+- 批量:`batch_fetch_portraits` 返回中每条 `results[i].content.has_portrait` 与 `portrait_data`
+- 单条兜底:`get_content_fans_portrait(content_id=aweme_id)`,`metadata.has_portrait=True` → 从 `metadata.portrait_data` 评估,标注来源 `content_like`
 
 **优先级 2:账号粉丝画像(兜底)**
-- 如果 `has_portrait=False`,调用 `get_account_fans_portrait(account_id=author.sec_uid)`
-- 有画像则评估,标注来源 `account_fans`
+- 批量:同条目的 `results[i].account`(`attempted=true` 且 `has_portrait`);仅当该条 `try_account_fallback` 为 true(搜索来源)时工具才会请求账号接口;`douyin_user_videos` 来源在 `candidates_json` 中对该条设 `try_account_fallback: false`
+- 单条:若 `has_portrait=False`,调用 `get_account_fans_portrait(account_id=author.sec_uid)`;有画像则评估,标注来源 `account_fans`
 
 **优先级 3:无画像**
 - 两者均无画像,仅基于热度和相关性评估,标注来源 `none`

+ 14 - 30
examples/content_finder/skills/exec_summary_rows.md

@@ -7,27 +7,23 @@ description: 仅在需要写入 process_trace.json 时,用于记录最终输
 生成用于记录最终输出的每条视频的寻找过程的json
 
 ## 强约束(必须遵守)
-1. **只基于入选内容**:只能对 `output.json.contents` 中出现的 `aweme_id` 生成 rows;不得输出任何不在 contents 的视频(包括淘汰候选/搜索过程中的视频)。
+1. **视频选择**:只能对 `output.json.contents` 中出现的 `aweme_id` 生成 rows;不得输出任何不在 contents 的视频(包括淘汰候选/搜索过程中的视频)。
 2. **rows 数量必须等于 contents 数量**:一条入选内容必须对应且仅对应一行 row。
 3. **字段固定且统一**:每行 row 只允许包含下列 key(不得增删改名):
-   - `aweme_id`
-   - `title`
-   - `author_nickname`
-   - `strategy_type`
-   - `from_case_aweme_id`
-   - `from_case_point`
-   - `from_feature`
-   - `search_keyword`
-   - `channel`
-   - `decision_basis`
-   - `decision_notes`
-   - `input_features`
-4. **值使用中文枚举**:
-   - `strategy_type`: `"case出发策略"` / `"特征出发策略"`
-   - `channel`: `"抖音搜索"` / `"订阅账号"` / `"榜单"` / `"其他"`
-   - `decision_basis`: `"内容画像匹配"` / `"作者画像匹配"` / `"需求筛选"` / `"画像缺失"` / `"其他"`
+   - `aweme_id`:视频id
+   - `title`:视频标题
+   - `author_nickname`:作者名称。
+   - `strategy_type`:寻找策略。"case出发" / "特征出发"。
+   - `from_case_aweme_id`:case出发策略关联的内容id
+   - `from_case_point`: case出发策略 关联的灵感点。
+   - `from_feature`: 特征出发 关联的特征词。
+   - `search_keyword`: 搜索词,该内容从哪个搜索词来。
+   - `channel`:寻找方式 "抖音搜索" / "索引榜单搜索" / "垂类推荐流" / "订阅账号作品搜索"
+   - `decision_basis`:筛选的方式 "基于case出发策略筛选" / "内容点赞用户画像" / "账号粉丝画像" / "其他"
+   - `decision_notes`:筛选的理由
+   - `input_features`: Agent起始输入的特征词
+4. **值使用中文枚举**
 5. **input_features**:必须是 `list[str]`;默认从 `output.json.query` 按逗号拆分得到(兼容中文逗号)。
-6. **允许为空的字段**:`from_case_aweme_id/from_case_point/from_feature` 若无法确定可为空字符串,但不能缺 key。
 
 ## 依据
 - `output.json`(必须读取并以 `contents` 为准)
@@ -35,18 +31,6 @@ description: 仅在需要写入 process_trace.json 时,用于记录最终输
   - `contents[]`:每条入选内容,含 `aweme_id/title/author_nickname/reason/portrait_data.source` 等
 - `log.txt`:用于判断内容的 strategy_type(来自哪种策略)、from_case_point(来自哪个灵感点)、search_keyword(搜索词)与渠道等
 
-## 生成规则(建议优先级)
-对每个 `content in output.json.contents`:
-1. `aweme_id/title/author_nickname` 直接来自 content(必须与 contents 一致)
-2. `decision_notes`:优先用 `content.reason`(入选理由)
-3. `decision_basis`:
-   - 若 `content.portrait_data.source == "content_like"` → `"内容画像匹配"`
-   - 若 `== "account_fans"` → `"作者画像匹配"`
-   - 否则 `"其他"`
-4. `strategy_type/channel/search_keyword`:
-   - 在`log.txt`/上下文找明确来源,按事实填(且用上面的中文枚举)
-   - 否则 字段留空
-5. `from_case_* / from_feature`:能确定就填;不确定可空串。
 
 ## 输出格式(必须严格)
 只输出一个 JSON 对象(不要 Markdown、不要解释、不要多余文本):

+ 6 - 1
examples/content_finder/tools/__init__.py

@@ -5,7 +5,11 @@
 from .douyin_search import douyin_search
 from .douyin_search_tikhub import douyin_search_tikhub
 from .douyin_user_videos import douyin_user_videos
-from .hotspot_profile import get_content_fans_portrait, get_account_fans_portrait
+from .hotspot_profile import (
+    get_content_fans_portrait,
+    get_account_fans_portrait,
+    batch_fetch_portraits,
+)
 from .store_results_mysql import store_results_mysql
 from .aigc_platform_api import create_crawler_plan_by_douyin_content_id, create_crawler_plan_by_douyin_account_id
 from .think_and_plan import think_and_plan
@@ -19,6 +23,7 @@ __all__ = [
     "douyin_user_videos",
     "get_content_fans_portrait",
     "get_account_fans_portrait",
+    "batch_fetch_portraits",
     "store_results_mysql",
     "create_crawler_plan_by_douyin_content_id",
     "create_crawler_plan_by_douyin_account_id",

+ 1 - 1
examples/content_finder/tools/aigc_platform_api.py

@@ -25,7 +25,7 @@ def _log_aigc_return(label: str, params: Dict[str, Any], r: ToolResult) -> ToolR
     log_tool_call(label, params, format_tool_result_for_log(r))
     return r
 
-CAN_NOT_CREATE_PLAN = False
+CAN_NOT_CREATE_PLAN = True
 
 AIGC_BASE_URL = "https://aigc-api.aiddit.com"
 CRAWLER_PLAN_CREATE_URL = f"{AIGC_BASE_URL}/aigc/crawler/plan/save"

+ 540 - 295
examples/content_finder/tools/hotspot_profile.py

@@ -3,10 +3,12 @@
 
 调用内部爬虫服务获取账号/内容的粉丝画像。
 """
-import asyncio
+import json
 import logging
+import os
 import time
-from typing import Optional, Dict, Any, List, Tuple
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple, TypedDict
 
 import requests
 
@@ -17,11 +19,47 @@ logger = logging.getLogger(__name__)
 
 _LABEL_ACCOUNT = "工具调用:get_account_fans_portrait -> 抖音账号粉丝画像(热点宝)"
 _LABEL_CONTENT = "工具调用:get_content_fans_portrait -> 内容点赞用户画像(热点宝)"
+_LABEL_BATCH = "工具调用:batch_fetch_portraits -> 批量获取内容/账号画像(热点宝)"
 
+BATCH_MAX_ITEMS = 30
+_BATCH_SNAPSHOT_NAME = "batch_portraits.json"
 
-def _log_return(label: str, params: Dict[str, Any], r: ToolResult) -> ToolResult:
-    log_tool_call(label, params, format_tool_result_for_log(r))
-    return r
+
+def _repo_root_from_this_file() -> Path:
+    # examples/content_finder/tools/hotspot_profile.py -> Agent 仓库根
+    return Path(__file__).resolve().parents[3]
+
+
+def _resolve_output_dir_path() -> Path:
+    raw = (os.getenv("OUTPUT_DIR") or ".cache/output").strip()
+    p = Path(raw).expanduser()
+    return p.resolve() if p.is_absolute() else (_repo_root_from_this_file() / p).resolve()
+
+
+def _persist_batch_portraits_json(
+    trace_id: Optional[str],
+    results: List[Dict[str, Any]],
+    count: int,
+) -> Optional[str]:
+    """将批量画像结果写入 OUTPUT_DIR/<trace_id>/batch_portraits.json,便于 read_file 与排障。"""
+    if not trace_id:
+        return None
+    try:
+        out_dir = _resolve_output_dir_path() / trace_id
+        out_dir.mkdir(parents=True, exist_ok=True)
+        path = out_dir / _BATCH_SNAPSHOT_NAME
+        path.write_text(
+            json.dumps(
+                {"trace_id": trace_id, "count": count, "results": results},
+                ensure_ascii=False,
+                indent=2,
+            ),
+            encoding="utf-8",
+        )
+        return str(path)
+    except OSError as e:
+        logger.warning("batch portrait snapshot write failed: %s", e)
+        return None
 
 
 ACCOUNT_FANS_PORTRAIT_API = "http://crawapi.piaoquantv.com/crawler/dou_yin/re_dian_bao/account_fans_portrait"
@@ -29,6 +67,189 @@ CONTENT_FANS_PORTRAIT_API = "http://crawapi.piaoquantv.com/crawler/dou_yin/re_di
 DEFAULT_TIMEOUT = 60.0
 
 
+class _PortraitOk(TypedDict):
+    output: str
+    has_portrait: bool
+    portrait_data: Dict[str, Any]
+    raw_data: Any
+
+
+def _log_return(label: str, params: Dict[str, Any], r: ToolResult) -> ToolResult:
+    log_tool_call(label, params, format_tool_result_for_log(r))
+    return r
+
+
+def _top_k(items: Dict[str, Any], k: int) -> List[Tuple[str, Any]]:
+    def percent_value(entry: Tuple[str, Any]) -> float:
+        metrics = entry[1] if isinstance(entry[1], dict) else {}
+        return metrics.get("percentage")
+
+    return sorted(items.items(), key=percent_value, reverse=True)[:k]
+
+
+def _format_portrait_summary(
+    header_line: str,
+    link_line: str,
+    portrait: Dict[str, Any],
+) -> str:
+    summary_lines = [header_line, link_line, ""]
+    for k, v in portrait.items():
+        if not isinstance(v, dict):
+            continue
+        if k in ("省份", "城市"):
+            summary_lines.append(f"【{k} TOP5】分布")
+            items = _top_k(v, 5)
+        else:
+            summary_lines.append(f"【{k}】分布")
+            items = v.items()
+
+        for name, metrics in items:
+            ratio = metrics.get("percentage")
+            tgi = metrics.get("preference")
+            summary_lines.append(f"  {name}: {ratio} (偏好度: {tgi})")
+        summary_lines.append("")
+    return "\n".join(summary_lines)
+
+
+def _validate_account_id(account_id: str) -> Optional[str]:
+    if not account_id or not isinstance(account_id, str):
+        return "account_id 参数无效:必须是非空字符串"
+    if not account_id.startswith("MS4wLjABAAAA"):
+        return (
+            f"account_id 格式错误:必须以 MS4wLjABAAAA 开头,"
+            f"当前值: {account_id[:min(20, len(account_id))]}..."
+        )
+    return None
+
+
+def _validate_content_id(content_id: str) -> Optional[str]:
+    if not content_id or not isinstance(content_id, str):
+        return "content_id 参数无效:必须是非空字符串"
+    if not content_id.isdigit():
+        return f"content_id 格式错误:aweme_id 应该是纯数字,当前值: {content_id[:20]}..."
+    if len(content_id) < 15 or len(content_id) > 25:
+        return f"content_id 长度异常:期望 15-25 位数字,实际 {len(content_id)} 位"
+    return None
+
+
+def _dimension_flags(
+    need_province: bool,
+    need_city: bool,
+    need_city_level: bool,
+    need_gender: bool,
+    need_age: bool,
+    need_phone_brand: bool,
+    need_phone_price: bool,
+) -> Dict[str, bool]:
+    return {
+        "need_province": need_province,
+        "need_city": need_city,
+        "need_city_level": need_city_level,
+        "need_gender": need_gender,
+        "need_age": need_age,
+        "need_phone_brand": need_phone_brand,
+        "need_phone_price": need_phone_price,
+    }
+
+
+def _sync_fetch_account_portrait(
+    account_id: str,
+    flags: Dict[str, bool],
+    request_timeout: float,
+) -> Tuple[Optional[str], Optional[_PortraitOk]]:
+    err = _validate_account_id(account_id)
+    if err:
+        return err, None
+    payload = {"account_id": account_id, **flags}
+    try:
+        response = requests.post(
+            ACCOUNT_FANS_PORTRAIT_API,
+            json=payload,
+            headers={"Content-Type": "application/json"},
+            timeout=request_timeout,
+        )
+        response.raise_for_status()
+        data = response.json()
+    except requests.exceptions.HTTPError as e:
+        return f"HTTP {e.response.status_code}: {e.response.text}", None
+    except requests.exceptions.Timeout:
+        return f"请求超时({request_timeout}秒)", None
+    except requests.exceptions.RequestException as e:
+        return f"网络错误: {str(e)}", None
+    except Exception as e:
+        logger.error(
+            "account portrait request failed",
+            extra={"account_id": account_id, "error": str(e)},
+            exc_info=True,
+        )
+        return f"未知错误: {str(e)}", None
+
+    data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
+    portrait = data_block.get("data", {}) if isinstance(data_block.get("data"), dict) else {}
+    header = f"账号 {account_id} 的粉丝画像"
+    link = (
+        f"画像链接:https://douhot.douyin.com/creator/detail?"
+        f"active_tab=creator_fans_portrait&creator_id={account_id}"
+    )
+    output = _format_portrait_summary(header, link, portrait)
+    has_valid = bool(portrait and any(isinstance(v, dict) and v for v in portrait.values()))
+    return None, _PortraitOk(
+        output=output,
+        has_portrait=has_valid,
+        portrait_data=portrait,
+        raw_data=data,
+    )
+
+
+def _sync_fetch_content_portrait(
+    content_id: str,
+    flags: Dict[str, bool],
+    request_timeout: float,
+) -> Tuple[Optional[str], Optional[_PortraitOk]]:
+    err = _validate_content_id(content_id)
+    if err:
+        return err, None
+    payload = {"content_id": content_id, **flags}
+    try:
+        response = requests.post(
+            CONTENT_FANS_PORTRAIT_API,
+            json=payload,
+            headers={"Content-Type": "application/json"},
+            timeout=request_timeout,
+        )
+        response.raise_for_status()
+        data = response.json()
+    except requests.exceptions.HTTPError as e:
+        return f"HTTP {e.response.status_code}: {e.response.text}", None
+    except requests.exceptions.Timeout:
+        return f"请求超时({request_timeout}秒)", None
+    except requests.exceptions.RequestException as e:
+        return f"网络错误: {str(e)}", None
+    except Exception as e:
+        logger.error(
+            "content portrait request failed",
+            extra={"content_id": content_id, "error": str(e)},
+            exc_info=True,
+        )
+        return f"未知错误: {str(e)}", None
+
+    data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
+    portrait = data_block.get("data", {}) if isinstance(data_block.get("data"), dict) else {}
+    header = f"内容 {content_id} 的点赞用户画像"
+    link = (
+        f"画像链接:https://douhot.douyin.com/video/detail?"
+        f"active_tab=video_fans&video_id={content_id}"
+    )
+    output = _format_portrait_summary(header, link, portrait)
+    has_valid = bool(portrait and any(isinstance(v, dict) and v for v in portrait.values()))
+    return None, _PortraitOk(
+        output=output,
+        has_portrait=has_valid,
+        portrait_data=portrait,
+        raw_data=data,
+    )
+
+
 @tool(description="获取抖音账号粉丝画像(热点宝),支持选择画像维度")
 async def get_account_fans_portrait(
     account_id: str,
@@ -93,166 +314,55 @@ async def get_account_fans_portrait(
         "need_phone_price": need_phone_price,
         "timeout": timeout,
     }
-
-    # 验证 account_id 格式
-    if not account_id or not isinstance(account_id, str):
-        logger.error("get_account_fans_portrait invalid account_id", extra={"account_id": account_id})
-        return _log_return(
-            _LABEL_ACCOUNT,
-            call_params,
-            ToolResult(
-                title="账号粉丝画像获取失败",
-                output="",
-                error="account_id 参数无效:必须是非空字符串",
-            ),
-        )
-
-    if not account_id.startswith("MS4wLjABAAAA"):
-        logger.error("get_account_fans_portrait invalid sec_uid format", extra={"account_id": account_id})
+    flags = _dimension_flags(
+        need_province,
+        need_city,
+        need_city_level,
+        need_gender,
+        need_age,
+        need_phone_brand,
+        need_phone_price,
+    )
+    request_timeout = timeout if timeout is not None else DEFAULT_TIMEOUT
+    err, ok = _sync_fetch_account_portrait(account_id, flags, request_timeout)
+    duration_ms = int((time.time() - start_time) * 1000)
+
+    if err:
+        logger.error("get_account_fans_portrait failed", extra={"account_id": account_id, "error": err})
         return _log_return(
             _LABEL_ACCOUNT,
             call_params,
             ToolResult(
                 title="账号粉丝画像获取失败",
                 output="",
-                error=f"account_id 格式错误:必须以 MS4wLjABAAAA 开头,当前值: {account_id[:min(20, len(account_id))]}...",
+                error=err,
             ),
         )
 
-    # if len(account_id) < 70 or len(account_id) > 90:
-    #     logger.error("get_account_fans_portrait invalid sec_uid length", extra={"account_id": account_id, "length": len(account_id)})
-    #     return ToolResult(
-    #         title="账号粉丝画像获取失败",
-    #         output="",
-    #         error=f"account_id 长度异常:期望 70-90 字符,实际 {len(account_id)} 字符。这可能是编造或截断的数据。",
-    #     )
-
-    try:
-        payload = {
+    assert ok is not None
+    logger.info(
+        "get_account_fans_portrait completed",
+        extra={
             "account_id": account_id,
-            "need_province": need_province,
-            "need_city": need_city,
-            "need_city_level": need_city_level,
-            "need_gender": need_gender,
-            "need_age": need_age,
-            "need_phone_brand": need_phone_brand,
-            "need_phone_price": need_phone_price,
-        }
-
-        request_timeout = timeout if timeout is not None else DEFAULT_TIMEOUT
-
-        response = requests.post(
-            ACCOUNT_FANS_PORTRAIT_API,
-            json=payload,
-            headers={"Content-Type": "application/json"},
-            timeout=request_timeout
-        )
-        response.raise_for_status()
-        data = response.json()
-
-        data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
-        portrait = data_block.get("data", {}) if isinstance(data_block.get("data"), dict) else {}
-
-        # 格式化输出摘要
-        summary_lines = [f"账号 {account_id} 的粉丝画像"]
-        summary_lines.append(f"画像链接:https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={account_id}")
-        summary_lines.append("")
-        for k, v in portrait.items():
-            if not isinstance(v, dict):
-                continue
-            if k in ("省份", "城市"):
-                summary_lines.append(f"【{k} TOP5】分布")
-                items = _top_k(v, 5)
-            else:
-                summary_lines.append(f"【{k}】分布")
-                items = v.items()
-
-            for name, metrics in items:
-                ratio = metrics.get("percentage")
-                tgi = metrics.get("preference")
-                summary_lines.append(f"  {name}: {ratio} (偏好度: {tgi})")
-            summary_lines.append("")
-
-        duration_ms = int((time.time() - start_time) * 1000)
-        has_valid_portrait = bool(portrait and any(
-            isinstance(v, dict) and v for v in portrait.values()
-        ))
-
-        logger.info(
-            "get_account_fans_portrait completed",
-            extra={
-                "account_id": account_id,
-                "has_portrait": has_valid_portrait,
-                "portrait_dimensions": list(portrait.keys()) if portrait else [],
-                "duration_ms": duration_ms
-            }
-        )
-
-        return _log_return(
-            _LABEL_ACCOUNT,
-            call_params,
-            ToolResult(
-                title=f"账号粉丝画像: {account_id}",
-                output="\n".join(summary_lines),
-                long_term_memory=f"Fetched fans portrait for account '{account_id}'",
-                metadata={
-                    "raw_data": data,
-                    "has_portrait": has_valid_portrait,
-                    "portrait_data": portrait,
-                },
-            ),
-        )
-    except requests.exceptions.HTTPError as e:
-        logger.error(
-            "get_account_fans_portrait HTTP error",
-            extra={
-                "account_id": account_id,
-                "status_code": e.response.status_code,
-                "error": str(e)
-            }
-        )
-        return _log_return(
-            _LABEL_ACCOUNT,
-            call_params,
-            ToolResult(
-                title="账号粉丝画像获取失败",
-                output="",
-                error=f"HTTP {e.response.status_code}: {e.response.text}",
-            ),
-        )
-    except requests.exceptions.Timeout:
-        logger.error("get_account_fans_portrait timeout", extra={"account_id": account_id, "timeout": request_timeout})
-        return _log_return(
-            _LABEL_ACCOUNT,
-            call_params,
-            ToolResult(
-                title="账号粉丝画像获取失败",
-                output="",
-                error=f"请求超时({request_timeout}秒)",
-            ),
-        )
-    except requests.exceptions.RequestException as e:
-        logger.error("get_account_fans_portrait network error", extra={"account_id": account_id, "error": str(e)})
-        return _log_return(
-            _LABEL_ACCOUNT,
-            call_params,
-            ToolResult(
-                title="账号粉丝画像获取失败",
-                output="",
-                error=f"网络错误: {str(e)}",
-            ),
-        )
-    except Exception as e:
-        logger.error("get_account_fans_portrait unexpected error", extra={"account_id": account_id, "error": str(e)}, exc_info=True)
-        return _log_return(
-            _LABEL_ACCOUNT,
-            call_params,
-            ToolResult(
-                title="账号粉丝画像获取失败",
-                output="",
-                error=f"未知错误: {str(e)}",
-            ),
-        )
+            "has_portrait": ok["has_portrait"],
+            "portrait_dimensions": list(ok["portrait_data"].keys()) if ok["portrait_data"] else [],
+            "duration_ms": duration_ms,
+        },
+    )
+    return _log_return(
+        _LABEL_ACCOUNT,
+        call_params,
+        ToolResult(
+            title=f"账号粉丝画像: {account_id}",
+            output=ok["output"],
+            long_term_memory=f"Fetched fans portrait for account '{account_id}'",
+            metadata={
+                "raw_data": ok["raw_data"],
+                "has_portrait": ok["has_portrait"],
+                "portrait_data": ok["portrait_data"],
+            },
+        ),
+    )
 
 
 @tool(description="获取抖音内容点赞用户画像(热点宝),支持选择画像维度")
@@ -320,176 +430,311 @@ async def get_content_fans_portrait(
         "need_phone_price": need_phone_price,
         "timeout": timeout,
     }
-
-    # 验证 content_id 格式
-    if not content_id or not isinstance(content_id, str):
-        logger.error("get_content_fans_portrait invalid content_id", extra={"content_id": content_id})
+    flags = _dimension_flags(
+        need_province,
+        need_city,
+        need_city_level,
+        need_gender,
+        need_age,
+        need_phone_brand,
+        need_phone_price,
+    )
+    request_timeout = timeout if timeout is not None else DEFAULT_TIMEOUT
+    err, ok = _sync_fetch_content_portrait(content_id, flags, request_timeout)
+    duration_ms = int((time.time() - start_time) * 1000)
+
+    if err:
+        logger.error("get_content_fans_portrait failed", extra={"content_id": content_id, "error": err})
         return _log_return(
             _LABEL_CONTENT,
             call_params,
             ToolResult(
                 title="内容点赞用户画像获取失败",
                 output="",
-                error="content_id 参数无效:必须是非空字符串",
+                error=err,
             ),
         )
 
-    # aweme_id 应该是纯数字字符串,长度约 19 位
-    if not content_id.isdigit():
-        logger.error("get_content_fans_portrait invalid aweme_id format", extra={"content_id": content_id})
-        return _log_return(
-            _LABEL_CONTENT,
-            call_params,
-            ToolResult(
-                title="内容点赞用户画像获取失败",
-                output="",
-                error=f"content_id 格式错误:aweme_id 应该是纯数字,当前值: {content_id[:20]}...",
-            ),
-        )
-
-    if len(content_id) < 15 or len(content_id) > 25:
-        logger.error("get_content_fans_portrait invalid aweme_id length", extra={"content_id": content_id, "length": len(content_id)})
-        return _log_return(
-            _LABEL_CONTENT,
-            call_params,
-            ToolResult(
-                title="内容点赞用户画像获取失败",
-                output="",
-                error=f"content_id 长度异常:期望 15-25 位数字,实际 {len(content_id)} 位",
-            ),
-        )
-
-    try:
-        payload = {
+    assert ok is not None
+    logger.info(
+        "get_content_fans_portrait completed",
+        extra={
             "content_id": content_id,
-            "need_province": need_province,
-            "need_city": need_city,
-            "need_city_level": need_city_level,
-            "need_gender": need_gender,
-            "need_age": need_age,
-            "need_phone_brand": need_phone_brand,
-            "need_phone_price": need_phone_price,
-        }
-
-        request_timeout = timeout if timeout is not None else DEFAULT_TIMEOUT
-
-        response = requests.post(
-            CONTENT_FANS_PORTRAIT_API,
-            json=payload,
-            headers={"Content-Type": "application/json"},
-            timeout=request_timeout
-        )
-        response.raise_for_status()
-        data = response.json()
-
-        data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
-        portrait = data_block.get("data", {}) if isinstance(data_block.get("data"), dict) else {}
+            "has_portrait": ok["has_portrait"],
+            "portrait_dimensions": list(ok["portrait_data"].keys()) if ok["portrait_data"] else [],
+            "duration_ms": duration_ms,
+        },
+    )
+    return _log_return(
+        _LABEL_CONTENT,
+        call_params,
+        ToolResult(
+            title=f"内容点赞用户画像: {content_id}",
+            output=ok["output"],
+            long_term_memory=f"Fetched fans portrait for content '{content_id}'",
+            metadata={
+                "raw_data": ok["raw_data"],
+                "has_portrait": ok["has_portrait"],
+                "portrait_data": ok["portrait_data"],
+            },
+        ),
+    )
+
+
+@tool(
+    description=(
+        "批量获取多条候选视频的画像:工具内依次请求内容点赞画像;"
+        "若无画像且允许兜底则再请求作者粉丝画像。一次调用返回所有条目,减少对话轮次。"
+        "完整结构化结果在同一条 tool 消息的 metadata JSON 中,并写入 OUTPUT_DIR/<trace_id>/batch_portraits.json。"
+    ),
+    hidden_params=["context"],
+)
+async def batch_fetch_portraits(
+    candidates_json: str,
+    need_province: bool = False,
+    need_city: bool = False,
+    need_city_level: bool = False,
+    need_gender: bool = False,
+    need_age: bool = True,
+    need_phone_brand: bool = False,
+    need_phone_price: bool = False,
+    timeout: Optional[float] = None,
+    context: Optional[Dict[str, Any]] = None,
+) -> ToolResult:
+    """
+    批量拉取内容画像并在规则允许时用账号画像兜底(单工具、多 HTTP 顺序请求)。
 
-        # 格式化输出摘要
-        summary_lines = [f"内容 {content_id} 的点赞用户画像"]
-        summary_lines.append(f"画像链接:https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={content_id}")
-        summary_lines.append("")
+    Args:
+        candidates_json: JSON 数组字符串。每项为对象,字段:
+            - aweme_id (必填): 视频 id
+            - author_sec_uid (可选): 作者 sec_uid,兜底时需要
+            - try_account_fallback (可选,默认 true): 为 false 时不请求账号画像
+              (对应来自 douyin_user_videos 的条目,与单条工具规则一致)
+        need_* / timeout: 与各单条画像工具一致
 
-        for k, v in portrait.items():
-            if not isinstance(v, dict):
-                continue
-            if k in ("省份", "城市"):
-                summary_lines.append(f"【{k} TOP5】分布")
-                items = _top_k(v, 5)
-            else:
-                summary_lines.append(f"【{k}】分布")
-                items = v.items()
-
-            for name, metrics in items:
-                ratio = metrics.get("percentage")
-                tgi = metrics.get("preference")
-                summary_lines.append(f"  {name}: {ratio} (偏好度: {tgi})")
-            summary_lines.append("")
-
-        duration_ms = int((time.time() - start_time) * 1000)
-        has_valid_portrait = bool(portrait and any(
-            isinstance(v, dict) and v for v in portrait.values()
-        ))
-
-        logger.info(
-            "get_content_fans_portrait completed",
-            extra={
-                "content_id": content_id,
-                "has_portrait": has_valid_portrait,
-                "portrait_dimensions": list(portrait.keys()) if portrait else [],
-                "duration_ms": duration_ms
-            }
-        )
+    Returns:
+        ToolResult.output: 人类可读的分条摘要
+        metadata.results: 与 candidates 顺序一致的列表,每项含 content / account 子对象;
+            通过 ToolResult.include_metadata_in_llm 会进入本轮 tool 消息正文(JSON),无需从 log 猜测。
+        metadata.snapshot_path: 落盘文件绝对路径(若写入成功)
 
+    Note:
+        context 由 Runner 注入,含 trace_id,用于写入 batch_portraits.json。
+    """
+    call_params: Dict[str, Any] = {
+        "candidates_json": candidates_json[:2000] + ("..." if len(candidates_json) > 2000 else ""),
+        "need_age": need_age,
+        "timeout": timeout,
+    }
+    raw = (candidates_json or "").strip()
+    if not raw:
         return _log_return(
-            _LABEL_CONTENT,
+            _LABEL_BATCH,
             call_params,
-            ToolResult(
-                title=f"内容点赞用户画像: {content_id}",
-                output="\n".join(summary_lines),
-                long_term_memory=f"Fetched fans portrait for content '{content_id}'",
-                metadata={
-                    "raw_data": data,
-                    "has_portrait": has_valid_portrait,
-                    "portrait_data": portrait,
-                },
-            ),
+            ToolResult(title="批量画像失败", output="", error="candidates_json 为空"),
         )
-    except requests.exceptions.HTTPError as e:
-        logger.error(
-            "get_content_fans_portrait HTTP error",
-            extra={
-                "content_id": content_id,
-                "status_code": e.response.status_code,
-                "error": str(e)
-            }
-        )
-        return _log_return(
-            _LABEL_CONTENT,
-            call_params,
-            ToolResult(
-                title="内容点赞用户画像获取失败",
-                output="",
-                error=f"HTTP {e.response.status_code}: {e.response.text}",
-            ),
-        )
-    except requests.exceptions.Timeout:
-        logger.error("get_content_fans_portrait timeout", extra={"content_id": content_id, "timeout": request_timeout})
+    try:
+        parsed = json.loads(raw)
+    except json.JSONDecodeError as e:
         return _log_return(
-            _LABEL_CONTENT,
+            _LABEL_BATCH,
             call_params,
             ToolResult(
-                title="内容点赞用户画像获取失败",
+                title="批量画像失败",
                 output="",
-                error=f"请求超时({request_timeout}秒)",
+                error=f"candidates_json 不是合法 JSON: {e}",
             ),
         )
-    except requests.exceptions.RequestException as e:
-        logger.error("get_content_fans_portrait network error", extra={"content_id": content_id, "error": str(e)})
+    if not isinstance(parsed, list):
         return _log_return(
-            _LABEL_CONTENT,
+            _LABEL_BATCH,
             call_params,
-            ToolResult(
-                title="内容点赞用户画像获取失败",
-                output="",
-                error=f"网络错误: {str(e)}",
-            ),
+            ToolResult(title="批量画像失败", output="", error="candidates_json 必须是 JSON 数组"),
         )
-    except Exception as e:
-        logger.error("get_content_fans_portrait unexpected error", extra={"content_id": content_id, "error": str(e)}, exc_info=True)
+    if len(parsed) > BATCH_MAX_ITEMS:
         return _log_return(
-            _LABEL_CONTENT,
+            _LABEL_BATCH,
             call_params,
             ToolResult(
-                title="内容点赞用户画像获取失败",
+                title="批量画像失败",
                 output="",
-                error=f"未知错误: {str(e)}",
+                error=f"条目数超过上限 {BATCH_MAX_ITEMS},请分批调用",
             ),
         )
 
-def _top_k(items: Dict[str, Any], k: int) -> List[Tuple[str, Any]]:
-    def percent_value(entry: Tuple[str, Any]) -> float:
-        metrics = entry[1] if isinstance(entry[1], dict) else {}
-        return metrics.get("percentage")
+    flags = _dimension_flags(
+        need_province,
+        need_city,
+        need_city_level,
+        need_gender,
+        need_age,
+        need_phone_brand,
+        need_phone_price,
+    )
+    request_timeout = timeout if timeout is not None else DEFAULT_TIMEOUT
+
+    results: List[Dict[str, Any]] = []
+    output_chunks: List[str] = []
+
+    for idx, entry in enumerate(parsed):
+        if not isinstance(entry, dict):
+            results.append(
+                {
+                    "aweme_id": None,
+                    "error": "条目不是对象",
+                    "content": None,
+                    "account": None,
+                }
+            )
+            output_chunks.append(f"[{idx}] 跳过:条目不是 JSON 对象")
+            continue
+
+        aweme_id = entry.get("aweme_id") or entry.get("content_id")
+        author_sec = entry.get("author_sec_uid") or entry.get("account_id")
+        try_fallback = entry.get("try_account_fallback", True)
+        if isinstance(try_fallback, str):
+            try_fallback = try_fallback.strip().lower() in ("1", "true", "yes")
+
+        if not aweme_id or not isinstance(aweme_id, str):
+            results.append(
+                {
+                    "aweme_id": aweme_id,
+                    "error": "缺少 aweme_id",
+                    "content": None,
+                    "account": None,
+                }
+            )
+            output_chunks.append(f"[{idx}] 跳过:缺少 aweme_id")
+            continue
+
+        item_result: Dict[str, Any] = {
+            "aweme_id": aweme_id,
+            "author_sec_uid": author_sec if isinstance(author_sec, str) else None,
+            "try_account_fallback": bool(try_fallback),
+            "content": None,
+            "account": None,
+            "error": None,
+        }
 
-    return sorted(items.items(), key=percent_value, reverse=True)[:k]
+        cerr, cok = _sync_fetch_content_portrait(aweme_id, flags, request_timeout)
+        if cerr:
+            item_result["content"] = {
+                "ok": False,
+                "error": cerr,
+                "has_portrait": False,
+                "portrait_data": {},
+            }
+        else:
+            assert cok is not None
+            item_result["content"] = {
+                "ok": True,
+                "error": None,
+                "has_portrait": cok["has_portrait"],
+                "portrait_data": cok["portrait_data"],
+                "output": cok["output"],
+            }
+
+        c_block = item_result["content"]
+        content_has = bool(c_block and c_block.get("has_portrait"))
+        need_account = bool(try_fallback) and not content_has
+        if need_account:
+            if not author_sec or not isinstance(author_sec, str):
+                item_result["account"] = {
+                    "attempted": False,
+                    "skipped_reason": "缺少 author_sec_uid,无法账号兜底",
+                    "has_portrait": False,
+                    "portrait_data": {},
+                }
+            else:
+                aerr, aok = _sync_fetch_account_portrait(author_sec, flags, request_timeout)
+                if aerr:
+                    item_result["account"] = {
+                        "attempted": True,
+                        "error": aerr,
+                        "has_portrait": False,
+                        "portrait_data": {},
+                    }
+                else:
+                    assert aok is not None
+                    item_result["account"] = {
+                        "attempted": True,
+                        "error": None,
+                        "has_portrait": aok["has_portrait"],
+                        "portrait_data": aok["portrait_data"],
+                        "output": aok["output"],
+                    }
+        else:
+            skip_reason = (
+                "try_account_fallback 为 false(如 douyin_user_videos 来源)"
+                if not try_fallback
+                else "内容侧已有有效画像,无需账号兜底"
+            )
+            item_result["account"] = {
+                "attempted": False,
+                "skipped_reason": skip_reason,
+                "has_portrait": False,
+                "portrait_data": {},
+            }
+
+        results.append(item_result)
+        # 压缩每条在 output 中的篇幅
+        c_part = item_result["content"] or {}
+        a_part = item_result["account"] or {}
+        line = (
+            f"[{idx}] aweme_id={aweme_id} "
+            f"content_has_portrait={c_part.get('has_portrait')} "
+            f"account_attempted={a_part.get('attempted')} "
+            f"account_has_portrait={a_part.get('has_portrait')}"
+        )
+        output_chunks.append(line)
+
+    full_text = "\n".join(output_chunks)
+    trace_id = None
+    if isinstance(context, dict):
+        tid = context.get("trace_id")
+        if isinstance(tid, str) and tid.strip():
+            trace_id = tid.strip()
+    snapshot_path = _persist_batch_portraits_json(trace_id, results, len(results))
+
+    out_display = (os.getenv("OUTPUT_DIR") or ".cache/output").strip()
+    rel_hint = (
+        f"{out_display}/{trace_id}/{_BATCH_SNAPSHOT_NAME}"
+        if trace_id
+        else f"{out_display}/<trace_id>/{_BATCH_SNAPSHOT_NAME}"
+    )
+    meta_hint = (
+        "\n\n本条 tool 消息在标题与摘要后附有 ## metadata (JSON),其中 results 含每条 "
+        "content/account 的 has_portrait 与 portrait_data;若上下文被压缩,可用 read_file 读取:"
+        f" {rel_hint}"
+        + (f"(本机路径: {snapshot_path})" if snapshot_path else "")
+    )
+    output_body = full_text + meta_hint
+
+    logger.info(
+        "batch_fetch_portraits completed",
+        extra={
+            "count": len(results),
+            "candidates": len(parsed),
+            "trace_id": trace_id,
+            "snapshot_path": snapshot_path,
+        },
+    )
+
+    meta: Dict[str, Any] = {
+        "results": results,
+        "count": len(results),
+    }
+    if snapshot_path:
+        meta["snapshot_path"] = snapshot_path
+
+    return _log_return(
+        _LABEL_BATCH,
+        call_params,
+        ToolResult(
+            title=f"批量画像完成 ({len(results)} 条)",
+            output=output_body,
+            long_term_memory=f"Batch portrait fetch for {len(results)} items",
+            metadata=meta,
+            include_metadata_in_llm=True,
+        ),
+    )

+ 12 - 0
examples/content_finder/utils/tool_logging.py

@@ -39,6 +39,18 @@ def format_tool_result_for_log(result: Any) -> str:
     md = getattr(result, "metadata", None)
     if isinstance(md, dict) and md:
         payload["metadata_keys"] = list(md.keys())
+        # 批量画像等:log 里仅列 keys 会导致排障困难,附加 results 摘要(仍可能较长故截断)
+        if "results" in md and isinstance(md["results"], list):
+            try:
+                rs = json.dumps(md["results"], ensure_ascii=False)
+            except (TypeError, ValueError):
+                rs = str(md["results"])
+            max_rs = 24_000
+            payload["metadata_results"] = (
+                rs if len(rs) <= max_rs else rs[:max_rs] + "\n...(truncated)"
+            )
+        if isinstance(md.get("snapshot_path"), str):
+            payload["snapshot_path"] = md["snapshot_path"]
     return json.dumps(payload, ensure_ascii=False)