瀏覽代碼

feat: 时区优化 & 增加suggestion字段用于补充搜索词猜想

jihuaqiang 1 天之前
父節點
當前提交
b916683c8a

+ 2 - 1
examples/content_finder/content_finder.md

@@ -57,7 +57,7 @@ $system$
 3. 当实质特征不为空时,必须满足:上层特征和下层特征不能同时为空,且应满足 `上层特征 ∪ 下层特征 = 实质特征`(允许同一原词在不同阶段被引用)。
 4. 命中**case出发策略**时,不管下层特征是否具体,都需要调用**高赞case工具**,不能直接发起搜索,搜索词和输出字段**必须基于`get_video_topic`工具返回的metadata.videos字段**进行原值填充,所有`下层特征`的特征词必须根据**高赞视频选题点提取**的结果进行后续步骤,不需要再和原始的特征词关联,也不允许联想或者新生成。
 5. 命中**特征出发策略**时,使用原始的特征词填充特征出发搜索词。
-6. 使用热门话题获取工具 `hot_topic_search` 对搜索词进行补充完善,但**必须传入“实质特征”特征词**,并在工具内部对热点话题做**词组匹配(包含匹配)**:只允许使用**匹配到任一特征词**的热点话题来补充搜索词;禁止仅按“特征品类/大类”进行粗略补充或联想扩展。
+6. 使用热门话题获取工具 `hot_topic_search` 对搜索词进行补充完善,但**必须传入“实质特征”特征词分词**(LIST结构,比如"打工人的一天"可拆分为["打工人","一天"]),并在工具内部对热点话题做**词语匹配(包含匹配)**:只允许使用**匹配到任一特征词**的热点话题来补充搜索词;禁止仅按“特征品类/大类”进行粗略补充或联想扩展。
 7. 此阶段必须输出下面的结构(举例)
 ```json
 {
@@ -146,6 +146,7 @@ $user$
 任务:找最多10个以「%query%」为特征的视频。
 
 特征词: %query%
+补充信息(用于特征词的完善):「%suggestion%」
 搜索词id: %demand_id%(如有)
 
 请开始执行内容寻找任务。记住要多步推理,每次只执行一小步,然后思考下一步该做什么。

+ 15 - 4
examples/content_finder/core.py

@@ -12,9 +12,13 @@ from pathlib import Path
 from typing import Optional, Dict, Any
 from utils.log_capture import attach_log_file, build_log, log
 from datetime import datetime
+from zoneinfo import ZoneInfo
 import uuid
 
 
+LOG_TZ = ZoneInfo("Asia/Shanghai")
+
+
 def _resolve_repo_root() -> Path:
     # /.../Agent/examples/content_finder/core.py -> repo root is /.../Agent
     return Path(__file__).resolve().parents[2]
@@ -51,7 +55,7 @@ def _resolve_log_file_path(
     if not p.is_absolute():
         p = (content_finder_root / p).resolve()
     log_dir = p if not p.suffix else p.parent
-    return (log_dir / f"run_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt").resolve()
+    return (log_dir / f"run_log_{datetime.now(LOG_TZ).strftime('%Y%m%d_%H%M%S')}.txt").resolve()
 
 sys.path.insert(0, str(Path(__file__).parent.parent.parent))
 
@@ -92,7 +96,8 @@ from tools import (
 logger = logging.getLogger(__name__)
 
 # 默认搜索词
-DEFAULT_QUERY = "米饭,中毒"
+DEFAULT_QUERY = "分享"
+DEFAULT_SUGGESTION = "用户希望分享贪污腐败相关的信息、案例、观点给他人"
 DEFAULT_DEMAND_ID = 1
 
 
@@ -113,6 +118,7 @@ def extract_assistant_text(message: Message) -> str:
 async def run_agent(
     query: Optional[str] = None,
     demand_id: Optional[int] = None,
+    suggestion: Optional[str] = None,
     stream_output: bool = True,
     log_assistant_text: bool = True,
 ) -> Dict[str, Any]:
@@ -122,6 +128,7 @@ async def run_agent(
     Args:
         query: 查询内容(搜索词),None 则使用默认值
         demand_id: 本次搜索任务 id(int,关联 demand_content 表)
+        suggestion: 补充信息(与 query 同源时来自 demand_content.suggestion),None 则回退为 DEFAULT_SUGGESTION 参与占位符替换
         stream_output: 是否输出到 stdout(run.py 需要,server.py 不需要)
         log_assistant_text: 是否将 assistant 文本写入 log.txt(server 建议开启)
 
@@ -134,6 +141,7 @@ async def run_agent(
     """
     query = query or DEFAULT_QUERY
     demand_id = demand_id or DEFAULT_DEMAND_ID
+    suggestion_str = (suggestion or DEFAULT_SUGGESTION).strip()
 
     # 加载 prompt
     prompt_path = Path(__file__).parent / "content_finder.md"
@@ -145,10 +153,13 @@ async def run_agent(
     output_dir = os.getenv("OUTPUT_DIR", ".cache/output")
     output_dir_path = _resolve_dir_from_env(repo_root, output_dir)
 
-    # 构建消息(替换 %query%、%output_dir%、%demand_id%)
+    # 构建消息(替换 %query%、%suggestion%、%output_dir%、%demand_id%)
     demand_id_str = str(demand_id) if demand_id is not None else ""
     messages = prompt.build_messages(
-        query=query, output_dir=str(output_dir_path), demand_id=demand_id_str
+        query=query,
+        suggestion=suggestion_str,
+        output_dir=str(output_dir_path),
+        demand_id=demand_id_str,
     )
 
     # 初始化配置

+ 12 - 8
examples/content_finder/db/schedule.py

@@ -1,7 +1,7 @@
 """
 定时任务相关数据库操作
 
-demand_content: 原始检索内容库
+demand_content: 原始检索内容库(name→query,suggestion→补充信息)
 demand_find_task: 执行记录表,通过 demand_content_id 关联
 """
 
@@ -28,11 +28,12 @@ def get_next_unprocessed_demand() -> Optional[Dict[str, Any]]:
     已有任务则视为已跑过(含失败),不再被定时任务选中。
 
     Returns:
-        {"demand_content_id": int, "query": str} 或 None
+        {"demand_content_id": int, "query": str, "suggestion": Optional[str]} 或 None
     """
     sql = """
     SELECT dc.id AS demand_content_id,
-           dc.name AS query
+           dc.name AS query,
+           dc.suggestion AS suggestion
     FROM demand_content dc
     WHERE NOT EXISTS (
         SELECT 1 FROM demand_find_task t
@@ -60,15 +61,16 @@ def get_first_running_task() -> Optional[Dict[str, Any]]:
     """
     查找 demand_find_task 中 status=STATUS_RUNNING(1) 的任务(理论上仅一条)。
 
-    用于服务重启后恢复执行中的任务:联表取出 query(demand_content.name)。
+    用于服务重启后恢复执行中的任务:联表取出 query(demand_content.name)、suggestion。
 
     Returns:
-        {"demand_content_id": int, "query": str, "trace_id": str} 或 None
+        {"demand_content_id": int, "query": str, "suggestion": Optional[str], "trace_id": str} 或 None
     """
     sql = """
     SELECT t.demand_content_id,
            t.trace_id,
-           dc.name AS query
+           dc.name AS query,
+           dc.suggestion AS suggestion
     FROM demand_find_task t
     INNER JOIN demand_content dc ON dc.id = t.demand_content_id
     WHERE t.status = %s
@@ -99,11 +101,12 @@ def get_one_today_unprocessed_demand(*, dt: int) -> Optional[Dict[str, Any]]:
     - 同 dt 下按 score 降序取第一条(最高分优先)
 
     Returns:
-        {"demand_content_id": int, "query": str, "score": Any} 或 None
+        {"demand_content_id": int, "query": str, "suggestion": Optional[str], "score": Any} 或 None
     """
     sql = """
     SELECT dc.id AS demand_content_id,
            dc.name AS query,
+           dc.suggestion AS suggestion,
            dc.score AS score
     FROM demand_content dc
     WHERE dc.dt = %s
@@ -142,11 +145,12 @@ def get_daily_unprocessed_pool(
     - 过滤已处理:demand_find_task 中存在任意记录则视为已跑过(含失败)
     """
     sql = """
-    SELECT x.demand_content_id, x.query, x.merge_leve2, x.score
+    SELECT x.demand_content_id, x.query, x.suggestion, x.merge_leve2, x.score
     FROM (
         SELECT
             dc.id AS demand_content_id,
             dc.name AS query,
+            dc.suggestion AS suggestion,
             dc.merge_leve2 AS merge_leve2,
             dc.score AS score,
             ROW_NUMBER() OVER (

+ 38 - 12
examples/content_finder/server.py

@@ -61,7 +61,8 @@ app = FastAPI(
 
 # 定时调度器(默认用中国时区,避免容器 UTC 导致错过预期时间点)
 SCHEDULER_TIMEZONE = os.getenv("SCHEDULER_TIMEZONE", os.getenv("TZ", "Asia/Shanghai"))
-scheduler = AsyncIOScheduler(timezone=ZoneInfo(SCHEDULER_TIMEZONE))
+SCHEDULER_TZ = ZoneInfo(SCHEDULER_TIMEZONE)
+scheduler = AsyncIOScheduler(timezone=SCHEDULER_TZ)
 
 # 并发控制
 MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "1"))
@@ -85,6 +86,7 @@ stats = {
 class TaskRequest(BaseModel):
     query: Optional[str] = None
     demand_id: Optional[int] = None
+    suggestion: Optional[str] = None
 
 
 class TaskResponse(BaseModel):
@@ -107,6 +109,7 @@ def _update_scheduled_task_complete(demand_id: int, trace_id: str, status: int)
 async def execute_task(
     query: str,
     demand_id: Optional[int] = None,
+    suggestion: str = "",
     task_type: str = "api",
 ):
     """
@@ -115,13 +118,14 @@ async def execute_task(
     Args:
         query: 查询内容
         demand_id: 需求 id(demand_content.id,关联 demand_content 表)
+        suggestion: 补充信息(定时任务与 demand_content.suggestion 一致)
         task_type: 任务类型("api" 或 "scheduled")
     """
     async with task_semaphore:
         current_concurrent = MAX_CONCURRENT_TASKS - task_semaphore._value + 1
         logger.info(f"任务开始 [{task_type}]: query={query[:50]}..., 当前并发={current_concurrent}/{MAX_CONCURRENT_TASKS}")
 
-        start_time = datetime.now()
+        start_time = datetime.now(SCHEDULER_TZ)
         stats["total_tasks"] += 1
         if task_type == "scheduled":
             stats["scheduled_tasks"] += 1
@@ -135,11 +139,15 @@ async def execute_task(
         try:
             result = await asyncio.wait_for(
                 core.run_agent(
-                    query, demand_id=demand_id, stream_output=False, log_assistant_text=True
+                    query,
+                    demand_id=demand_id,
+                    suggestion=suggestion or None,
+                    stream_output=False,
+                    log_assistant_text=True,
                 ),
                 timeout=float(TASK_TIMEOUT_SECONDS),
             )
-            duration = (datetime.now() - start_time).total_seconds()
+            duration = (datetime.now(SCHEDULER_TZ) - start_time).total_seconds()
 
             if result["status"] == "completed":
                 stats["completed_tasks"] += 1
@@ -154,7 +162,7 @@ async def execute_task(
 
         except asyncio.TimeoutError:
             stats["failed_tasks"] += 1
-            duration = (datetime.now() - start_time).total_seconds()
+            duration = (datetime.now(SCHEDULER_TZ) - start_time).total_seconds()
             logger.error(
                 f"任务超时 [{task_type}]: 超过 {TASK_TIMEOUT_SECONDS}s,记为失败, 耗时={duration:.1f}s"
             )
@@ -163,7 +171,7 @@ async def execute_task(
 
         except Exception as e:
             stats["failed_tasks"] += 1
-            duration = (datetime.now() - start_time).total_seconds()
+            duration = (datetime.now(SCHEDULER_TZ) - start_time).total_seconds()
             logger.error(f"任务异常 [{task_type}]: {e}, 耗时={duration:.1f}s", exc_info=True)
             if task_type == "scheduled" and demand_id is not None:
                 _update_scheduled_task_complete(demand_id, "", STATUS_FAILED)
@@ -171,7 +179,7 @@ async def execute_task(
 
 def _today_dt_int() -> int:
     """当天 demand_content.dt 约定为 YYYYMMDD 整数(如 20260402),与定时器时区一致。"""
-    return int(datetime.now(ZoneInfo(SCHEDULER_TIMEZONE)).strftime("%Y%m%d"))
+    return int(datetime.now(SCHEDULER_TZ).strftime("%Y%m%d"))
 
 
 def _has_running_content_task() -> bool:
@@ -202,6 +210,7 @@ async def scheduled_tick():
 
     demand_content_id = item.get("demand_content_id")
     query = (item.get("query") or "").strip()
+    suggestion = (item.get("suggestion") or "").strip()
     if demand_content_id is None or not query:
         logger.info("定时任务跳过:查询结果无效")
         return
@@ -212,7 +221,12 @@ async def scheduled_tick():
         f"dt={dt}, score={score}"
     )
     create_task_record(demand_content_id)
-    await execute_task(query=query, demand_id=demand_content_id, task_type="scheduled")
+    await execute_task(
+        query=query,
+        demand_id=demand_content_id,
+        suggestion=suggestion,
+        task_type="scheduled",
+    )
 
 
 async def run_startup_resume():
@@ -227,12 +241,18 @@ async def run_startup_resume():
 
         demand_content_id = row.get("demand_content_id")
         query = (row.get("query") or "").strip()
+        suggestion = (row.get("suggestion") or "").strip()
         if demand_content_id is None or not query:
             logger.warning("启动恢复:执行中任务数据不完整,跳过")
             return
 
         logger.info(f"启动恢复:执行 demand_find_task status=1, demand_content_id={demand_content_id}")
-        await execute_task(query=query, demand_id=int(demand_content_id), task_type="scheduled")
+        await execute_task(
+            query=query,
+            demand_id=int(demand_content_id),
+            suggestion=suggestion,
+            task_type="scheduled",
+        )
     except Exception as e:
         logger.error(f"启动恢复失败: {e}", exc_info=True)
 
@@ -255,9 +275,10 @@ async def create_task(request: TaskRequest):
             "message": "任务已启动,结果将保存到 .cache/traces/xxx/"
         }
     """
-    # 获取 query 和 demand_id
+    # 获取 query、demand_id、suggestion(API 显式传入;与库表字段同名便于对齐)
     query = request.query or core.DEFAULT_QUERY
     demand_id = request.demand_id
+    suggestion_str = (request.suggestion or "").strip()
 
     # 用 Event 等待 trace_id
     trace_id_ready = asyncio.Event()
@@ -279,7 +300,12 @@ async def create_task(request: TaskRequest):
                 prompt = SimplePrompt(prompt_path)
                 trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
                 demand_id_str = str(demand_id) if demand_id is not None else ""
-                messages = prompt.build_messages(query=query, trace_dir=trace_dir, demand_id=demand_id_str)
+                messages = prompt.build_messages(
+                    query=query,
+                    suggestion=suggestion_str,
+                    trace_dir=trace_dir,
+                    demand_id=demand_id_str,
+                )
 
                 api_key = os.getenv("OPEN_ROUTER_API_KEY")
                 model_name = prompt.config.get("model", "sonnet-4.6")
@@ -348,7 +374,7 @@ async def create_task(request: TaskRequest):
             stats["failed_tasks"] += 1
             logger.error(f"任务异常 [api]: {e}", exc_info=True)
             if not trace_id_holder["id"]:
-                trace_id_holder["id"] = f"error_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+                trace_id_holder["id"] = f"error_{datetime.now(SCHEDULER_TZ).strftime('%Y%m%d_%H%M%S')}"
                 trace_id_ready.set()
 
     # 启动后台任务

+ 5 - 2
examples/content_finder/tools/aigc_platform_api.py

@@ -10,6 +10,7 @@ from pathlib import Path
 from typing import List, Dict, Union, Tuple, Any
 
 import requests
+from zoneinfo import ZoneInfo
 
 from agent import ToolResult, tool
 from db import update_content_plan_ids
@@ -22,6 +23,8 @@ _LABEL_CONTENT = "工具调用:create_crawler_plan_by_douyin_content_id -> 按
 
 AIGC_DEMAND_DOUYIN_CONTENT_PUBLISH_PLAN_ID=20260320065232171836746
 
+SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
+
 
 def _log_aigc_return(label: str, params: Dict[str, Any], r: ToolResult) -> ToolResult:
     log_tool_call(label, params, format_tool_result_for_log(r))
@@ -150,7 +153,7 @@ async def create_crawler_plan_by_douyin_account_id(
         produce_plan_ids = []
     call_params["produce_plan_ids"] = produce_plan_ids
 
-    dt = datetime.now().strftime("%Y%m%d%h%M%s")
+    dt = datetime.now(SHANGHAI_TZ).strftime("%Y%m%d%H%M%S")
     crawler_plan_name = f"【内容寻找Agent自动创建】{dt}_抖音账号ID爬取计划_{account_id[:min(30, len(account_id))]}"
     params = {
         "accountFilters": [],
@@ -366,7 +369,7 @@ async def create_crawler_plan_by_douyin_content_id(
         )
 
     produce_plan_ids = _get_produce_plan_ids_from_env()
-    dt = datetime.now().strftime("%Y%m%d%h%M%s")
+    dt = datetime.now(SHANGHAI_TZ).strftime("%Y%m%d%H%M%S")
     crawler_plan_name = f"【内容寻找Agent自动创建】抖音视频直接抓取-{dt}-抖音"
     params = {
         "channel": 2,

+ 24 - 61
examples/content_finder/tools/hot_topic_search.py

@@ -60,9 +60,7 @@ class MatchedHotTopic(TypedDict, total=False):
     jump_url: str
     type: str
     score: int
-    match_mode: str  # "phrase" | "char" | "none"
     matched_keywords: List[str]
-    matched_chars: List[str]
 
 
 def _normalize_text(text: str) -> str:
@@ -92,61 +90,28 @@ def _prepare_feature_keywords(feature_keywords: Optional[List[str]]) -> List[str
     return deduped
 
 
-def _extract_chars_from_keywords(feature_keywords: List[str]) -> List[str]:
-    chars: List[str] = []
-    seen: set[str] = set()
-    for kw in feature_keywords:
-        for ch in kw.strip():
-            if ch.isspace():
-                continue
-            if ch in seen:
-                continue
-            seen.add(ch)
-            chars.append(ch)
-    return chars
-
-
-def _score_title_match(title: str, feature_keywords: List[str]) -> MatchedHotTopic:
+def _match_title_by_words(title: str, words: List[str]) -> MatchedHotTopic:
     """
-    匹配策略(业务规则):
-    - 优先词组(feature_keywords)包含匹配;只要命中任一词组,即进入 phrase 模式
-    - 若一个词组都没命中,再进行“单字/字符”匹配,按命中字符数计分
-    - 返回 score 与命中依据,供后续 prompt 再做相关性判断
+    对输入词语列表逐一做包含匹配(规范化后:标题含该词即命中该词)。
+
+    无单字/模糊匹配;score 仅用于 Top 排序:命中词数优先,其次命中词总字数。
     """
     title_norm = _normalize_text(title)
-    if not feature_keywords:
-        return MatchedHotTopic(title=title, score=0, match_mode="none", matched_keywords=[], matched_chars=[])
+    if not words:
+        return MatchedHotTopic(title=title, score=0, matched_keywords=[])
 
-    matched_keywords: List[str] = []
-    for kw in feature_keywords:
+    matched: List[str] = []
+    for kw in words:
         kw_norm = _normalize_text(kw)
         if kw_norm and kw_norm in title_norm:
-            matched_keywords.append(kw)
-
-    if matched_keywords:
-        # phrase 模式:命中词组数优先,其次命中词组总长度作为细粒度排序
-        length_bonus = sum(len(k.strip()) for k in matched_keywords)
-        score = 1000 * len(matched_keywords) + length_bonus
-        return MatchedHotTopic(
-            title=title,
-            score=int(score),
-            match_mode="phrase",
-            matched_keywords=matched_keywords,
-            matched_chars=[],
-        )
+            matched.append(kw)
 
-    # char 模式:仅在“无任何词组命中”时启用
-    keyword_chars = _extract_chars_from_keywords(feature_keywords)
-    title_chars = set(title.strip())
-    matched_chars = [ch for ch in keyword_chars if ch in title_chars]
-    score = len(matched_chars)
-    return MatchedHotTopic(
-        title=title,
-        score=int(score),
-        match_mode="char" if score > 0 else "none",
-        matched_keywords=[],
-        matched_chars=matched_chars,
-    )
+    if not matched:
+        return MatchedHotTopic(title=title, score=0, matched_keywords=[])
+
+    length_bonus = sum(len(k.strip()) for k in matched)
+    score = 1000 * len(matched) + length_bonus
+    return MatchedHotTopic(title=title, score=int(score), matched_keywords=matched)
 
 
 def _build_summary(
@@ -205,11 +170,10 @@ def _parse_filtered_topics(raw: Dict[str, Any], *, feature_keywords: List[str])
             heat = (r.get("heat") or "").strip()
             if not title:
                 continue
-            scored = _score_title_match(title, feature_keywords)
+            scored = _match_title_by_words(title, feature_keywords)
             score = int(scored.get("score") or 0)
-            match_mode = str(scored.get("match_mode") or "none")
-            # 要求“词组优先;无词组再按单字”,所以仅保留有得分/有命中的候选
-            if feature_keywords and match_mode == "none":
+            matched_kw = list(scored.get("matched_keywords") or [])
+            if feature_keywords and not matched_kw:
                 continue
             candidates.append(
                 MatchedHotTopic(
@@ -219,9 +183,7 @@ def _parse_filtered_topics(raw: Dict[str, Any], *, feature_keywords: List[str])
                     jump_url=item.get("jump_url") or "",
                     type=item.get("type") or "",
                     score=score,
-                    match_mode=match_mode,
-                    matched_keywords=list(scored.get("matched_keywords") or []),
-                    matched_chars=list(scored.get("matched_chars") or []),
+                    matched_keywords=matched_kw,
                 )
             )
 
@@ -257,7 +219,7 @@ def _parse_filtered_topics(raw: Dict[str, Any], *, feature_keywords: List[str])
 
 
 @tool(
-    description='检索“今日热榜”热点话题;可传入 feature_keywords 做标题包含匹配,仅保留命中话题(title/heat)。若不传则不做过滤,返回全部话题'
+    description='检索“今日热榜”热点话题;feature_keywords 为词语 list,对榜单标题逐词做包含匹配(命中至少一词即保留)。不传则不过滤,返回全部话题'
 )
 async def hot_topic_search(
     sort_type: str = "最热",
@@ -271,7 +233,8 @@ async def hot_topic_search(
     Args:
         sort_type: 榜单排序方式(如 "最热"),默认 "最热"
         cursor: 分页游标(从 1 开始),默认 1
-        feature_keywords: 实质特征词列表。若传入,则仅保留“标题包含任一特征词”的话题用于补充搜索词;不传入则不做标题匹配过滤(返回全部话题)。
+        feature_keywords: 词语列表(list[str])。传入时对每条话题标题逐词判断规范化后的包含关系,
+            至少命中一词则保留;不传入则不做过滤(返回全部话题)。
         timeout: 超时时间(秒),默认 60
 
     Returns:
@@ -281,9 +244,9 @@ async def hot_topic_search(
             - metadata.next_cursor: 下一页 cursor
             - metadata.blocks: 按来源块输出的结构化结果(每块 topics 仅含 title/heat)
             - metadata.topics_by_source: 按来源聚合的话题列表(仅含 title/heat)
-            - metadata.top_topics: Top3 话题明细(含 score/match_mode/命中依据),用于 prompt 再做相关性判断
+            - metadata.top_topics: Top3 话题明细(含 score、matched_keywords)
             - metadata.matched_total: 实际返回的命中话题总数(<=3)
-            - metadata.feature_keywords: 本次用于标题匹配的特征词(清洗/去重后)
+            - metadata.feature_keywords: 本次参与匹配的词语(清洗/去重后)
             - metadata.raw_data: 原始 API 返回
     """
     start_time = time.time()