| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359 |
- """
- 每日热点话题检索工具(示例)
- 调用内部爬虫服务获取“今日热榜”类榜单数据,并按业务规则筛选需要的平台来源。
- """
- import asyncio
- import json
- import logging
- import sys
- import time
- from pathlib import Path
- from typing import Any, Dict, List, Optional, TypedDict
- import requests
def _ensure_import_paths() -> None:
    """
    Make the repo-root `agent` package and the content_finder-local modules
    (e.g. `utils`) importable even when this file is executed directly.
    """
    here = Path(__file__).resolve()
    # here.parents[3] -> .../Agent (repo root); here.parents[1] -> .../examples/content_finder
    for root in (here.parents[3], here.parents[1]):
        entry = str(root)
        if entry not in sys.path:
            sys.path.insert(0, entry)


_ensure_import_paths()
- from agent.tools import ToolResult, tool
- from utils.tool_logging import format_tool_result_for_log, log_tool_call
logger = logging.getLogger(__name__)

# Label used to tag log lines for every invocation of this tool.
_LOG_LABEL = "工具调用:hot_topic_search -> 每日热点话题检索(今日热榜)"

# Internal crawler endpoint serving the "今日热榜" (daily hot list) rankings.
HOT_TOPIC_API = "http://crawapi.piaoquantv.com/crawler/jin_ri_re_bang/content_rank"

# Default HTTP request timeout, in seconds.
DEFAULT_TIMEOUT = 60.0

# Upper bound on the number of matched topics returned to the caller.
MAX_MATCHED_TOPICS = 3
class HotTopicItem(TypedDict):
    """A single hot-list entry: its title and a heat/popularity string."""

    title: str
    heat: str
class HotTopicSourceBlock(TypedDict, total=False):
    """Topics grouped by platform source; every key is optional (total=False)."""

    source: str
    jump_url: str
    type: str
    topics: List[HotTopicItem]
class MatchedHotTopic(TypedDict, total=False):
    """A topic enriched with its ranking score and the keywords that matched."""

    title: str
    heat: str
    source: str
    jump_url: str
    type: str
    score: int
    matched_keywords: List[str]
- def _normalize_text(text: str) -> str:
- return text.strip().lower()
- def _prepare_feature_keywords(feature_keywords: Optional[List[str]]) -> List[str]:
- if not feature_keywords:
- return []
- cleaned: List[str] = []
- for kw in feature_keywords:
- if not isinstance(kw, str):
- continue
- kw_norm = kw.strip()
- if not kw_norm:
- continue
- cleaned.append(kw_norm)
- # 保持顺序去重
- seen: set[str] = set()
- deduped: List[str] = []
- for kw in cleaned:
- key = _normalize_text(kw)
- if key in seen:
- continue
- seen.add(key)
- deduped.append(kw)
- return deduped
def _match_title_by_words(title: str, words: List[str]) -> MatchedHotTopic:
    """
    Containment-match every word against the normalized title (the
    lowercased, stripped title must contain the word for it to hit).
    No single-char or fuzzy matching; `score` exists only for Top-N
    ordering — hit count dominates, total hit length breaks ties.
    """
    normalized_title = title.strip().lower()
    hits: List[str] = []
    for word in words:
        needle = word.strip().lower()
        if needle and needle in normalized_title:
            hits.append(word)
    if not hits:
        # Covers both "no words supplied" and "nothing matched".
        return MatchedHotTopic(title=title, score=0, matched_keywords=[])
    # 1000 per hit dominates; summed stripped lengths break ties.
    score = 1000 * len(hits) + sum(len(h.strip()) for h in hits)
    return MatchedHotTopic(title=title, score=int(score), matched_keywords=hits)
def _build_summary(
    *,
    blocks: List[HotTopicSourceBlock],
    has_more: bool,
    next_cursor: Any,
    feature_keywords: List[str],
) -> str:
    """
    Render a human-readable summary of the filtered hot-topic blocks.

    At most 20 topics are listed per source block; any remainder is elided
    with a pointer to the structured metadata.
    """
    out: List[str] = []
    topic_count = sum(len(block.get("topics", [])) for block in blocks)
    if feature_keywords:
        out.append(f"标题匹配特征词:{', '.join(feature_keywords)}")
    else:
        out.append("标题匹配特征词:未提供(不过滤,返回全部话题)")
    out.append(f"共筛出 {len(blocks)} 个来源块,话题 {topic_count} 条")
    if has_more:
        out.append(f"还有更多,可用 cursor={next_cursor} 继续拉取")
    out.append("")
    for block in blocks:
        name = block.get("source") or "未知来源"
        url = block.get("jump_url") or ""
        kind = block.get("type") or ""
        topic_rows = block.get("topics", [])
        out.append(f"【{name}】{kind}".strip())
        if url:
            out.append(f"榜单页: {url}")
        for idx, row in enumerate(topic_rows[:20], 1):
            shown_title = row.get("title", "").strip() or "无标题"
            shown_heat = row.get("heat", "").strip() or "-"
            out.append(f"{idx}. {shown_title}({shown_heat})")
        if len(topic_rows) > 20:
            out.append(f"... 其余 {len(topic_rows) - 20} 条已省略(完整见 metadata)")
        out.append("")
    return "\n".join(out).rstrip()
def _parse_filtered_topics(raw: Dict[str, Any], *, feature_keywords: List[str]) -> Dict[str, Any]:
    """
    Flatten the raw API response, score every topic title against
    `feature_keywords`, and keep the global Top-3 matches grouped by source.

    Args:
        raw: Raw JSON payload from the hot-topic API.
        feature_keywords: Cleaned keyword list. Empty means no filtering:
            every topic becomes a candidate with score 0.

    Returns:
        Dict with pagination info (`has_more`, `next_cursor`), grouped
        `blocks` / `topics_by_source` structures, the scored `top_topics`,
        and `matched_total` (<= MAX_MATCHED_TOPICS).
    """
    # The useful list is nested under data.data; tolerate missing/odd shapes.
    data_block = raw.get("data", {}) if isinstance(raw.get("data"), dict) else {}
    has_more = bool(data_block.get("has_more", False))
    next_cursor = data_block.get("next_cursor")
    items = data_block.get("data", []) if isinstance(data_block.get("data"), list) else []
    candidates: List[MatchedHotTopic] = []
    for item in items:
        if not isinstance(item, dict):
            continue
        source = (item.get("source") or "").strip()
        rank_list = item.get("rankList", []) if isinstance(item.get("rankList"), list) else []
        for r in rank_list:
            if not isinstance(r, dict):
                continue
            title = (r.get("title") or "").strip()
            heat = (r.get("heat") or "").strip()
            if not title:
                continue
            scored = _match_title_by_words(title, feature_keywords)
            score = int(scored.get("score") or 0)
            matched_kw = list(scored.get("matched_keywords") or [])
            # When keywords were provided, drop titles that matched none.
            if feature_keywords and not matched_kw:
                continue
            candidates.append(
                MatchedHotTopic(
                    title=title,
                    heat=heat,
                    source=source,
                    jump_url=item.get("jump_url") or "",
                    type=item.get("type") or "",
                    score=score,
                    matched_keywords=matched_kw,
                )
            )
    # Global sort by score, keep Top-3; sorted() is stable, so ties keep API order.
    top_topics = sorted(candidates, key=lambda x: int(x.get("score") or 0), reverse=True)[:MAX_MATCHED_TOPICS]
    # Re-group the winners by source for block-style presentation.
    blocks_by_source: Dict[str, HotTopicSourceBlock] = {}
    topics_by_source: Dict[str, List[HotTopicItem]] = {}
    for t in top_topics:
        source = (t.get("source") or "").strip()
        if source not in blocks_by_source:
            # jump_url/type come from this source's first winning topic.
            blocks_by_source[source] = HotTopicSourceBlock(
                source=source,
                jump_url=t.get("jump_url") or "",
                type=t.get("type") or "",
                topics=[],
            )
        topic_item: HotTopicItem = {"title": t.get("title") or "", "heat": t.get("heat") or ""}
        blocks_by_source[source].setdefault("topics", []).append(topic_item)
        topics_by_source.setdefault(source, []).append(topic_item)
    blocks: List[HotTopicSourceBlock] = list(blocks_by_source.values())
    matched_total = len(top_topics)
    return {
        "has_more": has_more,
        "next_cursor": next_cursor,
        "blocks": blocks,
        "topics_by_source": topics_by_source,
        "top_topics": top_topics,
        "matched_total": matched_total,
    }
@tool(
    description='检索“今日热榜”热点话题;feature_keywords 为词语 list,对榜单标题逐词做包含匹配(命中至少一词即保留)。不传则不过滤,返回全部话题'
)
async def hot_topic_search(
    sort_type: str = "最热",
    cursor: int = 1,
    feature_keywords: Optional[List[str]] = None,
    timeout: Optional[float] = None,
) -> ToolResult:
    """
    Search daily hot topics (今日热榜).

    Args:
        sort_type: Ranking sort mode (e.g. "最热"); defaults to "最热".
        cursor: 1-based pagination cursor; defaults to 1.
        feature_keywords: Optional word list. When given, a topic is kept only
            if its normalized title contains at least one word; when omitted,
            no filtering is applied (all topics are candidates).
        timeout: Request timeout in seconds; defaults to 60.

    Returns:
        ToolResult:
            - output: human-readable summary (at most the top 3 matched topics,
              to save tokens)
            - metadata.has_more: whether another page exists
            - metadata.next_cursor: cursor for the next page
            - metadata.blocks: structured per-source blocks (topics hold title/heat only)
            - metadata.topics_by_source: topics aggregated by source (title/heat only)
            - metadata.top_topics: Top-3 topic details (with score, matched_keywords)
            - metadata.matched_total: number of matched topics returned (<= 3)
            - metadata.feature_keywords: cleaned/deduplicated words used for matching
            - metadata.raw_data: raw API response
    """
    start_time = time.time()
    request_timeout = timeout if timeout is not None else DEFAULT_TIMEOUT
    cleaned_keywords = _prepare_feature_keywords(feature_keywords)
    # Logged with every outcome (success or validation/network failure).
    call_params: Dict[str, Any] = {
        "sort_type": sort_type,
        "cursor": cursor,
        "feature_keywords": cleaned_keywords,
        "timeout": request_timeout,
    }
    if not isinstance(sort_type, str) or not sort_type.strip():
        err = ToolResult(title="热点话题检索失败", output="", error="sort_type 参数无效:必须是非空字符串")
        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
        return err
    # bool is a subclass of int, so reject it explicitly before the int check.
    if isinstance(cursor, bool) or not isinstance(cursor, int) or cursor <= 0:
        err = ToolResult(title="热点话题检索失败", output="", error="cursor 参数无效:必须是正整数")
        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
        return err
    if feature_keywords is not None and not isinstance(feature_keywords, list):
        err = ToolResult(title="热点话题检索失败", output="", error="feature_keywords 参数无效:必须是字符串列表或不传")
        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
        return err
    try:
        payload = {"sort_type": sort_type.strip(), "cursor": cursor}
        # `requests` is synchronous: run it in a worker thread so this
        # coroutine does not block the event loop for up to request_timeout
        # seconds.
        response = await asyncio.to_thread(
            requests.post,
            HOT_TOPIC_API,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=request_timeout,
        )
        response.raise_for_status()
        raw = response.json()
    except requests.exceptions.HTTPError as e:
        logger.error(
            "hot_topic_search HTTP error",
            extra={"sort_type": sort_type, "cursor": cursor, "status_code": e.response.status_code},
        )
        err = ToolResult(title="热点话题检索失败", output="", error=f"HTTP {e.response.status_code}: {e.response.text}")
        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
        return err
    except requests.exceptions.Timeout:
        logger.error("hot_topic_search timeout", extra={"sort_type": sort_type, "cursor": cursor, "timeout": request_timeout})
        err = ToolResult(title="热点话题检索失败", output="", error=f"请求超时({request_timeout}秒)")
        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
        return err
    except requests.exceptions.RequestException as e:
        logger.error("hot_topic_search network error", extra={"sort_type": sort_type, "cursor": cursor, "error": str(e)})
        err = ToolResult(title="热点话题检索失败", output="", error=f"网络错误: {str(e)}")
        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
        return err
    except Exception as e:
        # Last-resort boundary: anything unexpected (JSON decoding, etc.) is
        # logged with a traceback and surfaced as a tool error.
        logger.error(
            "hot_topic_search unexpected error",
            extra={"sort_type": sort_type, "cursor": cursor, "error": str(e)},
            exc_info=True,
        )
        err = ToolResult(title="热点话题检索失败", output="", error=f"未知错误: {str(e)}")
        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
        return err
    parsed = _parse_filtered_topics(raw if isinstance(raw, dict) else {}, feature_keywords=cleaned_keywords)
    has_more = bool(parsed.get("has_more", False))
    next_cursor = parsed.get("next_cursor")
    blocks = parsed.get("blocks", [])
    matched_total = int(parsed.get("matched_total") or 0)
    summary = _build_summary(blocks=blocks, has_more=has_more, next_cursor=next_cursor, feature_keywords=cleaned_keywords)
    duration_ms = int((time.time() - start_time) * 1000)
    logger.info(
        "hot_topic_search completed",
        extra={
            "sort_type": sort_type,
            "cursor": cursor,
            "blocks_count": len(blocks),
            "has_more": has_more,
            "next_cursor": next_cursor,
            "duration_ms": duration_ms,
        },
    )
    out = ToolResult(
        title=f"今日热榜热点话题({sort_type},cursor={cursor})",
        output=summary,
        long_term_memory=f"Fetched hot topics sort_type='{sort_type}' cursor={cursor}",
        metadata={
            "raw_data": raw,
            "has_more": has_more,
            "next_cursor": next_cursor,
            "blocks": blocks,
            "topics_by_source": parsed.get("topics_by_source", {}),
            "top_topics": parsed.get("top_topics", []),
            "matched_total": matched_total,
            "feature_keywords": cleaned_keywords,
        },
        include_metadata_in_llm=True,
    )
    # Log only the compact per-source aggregation, not the full raw payload.
    log_tool_call(_LOG_LABEL, call_params, json.dumps(out.metadata.get("topics_by_source", {}), ensure_ascii=False))
    return out
async def main() -> None:
    """Ad-hoc manual check: fetch the first page and print the summary."""
    tool_result = await hot_topic_search(sort_type="最热", cursor=1)
    print(tool_result.output)


if __name__ == "__main__":
    asyncio.run(main())
|