Sfoglia il codice sorgente

定时任务调整

jihuaqiang 7 ore fa
parent
commit
add47975d8

+ 49 - 16
examples/content_finder/core.py

@@ -15,13 +15,43 @@ from datetime import datetime
 import uuid
 
 
-def _resolve_input_log_dir(content_finder_root: Path) -> Path:
-    """与 .env 中 INPUT_LOG_PATH 一致:目录;相对路径相对 content_finder 根目录。"""
-    raw = os.getenv("INPUT_LOG_PATH", ".cache/input_log")
+def _resolve_repo_root() -> Path:
+    # /.../Agent/examples/content_finder/core.py -> repo root is /.../Agent
+    return Path(__file__).resolve().parents[2]
+
+
+def _resolve_dir_from_env(repo_root: Path, raw: str) -> Path:
     p = Path(raw).expanduser()
-    if p.is_absolute():
-        return p if not p.suffix else p.parent
-    return (content_finder_root / p).resolve()
+    return p.resolve() if p.is_absolute() else (repo_root / p).resolve()
+
+
+def _resolve_log_file_path(
+    *,
+    content_finder_root: Path,
+    output_dir_path: Path,
+    trace_id: str | None,
+    execution_id: str,
+) -> Path:
+    """
+    解析日志输出路径。
+
+    规则:
+    - 如果设置了 INPUT_LOG_PATH:
+      - 值为 OUTPUT_DIR / ${OUTPUT_DIR}:写入 OUTPUT_DIR/<trace_id>/log.txt
+      - 绝对/相对路径:视为“目录”,写入 <dir>/run_log_<timestamp>.txt(兼容旧行为)
+    - 未设置 INPUT_LOG_PATH:默认写入 OUTPUT_DIR/<trace_id>/log.txt
+    """
+    raw = (os.getenv("INPUT_LOG_PATH") or "").strip()
+    dir_name = trace_id or execution_id
+
+    if raw in {"OUTPUT_DIR", "${OUTPUT_DIR}"} or raw == "":
+        return (output_dir_path / dir_name / "log.txt").resolve()
+
+    p = Path(raw).expanduser()
+    if not p.is_absolute():
+        p = (content_finder_root / p).resolve()
+    log_dir = p if not p.suffix else p.parent
+    return (log_dir / f"run_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt").resolve()
 
 sys.path.insert(0, str(Path(__file__).parent.parent.parent))
 
@@ -60,7 +90,7 @@ from tools import (
 logger = logging.getLogger(__name__)
 
 # 默认搜索词
-DEFAULT_QUERY = "毛泽东,反腐倡廉"
+DEFAULT_QUERY = "毛泽东"
 DEFAULT_DEMAND_ID = 1
 
 
@@ -107,10 +137,9 @@ async def run_agent(
 
     # output 目录(相对路径相对 content_finder)
     content_finder_root = Path(__file__).resolve().parent
+    repo_root = _resolve_repo_root()
     output_dir = os.getenv("OUTPUT_DIR", ".cache/output")
-    output_dir_path = Path(output_dir).expanduser()
-    if not output_dir_path.is_absolute():
-        output_dir_path = (content_finder_root / output_dir_path).resolve()
+    output_dir_path = _resolve_dir_from_env(repo_root, output_dir)
 
     # 构建消息(替换 %query%、%output_dir%、%demand_id%)
     demand_id_str = str(demand_id) if demand_id is not None else ""
@@ -131,9 +160,10 @@ async def run_agent(
     
     skills_dir = str(Path(__file__).parent / "skills")
 
-    Path(trace_dir).mkdir(parents=True, exist_ok=True)
+    trace_dir_path = _resolve_dir_from_env(repo_root, trace_dir)
+    trace_dir_path.mkdir(parents=True, exist_ok=True)
 
-    store = FileSystemTraceStore(base_path=trace_dir)
+    store = FileSystemTraceStore(base_path=str(trace_dir_path))
 
     allowed_tools = [
         "douyin_search",
@@ -182,10 +212,6 @@ async def run_agent(
     execution_id = str(uuid.uuid4())
 
     try:
-        log_dir = _resolve_input_log_dir(content_finder_root)
-        log_dir.mkdir(parents=True, exist_ok=True)
-        log_file_path = log_dir / f"run_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
-
         run_result: Optional[Dict[str, Any]] = None
 
         with build_log(execution_id) as log_buffer:
@@ -222,6 +248,13 @@ async def run_agent(
                 }
 
             full_log = log_buffer.getvalue()
+            log_file_path = _resolve_log_file_path(
+                content_finder_root=content_finder_root,
+                output_dir_path=output_dir_path,
+                trace_id=trace_id,
+                execution_id=execution_id,
+            )
+            log_file_path.parent.mkdir(parents=True, exist_ok=True)
             with open(log_file_path, "w", encoding="utf-8") as f:
                 f.write(full_log)
 

+ 2 - 0
examples/content_finder/db/__init__.py

@@ -10,6 +10,7 @@ from .connection import get_connection
 from .open_aigc_pattern_connection import get_open_aigc_pattern_connection
 from .schedule import (
     get_next_unprocessed_demand,
+    get_daily_unprocessed_pool,
     create_task_record,
     update_task_status,
     update_task_on_complete,
@@ -20,6 +21,7 @@ __all__ = [
     "get_connection",
     "get_open_aigc_pattern_connection",
     "get_next_unprocessed_demand",
+    "get_daily_unprocessed_pool",
     "create_task_record",
     "update_task_status",
     "update_task_on_complete",

+ 50 - 1
examples/content_finder/db/schedule.py

@@ -6,7 +6,7 @@ demand_find_task: 执行记录表,通过 demand_content_id 关联
 """
 
 import logging
-from typing import Any, Dict, Optional
+from typing import Any, Dict, List, Optional
 
 from .connection import get_connection
 
@@ -55,6 +55,55 @@ def get_next_unprocessed_demand() -> Optional[Dict[str, Any]]:
             conn.close()
 
 
+def get_daily_unprocessed_pool(
+    *,
+    total_limit: int = 20,
+    per_category_limit: int = 5,
+) -> List[Dict[str, Any]]:
+    """
+    生成“当天任务池”:
+    - 先按 demand_content.merge_leve2 品类去重分组
+    - 每个品类取 score 最高的 N 条(per_category_limit)
+    - 全局最多取 M 条(total_limit)
+    - 过滤已处理:demand_find_task 中存在任意记录则视为已跑过(含失败)
+    """
+    sql = """
+    SELECT x.demand_content_id, x.query, x.merge_leve2, x.score
+    FROM (
+        SELECT
+            dc.id AS demand_content_id,
+            dc.name AS query,
+            dc.merge_leve2 AS merge_leve2,
+            dc.score AS score,
+            ROW_NUMBER() OVER (
+                PARTITION BY COALESCE(dc.merge_leve2, '')
+                ORDER BY dc.score DESC, dc.id DESC
+            ) AS rn
+        FROM demand_content dc
+        WHERE NOT EXISTS (
+            SELECT 1 FROM demand_find_task t
+            WHERE t.demand_content_id = dc.id
+        )
+    ) x
+    WHERE x.rn <= %s
+    ORDER BY x.score DESC, x.demand_content_id DESC
+    LIMIT %s
+    """
+    conn = None
+    try:
+        conn = get_connection()
+        with conn.cursor() as cur:
+            cur.execute(sql, (per_category_limit, total_limit))
+            rows = cur.fetchall() or []
+            return [dict(r) for r in rows]
+    except Exception as e:
+        logger.error(f"get_daily_unprocessed_pool 失败: {e}", exc_info=True)
+        raise
+    finally:
+        if conn:
+            conn.close()
+
+
 def create_task_record(demand_content_id: int, trace_id: str = "", status: int = STATUS_PENDING) -> None:
     """
     在 demand_find_task 中新增一条记录。

+ 15 - 1
examples/content_finder/db/store_results.py

@@ -1,11 +1,25 @@
 """
 推荐结果写入(demand_find_author、demand_find_content_result 表)
 """
+import json
 from typing import Any, Dict, List, Optional
 
 from .connection import get_connection
 
 
+def _normalize_content_tags(value: Any) -> str:
+    if value is None:
+        return ""
+    if isinstance(value, str):
+        return value
+    if isinstance(value, (list, tuple, set)):
+        parts = [str(x).strip() for x in value if str(x).strip()]
+        return ",".join(parts)
+    if isinstance(value, dict):
+        return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
+    return str(value)
+
+
 def upsert_good_authors(
     conn,
     trace_id: str,
@@ -58,7 +72,7 @@ def upsert_good_authors(
             elderly_ratio = acc.get("age_50_plus_ratio") or ""
             elderly_tgi = acc.get("age_50_plus_tgi") or ""
             remark = acc.get("reason") or acc.get("remark") or ""
-            content_tags = acc.get("content_tags") or ""
+            content_tags = _normalize_content_tags(acc.get("content_tags"))
             cur.execute(
                 sql,
                 (

+ 27 - 19
examples/content_finder/server.py

@@ -26,7 +26,7 @@ from dotenv import load_dotenv
 load_dotenv()
 
 import core
-from db import get_next_unprocessed_demand, create_task_record, update_task_status, update_task_on_complete
+from db import get_daily_unprocessed_pool, create_task_record, update_task_status, update_task_on_complete
 from db.schedule import STATUS_RUNNING, STATUS_SUCCESS, STATUS_FAILED
 
 # 配置日志
@@ -145,32 +145,40 @@ async def execute_task(
 
 async def scheduled_task():
     """
-    定时任务:每 10 分钟执行一次
+    定时任务:每天上午 6 点执行一次(生成当日池子并跑完)
 
     流程:
-    1. 联表查询 demand_content + demand_find_task,获取创建时间最早的未处理的 demand_content
-    2. 在 demand_find_task 新增记录
-    3. 调用 execute_task 执行
+    1. 从 demand_content 中按 merge_leve2 品类去重分组,每个品类取 score 最高且未处理过的 5 条
+    2. 全局最多取 20 条,作为当天“池子”
+    3. 为池子中每条 demand_content 创建 demand_find_task 记录,并执行任务(并发受 MAX_CONCURRENT_TASKS 限制)
     """
     logger.info("定时任务触发")
 
-    demand = get_next_unprocessed_demand()
-    if not demand:
-        logger.info("定时任务跳过:无待处理需求")
+    pool = get_daily_unprocessed_pool(total_limit=20, per_category_limit=5)
+    if not pool:
+        logger.info("定时任务跳过:无待处理需求(当日池子为空)")
         return
 
-    query = demand.get("query") or ""
-    if not query:
-        logger.info("定时任务跳过:该需求的 query 为空")
-        return
+    logger.info(f"当日任务池生成:count={len(pool)}(每日上限 20,每品类上限 5)")
+
+    tasks: list[asyncio.Task] = []
+    for item in pool:
+        query = (item.get("query") or "").strip()
+        demand_content_id = item.get("demand_content_id")
+        if not query or demand_content_id is None:
+            continue
+        create_task_record(demand_content_id)  # trace_id 初始为空,完成后更新
+        tasks.append(
+            asyncio.create_task(
+                execute_task(query=query, demand_id=demand_content_id, task_type="scheduled")
+            )
+        )
 
-    demand_content_id = demand.get("demand_content_id")
-    if demand_content_id is None:
-        logger.warning("定时任务跳过:demand_content_id 为空")
+    if not tasks:
+        logger.info("定时任务跳过:当日池子中无有效 query")
         return
 
-    create_task_record(demand_content_id)  # trace_id 初始为空,完成后更新
-    asyncio.create_task(execute_task(query=query, demand_id=demand_content_id, task_type="scheduled"))
+    await asyncio.gather(*tasks, return_exceptions=True)
 
 
 # ============ API 接口 ============
@@ -340,8 +348,8 @@ async def startup():
     logger.info("内容寻找服务启动中...")
     logger.info(f"最大并发任务数: {MAX_CONCURRENT_TASKS}")
 
-    # 配置定时任务(从 demand_content 联表查询未处理需求,无需外部 API)
-    scheduler.add_job(scheduled_task, "cron", minute="*/10")
+    # 配置定时任务:每天上午 6 点触发一次
+    scheduler.add_job(scheduled_task, "cron", hour="22", minute="10")
     scheduler.start()
     # asyncio.create_task(scheduled_task())
     # logger.info("定时任务已启动:启动后立即执行一次,之后每 10 分钟执行(从数据库获取待处理需求)")

+ 28 - 52
examples/content_finder/skills/content_filtering_strategy.md

@@ -5,76 +5,56 @@ description: 内容筛选方法论
 
 # 内容筛选方法论
 
-## 核心流程:基础筛选 → 画像匹配 → 账号扩展 → 去重排序
+## 核心流程:基础筛选 -> 基于高赞case筛选 → 画像匹配 → 账号扩展 → 去重排序
 
 ---
 
-## 阶段零:需求理解结果驱动的需求对齐打分(先于画像)
+## 阶段零:基础筛选
 
-在进入“阶段一:基础质量筛选”前,使用 `demand_analysis` 的输出做一次需求对齐打分,目的在于:
-- 先把“明显不满足目的点/灵感点”的内容尽早淘汰,减少无效画像调用
-- 在每条内容的 `reason` 中给出“需求对齐的依据”(来自标题/描述可读信息 + goodcase 选题点)
-
-### 需要使用的 demand_analysis 信息
+在获取画像前先快速过滤,减少不必要的 API 调用。
 
-- `entry_strategy.goodcase_goal_points`:用于判定“是否完成了需求的目的/解决了什么问题”
-- `entry_strategy.goodcase_inspiration_points`:用于判定“是否覆盖了可搜索的灵感点/核心表达方向”
-- `entry_strategy.goodcase_key_points`:用于判定“是否包含关键锚点要素(至少满足多数)”
-- `filter_plan.form_rules`:用于判定“形式表达是否与目标一致(结构/语气/可分享表达)”
+**热度参考标准**:
 
-> 若 `demand_analysis` 输出为空,或无法从标题/描述落到上述要素,请承认不确定性,不要编造匹配结论。
+| 点赞量 | 热度等级 |
+|---|---|
+| 1000+ | 一般热度 |
+| 5000+ | 较高热度 |
+| 10000+ | 高热度 |
+| 50000+ | 爆款 |
 
-### 可用信息范围
+评估维度:digg_count(点赞)、comment_count(评论)、share_count(分享)
 
-在未获取画像前,仅允许用以下字段做需求对齐判断:
-- `title`(若有)
-- `desc`(来自 `douyin_search` / `douyin_search_tikhub` 的搜索结果)
-- 或候选对象里可见的简介/摘要文本(若检索来源不同,请只用已有字段)
+---
 
-### 需求对齐判定规则(可直接执行)
+## 阶段一:**基于高赞case筛选**
 
-对每条候选内容,按以下规则给出结论(不需要输出数值):
+使用 `demand_analysis` 的结果里的**筛选方案**做高赞case筛选,目的在于:
+- 先把“明显不满足筛选方案”的内容尽早淘汰,减少无效画像调用
+- 在每条内容的 `reason` 中给出“需求对齐的依据”(来自标题/描述可读信息 + goodcase 选题点)
 
-1. 目的点对齐(Goal Alignment,必须项)
+### 需要使用`demand_analysis` 的结果里的**筛选方案**的字段对每条内容进行以下评估
+> 若 `demand_analysis` 输出的筛选方案为空,请承认不确定性,不要编造匹配结论。
+1. 目的点对齐(必须项)
+   - 按照**筛选方案.目的点对齐规则**执行
    - 命中:标题/描述里能看出“在做什么、解决什么、给了什么收益/动作”
    - 不命中:直接淘汰
-2. 灵感点对齐(Inspiration Alignment,必须项)
-   - 命中:标题/描述里体现了 goodcase 的“可搜索灵感点”核心表达方向(允许同义/上下位)
-   - 不命中:直接淘汰
-3. 关键点命中(Key Point Anchors,加分项)
+2. 灵感点一致(加分项)
+   - 内容的标题/描述 和 灵感点/搜索词 进行匹配度打分,1分不匹配,5分最匹配.
+   - 分数也影响最终的排序
+2. 关键点命中(加分项)
+   - 按照**筛选方案.关键点打分说明**执行
    - 命中:进入后续阶段时给更高排序倾向
    - 部分/缺失:也可进入,但排序会更低(在 `reason` 说明缺少哪些关键锚点要素或不确定点)
-4. 形式规则一致(Form-Rule Fit,低权重加分/可选)
-   - 命中:加分或作为排序 tie-breaker
+3. 形式规则一致(低权重加分/可选)
+   - 按照**特征归类.形式特征**进行匹配
+   - 命中:加分或作为排序
    - 不命中:不直接淘汰,但在 `reason` 中标注“不匹配/不确定”
 
-### 需求对齐淘汰/保留阈值
-
-- 若目的点对齐“不命中”,直接淘汰
-- 若灵感点对齐“不命中”,直接淘汰
-- 若关键点“部分/缺失”,允许进入后续阶段,但排序更低(并在 `reason` 写清缺失/不确定点)
-- 若形式规则“不命中/不确定”,允许进入后续阶段,但作为低权重扣分或 tie-breaker
-
 ### 在输出 reason 中必须包含的要素
 
 对于进入后续画像阶段的候选,在其 `reason` 中至少写明:
 至少包含四项:命中的 `目的点` 状态;命中的 `灵感点` 状态;`关键点`(命中/部分/缺失)与缺失说明或不确定点;形式规则是命中还是不确定(如无法从标题/描述判断)
 
-## 阶段一:基础质量筛选
-
-在获取画像前先快速过滤,减少不必要的 API 调用。
-
-**热度参考标准**:
-
-| 点赞量 | 热度等级 |
-|---|---|
-| 1000+ | 一般热度 |
-| 5000+ | 较高热度 |
-| 10000+ | 高热度 |
-| 50000+ | 爆款 |
-
-评估维度:digg_count(点赞)、comment_count(评论)、share_count(分享)
-
 ---
 
 ## 阶段二:画像匹配筛选
@@ -104,10 +84,6 @@ description: 内容筛选方法论
 - **偏好度(tgi)**:> 100 高于平均,= 100 平均,< 100 低于平均
 - 示例:"适合50岁以上老年人" → 年龄分布"50岁以上"占比 > 40% 且 tgi > 100 视为符合
 
----
-
-
-
 ---
 
 ## 阶段三:去重与排序

+ 2 - 2
examples/content_finder/tools/aigc_platform_api.py

@@ -25,7 +25,7 @@ def _log_aigc_return(label: str, params: Dict[str, Any], r: ToolResult) -> ToolR
     log_tool_call(label, params, format_tool_result_for_log(r))
     return r
 
-USE_REAL_API = False
+CAN_NOT_CREATE_PLAN = False
 
 AIGC_BASE_URL = "https://aigc-api.aiddit.com"
 CRAWLER_PLAN_CREATE_URL = f"{AIGC_BASE_URL}/aigc/crawler/plan/save"
@@ -270,7 +270,7 @@ async def create_crawler_plan_by_douyin_content_id(
     """
     call_params: Dict[str, Any] = {"trace_id": trace_id}
     # 先临时返回创建成功,不要真实创建
-    if USE_REAL_API == False:
+    if CAN_NOT_CREATE_PLAN == True:
         return _log_aigc_return(
             _LABEL_CONTENT,
             call_params,

+ 15 - 1
examples/content_finder/utils/tool_logging.py

@@ -8,6 +8,20 @@ from typing import Any, Dict
 from .log_capture import log, log_fold
 
 
+def _pretty_json_if_possible(text: str) -> str:
+    """如果文本是合法 JSON,则返回带缩进的可读格式;否则原样返回。"""
+    raw = (text or "").strip()
+    if not raw:
+        return text
+    if not (raw.startswith("{") or raw.startswith("[")):
+        return text
+    try:
+        parsed = json.loads(raw)
+    except Exception:
+        return text
+    return json.dumps(parsed, ensure_ascii=False, indent=2)
+
+
 def format_tool_result_for_log(result: Any) -> str:
     """将 ToolResult 或普通字符串格式化为可写入日志的文本(避免过长 metadata 刷屏)。"""
     if result is None:
@@ -34,5 +48,5 @@ def log_tool_call(tool_name: str, params: Dict[str, Any], result: str) -> None:
         with log_fold("📥 调用参数"):
             log(json.dumps(params, ensure_ascii=False, indent=2))
         with log_fold("📤 返回内容"):
-            log(result)
+            log(_pretty_json_if_possible(result))