""" 内容搜索缓存(磁盘持久化) 搜索结果按 trace_id 隔离,同一 Agent session 内的 CLI 多次调用也能复用。 文件格式:/tmp/content_cache_{trace_id}.json """ import json import os import time from pathlib import Path from typing import Any, Dict, List, Optional _CACHE_DIR = Path("/tmp") _CACHE_TTL = 3600 # 1 小时过期 def _cache_path(trace_id: str) -> Path: safe_id = trace_id.replace("/", "_").replace("..", "_") return _CACHE_DIR / f"content_cache_{safe_id}.json" def _load_raw(trace_id: str) -> dict: p = _cache_path(trace_id) if not p.exists(): return {} try: data = json.loads(p.read_text("utf-8")) # 检查过期 if time.time() - data.get("_ts", 0) > _CACHE_TTL: p.unlink(missing_ok=True) return {} return data except Exception: return {} def _save_raw(trace_id: str, data: dict) -> None: data["_ts"] = time.time() try: _cache_path(trace_id).write_text( json.dumps(data, ensure_ascii=False), encoding="utf-8" ) except Exception: pass def save_search_results( trace_id: str, platform: str, keyword: str, posts: List[Dict[str, Any]], ) -> None: """保存搜索结果到磁盘缓存""" data = _load_raw(trace_id) # 每个 platform 只保留最近一次搜索 data[f"search:{platform}"] = { "keyword": keyword, "posts": posts, } _save_raw(trace_id, data) def get_cached_post( trace_id: str, platform: str, index: int, ) -> Optional[Dict[str, Any]]: """按索引从缓存取一条完整记录(1-based)""" data = _load_raw(trace_id) entry = data.get(f"search:{platform}") if not entry: return None posts = entry.get("posts", []) if 1 <= index <= len(posts): return posts[index - 1] return None def get_cached_search_info(trace_id: str, platform: str) -> Optional[Dict[str, Any]]: """获取缓存的搜索信息(keyword + 总条数),用于错误提示""" data = _load_raw(trace_id) entry = data.get(f"search:{platform}") if not entry: return None return {"keyword": entry.get("keyword"), "total": len(entry.get("posts", []))}