| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- """
- 定时任务相关数据库操作
- demand_content: 原始检索内容库
- demand_find_task: 执行记录表,通过 demand_content_id 关联
- """
- import logging
- from typing import Any, Dict, List, Optional
- from .connection import get_connection
logger = logging.getLogger(__name__)

# Integer codes for demand_find_task.status.
# These values must stay identical to the ones stored in that table.
STATUS_PENDING = 0  # default status used by create_task_record
STATUS_RUNNING = 1
STATUS_SUCCESS = 2
STATUS_FAILED = 3  # the only status create_task_record will overwrite on conflict
def get_next_unprocessed_demand() -> Optional[Dict[str, Any]]:
    """Return the first demand_content row that has never been dispatched.

    A demand counts as processed as soon as demand_find_task holds any row for
    its id, whatever that row's status (failed runs included). Such demands are
    therefore never picked again by this query. Candidates are ordered by
    ``dc.id`` ascending, which serves as the "earliest created" ordering.

    Returns:
        ``{"demand_content_id": ..., "query": ...}`` for the selected row, or
        ``None`` when every demand already has a task record. (Assumes
        get_connection() yields mapping-style rows so ``dict(row)`` works —
        confirm against the connection module.)

    Raises:
        Exception: any error from connecting or querying is logged and re-raised.
    """
    next_demand_sql = """
SELECT dc.id AS demand_content_id,
dc.name AS query
FROM demand_content dc
WHERE NOT EXISTS (
SELECT 1 FROM demand_find_task t
WHERE t.demand_content_id = dc.id
)
ORDER BY dc.id ASC
LIMIT 1
"""
    connection = None
    try:
        # Acquire inside the try so connection failures are logged as well.
        connection = get_connection()
        with connection.cursor() as cursor:
            cursor.execute(next_demand_sql)
            first_row = cursor.fetchone()
            return None if not first_row else dict(first_row)
    except Exception as exc:
        logger.exception(f"get_next_unprocessed_demand 失败: {exc}")
        raise
    finally:
        if connection:
            connection.close()
def get_daily_unprocessed_pool(
    *,
    total_limit: int = 20,
    per_category_limit: int = 5,
) -> List[Dict[str, Any]]:
    """Build the "daily task pool" of demands that have not been processed yet.

    Selection rules (all applied in SQL):

    - A demand is skipped once demand_find_task holds any row for it, whatever
      that row's status (failed runs count as processed).
    - The remaining demands are partitioned by ``merge_leve2``. NULL and ``''``
      share one partition because of the ``COALESCE``.
    - Within each partition, rows are ranked by ``score`` DESC, then ``id``
      DESC, and only the first ``per_category_limit`` rows are kept.
    - The surviving rows are ordered globally by the same key and cut to
      ``total_limit`` rows.

    Args:
        total_limit: maximum number of rows returned overall.
        per_category_limit: maximum number of rows kept per ``merge_leve2`` value.

    Returns:
        A list of dicts with keys ``demand_content_id``, ``query``,
        ``merge_leve2`` and ``score``; empty when nothing qualifies.
        (Assumes get_connection() yields mapping-style rows — confirm.)

    Raises:
        Exception: any error from connecting or querying is logged and re-raised.
    """
    pool_sql = """
SELECT x.demand_content_id, x.query, x.merge_leve2, x.score
FROM (
SELECT
dc.id AS demand_content_id,
dc.name AS query,
dc.merge_leve2 AS merge_leve2,
dc.score AS score,
ROW_NUMBER() OVER (
PARTITION BY COALESCE(dc.merge_leve2, '')
ORDER BY dc.score DESC, dc.id DESC
) AS rn
FROM demand_content dc
WHERE NOT EXISTS (
SELECT 1 FROM demand_find_task t
WHERE t.demand_content_id = dc.id
)
) x
WHERE x.rn <= %s
ORDER BY x.score DESC, x.demand_content_id DESC
LIMIT %s
"""
    # Placeholder order: the per-category cap first, then the global LIMIT.
    query_params = (per_category_limit, total_limit)
    connection = None
    try:
        connection = get_connection()
        with connection.cursor() as cursor:
            cursor.execute(pool_sql, query_params)
            fetched = cursor.fetchall() or []
            return [dict(item) for item in fetched]
    except Exception as exc:
        logger.exception(f"get_daily_unprocessed_pool 失败: {exc}")
        raise
    finally:
        if connection:
            connection.close()
def create_task_record(demand_content_id: int, trace_id: str = "", status: int = STATUS_PENDING) -> None:
    """Insert a task row into demand_find_task.

    The row is normally created with an empty ``trace_id`` placeholder. It is
    filled in later by ``update_task_on_complete``.

    On a duplicate-key conflict (the ``uk_trace_demand`` unique key; presumably
    over (trace_id, demand_content_id) — confirm against the schema), the
    existing row is overwritten only when its status is ``STATUS_FAILED``. Any
    other existing row is left untouched, so inserting the same empty
    placeholder twice never creates a duplicate.

    Note: get_next_unprocessed_demand and get_daily_unprocessed_pool never
    return a demand that already has a task row, failed ones included. This
    reset path is therefore only reached when a caller submits such a demand
    by some other route.

    Args:
        demand_content_id: id of the demand_content row the task belongs to.
        trace_id: trace identifier; defaults to the empty placeholder.
        status: status code to store (defaults to STATUS_PENDING).

    Raises:
        Exception: any database error is logged and re-raised.
    """
    # In ON DUPLICATE KEY UPDATE, MySQL applies assignments left to right, and
    # later expressions see earlier updated values. trace_id is assigned first
    # and status last, so both IF() checks read the row's original status.
    # Do not reorder these assignments.
    sql = """
INSERT INTO demand_find_task (trace_id, demand_content_id, status)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE
trace_id = IF(status = %s, VALUES(trace_id), trace_id),
status = IF(status = %s, VALUES(status), status)
"""
    conn = None
    try:
        conn = get_connection()
        with conn.cursor() as cur:
            cur.execute(
                sql,
                (trace_id, demand_content_id, status, STATUS_FAILED, STATUS_FAILED),
            )
        # Commit explicitly so the write does not depend on get_connection()
        # enabling autocommit (not visible from here). This should be harmless
        # if autocommit is already on.
        conn.commit()
        logger.info(f"创建任务记录: demand_content_id={demand_content_id}")
    except Exception as e:
        logger.error(f"create_task_record 失败: {e}", exc_info=True)
        raise
    finally:
        if conn:
            conn.close()
def update_task_on_complete(demand_content_id: int, trace_id: str, status: int) -> None:
    """Fill in ``trace_id`` and ``status`` once a task has finished.

    Only the placeholder row is targeted: the row for ``demand_content_id``
    whose ``trace_id`` is still the empty string written at creation time.
    Rows that already carry a real trace_id are not touched.

    Args:
        demand_content_id: id of the demand_content row the task belongs to.
        trace_id: trace identifier to store.
        status: final status code to store.

    Raises:
        Exception: any database error is logged and re-raised. A statement that
        updates no rows is not an error; it is only logged as a warning.
    """
    sql = """
UPDATE demand_find_task
SET trace_id = %s, status = %s
WHERE demand_content_id = %s AND trace_id = ''
"""
    conn = None
    try:
        conn = get_connection()
        with conn.cursor() as cur:
            cur.execute(sql, (trace_id, status, demand_content_id))
            affected = cur.rowcount
        # Commit explicitly rather than relying on autocommit being enabled in
        # get_connection() (not visible from here).
        conn.commit()
        # DB-API rowcount: 0 means nothing was updated. -1 ("unknown") is
        # deliberately not treated as a miss. With MySQL's default
        # affected-rows semantics, 0 also covers an update that matched a row
        # but changed no values.
        if affected == 0:
            logger.warning(
                f"update_task_on_complete 未更新任何记录(无 trace_id 为空的占位记录或值未变化): "
                f"demand_content_id={demand_content_id}, trace_id={trace_id}, status={status}"
            )
        logger.info(f"更新任务完成: demand_content_id={demand_content_id}, trace_id={trace_id}, status={status}")
    except Exception as e:
        logger.error(f"update_task_on_complete 失败: {e}", exc_info=True)
        raise
    finally:
        if conn:
            conn.close()
def update_task_status(trace_id: str, demand_content_id: int, status: int) -> None:
    """Set ``status`` on the demand_find_task row matching (trace_id, demand_content_id).

    ``trace_id`` may be the empty string. This addresses the placeholder row
    created before the task has returned a real trace id.

    Args:
        trace_id: trace identifier of the row (``""`` for the placeholder).
        demand_content_id: id of the demand_content row the task belongs to.
        status: new status code.

    Raises:
        Exception: any database error is logged and re-raised. A statement that
        updates no rows is not an error; it is only logged as a warning.
    """
    sql = """
UPDATE demand_find_task
SET status = %s
WHERE trace_id = %s AND demand_content_id = %s
"""
    conn = None
    try:
        conn = get_connection()
        with conn.cursor() as cur:
            cur.execute(sql, (status, trace_id, demand_content_id))
            affected = cur.rowcount
        # Commit explicitly rather than relying on autocommit being enabled in
        # get_connection() (not visible from here).
        conn.commit()
        # 0 means no row was found, or (with MySQL's default affected-rows
        # semantics) the status already had this value. -1 ("unknown") is ignored.
        if affected == 0:
            logger.warning(
                f"update_task_status 未更新任何记录(记录不存在或状态未变化): "
                f"trace_id={trace_id}, demand_content_id={demand_content_id}, status={status}"
            )
        logger.info(f"更新任务状态: trace_id={trace_id}, demand_content_id={demand_content_id}, status={status}")
    except Exception as e:
        logger.error(f"update_task_status 失败: {e}", exc_info=True)
        raise
    finally:
        if conn:
            conn.close()
|