"""
Database operations used by the scheduled job.

demand_content:   source library of raw search demands.
demand_find_task: execution-record table, linked to demand_content through
                  demand_content_id.
"""
import logging
from contextlib import contextmanager
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional

from .connection import get_connection

logger = logging.getLogger(__name__)

# Status constants (kept in sync with the demand_find_task table).
STATUS_PENDING = 0
STATUS_RUNNING = 1
STATUS_SUCCESS = 2
STATUS_FAILED = 3


@contextmanager
def _db_cursor(op_name: str) -> Iterator[Any]:
    """
    Yield a cursor on a fresh connection and always close the connection.

    Any exception raised while connecting, opening the cursor, or inside the
    caller's ``with`` body is logged as ``"<op_name> 失败: <error>"`` (with
    traceback) and re-raised unchanged.

    NOTE(review): no explicit commit is issued anywhere in this module; write
    statements presumably rely on get_connection() returning an autocommit
    connection - confirm against the connection module.

    Args:
        op_name: Name of the public operation, used only in the error log.
    """
    conn = None
    try:
        conn = get_connection()
        with conn.cursor() as cur:
            yield cur
    except Exception as e:
        logger.error(f"{op_name} 失败: {e}", exc_info=True)
        raise
    finally:
        # Identity check: close whatever was acquired, regardless of how the
        # connection object evaluates in a boolean context.
        if conn is not None:
            conn.close()


def get_next_unprocessed_demand() -> Optional[Dict[str, Any]]:
    """
    Return the unprocessed demand_content row with the smallest id.

    "Unprocessed" means demand_find_task holds no record at all for that
    demand_content_id. A demand that already has a task (including a failed
    one) counts as processed and is not selected again.

    Returns:
        {"demand_content_id": int, "query": str}, or None when nothing is left.

    Raises:
        Exception: any database error, after it has been logged.
    """
    sql = """
        SELECT dc.id AS demand_content_id, dc.name AS query
        FROM demand_content dc
        WHERE NOT EXISTS (
            SELECT 1 FROM demand_find_task t
            WHERE t.demand_content_id = dc.id
        )
        ORDER BY dc.id ASC
        LIMIT 1
    """
    with _db_cursor("get_next_unprocessed_demand") as cur:
        cur.execute(sql)
        row = cur.fetchone()
        return dict(row) if row else None


def get_first_running_task() -> Optional[Dict[str, Any]]:
    """
    Return the oldest demand_find_task row whose status is STATUS_RUNNING (1).

    In theory there is at most one such row. Intended for resuming an
    in-flight task after a service restart: the query text is fetched by
    joining demand_content (demand_content.name).

    Returns:
        {"demand_content_id": int, "query": str, "trace_id": str}, or None.

    Raises:
        Exception: any database error, after it has been logged.
    """
    sql = """
        SELECT t.demand_content_id, t.trace_id, dc.name AS query
        FROM demand_find_task t
        INNER JOIN demand_content dc ON dc.id = t.demand_content_id
        WHERE t.status = %s
        ORDER BY t.id ASC
        LIMIT 1
    """
    with _db_cursor("get_first_running_task") as cur:
        cur.execute(sql, (STATUS_RUNNING,))
        row = cur.fetchone()
        return dict(row) if row else None


def get_one_today_unprocessed_demand(*, dt: int) -> Optional[Dict[str, Any]]:
    """
    Pick one demand for the given day that has never appeared in demand_find_task.

    - Rows are not grouped by category (merge_leve2 is no longer used here).
    - Among candidates the highest score wins; ties go to the largest id.

    Args:
        dt: Day key matching demand_content.dt, normally a YYYYMMDD integer
            (e.g. 20260402). It is coerced with int() before being bound.

    Returns:
        {"demand_content_id": int, "query": str}, or None.

    Raises:
        Exception: any conversion or database error, after it has been logged.
    """
    sql = """
        SELECT dc.id AS demand_content_id, dc.name AS query
        FROM demand_content dc
        WHERE dc.dt = %s
          AND NOT EXISTS (
              SELECT 1 FROM demand_find_task t
              WHERE t.demand_content_id = dc.id
          )
        ORDER BY dc.score DESC, dc.id DESC
        LIMIT 1
    """
    with _db_cursor("get_one_today_unprocessed_demand") as cur:
        cur.execute(sql, (int(dt),))
        row = cur.fetchone()
        return dict(row) if row else None


def get_daily_unprocessed_pool(
    *,
    total_limit: int = 20,
    per_category_limit: int = 5,
) -> List[Dict[str, Any]]:
    """
    Build the "daily task pool" of unprocessed demands.

    - Rows are partitioned by demand_content.merge_leve2 (NULL is treated as
      the empty-string category).
    - Within each category the top ``per_category_limit`` rows by score
      (ties: larger id first) are kept.
    - The combined result is ordered by score and capped at ``total_limit``.
    - Processed demands are excluded: any record in demand_find_task
      (including failed ones) marks the demand as already run.

    Args:
        total_limit: Maximum number of rows returned overall.
        per_category_limit: Maximum number of rows kept per category.

    Returns:
        A list of dicts with keys demand_content_id, query, merge_leve2 and
        score; empty when nothing qualifies.

    Raises:
        Exception: any database error, after it has been logged.
    """
    sql = """
        SELECT x.demand_content_id, x.query, x.merge_leve2, x.score
        FROM (
            SELECT
                dc.id AS demand_content_id,
                dc.name AS query,
                dc.merge_leve2 AS merge_leve2,
                dc.score AS score,
                ROW_NUMBER() OVER (
                    PARTITION BY COALESCE(dc.merge_leve2, '')
                    ORDER BY dc.score DESC, dc.id DESC
                ) AS rn
            FROM demand_content dc
            WHERE NOT EXISTS (
                SELECT 1 FROM demand_find_task t
                WHERE t.demand_content_id = dc.id
            )
        ) x
        WHERE x.rn <= %s
        ORDER BY x.score DESC, x.demand_content_id DESC
        LIMIT %s
    """
    with _db_cursor("get_daily_unprocessed_pool") as cur:
        cur.execute(sql, (per_category_limit, total_limit))
        rows = cur.fetchall() or []
        return [dict(r) for r in rows]


def create_task_record(demand_content_id: int, trace_id: str = "", status: int = STATUS_PENDING) -> None:
    """
    Insert a record into demand_find_task.

    On initial creation trace_id may be the empty string; it is filled in
    later by update_task_on_complete.

    If the insert collides with the uk_trace_demand unique key, the existing
    row is touched only when its status is STATUS_FAILED: in that case its
    trace_id and status are reset to the values of this insert. This covers a
    previous run that failed while trace_id was still empty, where the same
    demand is picked again and a second empty-trace INSERT is not possible.

    Args:
        demand_content_id: Id of the demand_content row the task belongs to.
        trace_id: Trace identifier, or "" as a placeholder.
        status: Initial status, one of the STATUS_* constants.

    Raises:
        Exception: any database error, after it has been logged.
    """
    # trace_id is assigned before status on purpose: assignments are applied
    # in order, so both IF() checks must run while status still holds the old
    # value (NOTE(review): relies on MySQL left-to-right evaluation - confirm).
    sql = """
        INSERT INTO demand_find_task (trace_id, demand_content_id, status)
        VALUES (%s, %s, %s)
        ON DUPLICATE KEY UPDATE
            trace_id = IF(status = %s, VALUES(trace_id), trace_id),
            status = IF(status = %s, VALUES(status), status)
    """
    with _db_cursor("create_task_record") as cur:
        cur.execute(
            sql,
            (trace_id, demand_content_id, status, STATUS_FAILED, STATUS_FAILED),
        )
        logger.info(f"创建任务记录: demand_content_id={demand_content_id}")


def update_task_on_complete(demand_content_id: int, trace_id: str, status: int) -> None:
    """
    Set trace_id and status once a task has finished.

    Only the placeholder row is matched, i.e. the row for this
    demand_content_id whose trace_id is still the empty string.

    Args:
        demand_content_id: Id of the demand the task was created for.
        trace_id: Final trace identifier to store.
        status: Final status, one of the STATUS_* constants.

    Raises:
        Exception: any database error, after it has been logged.
    """
    sql = """
        UPDATE demand_find_task
        SET trace_id = %s, status = %s
        WHERE demand_content_id = %s AND trace_id = ''
    """
    with _db_cursor("update_task_on_complete") as cur:
        cur.execute(sql, (trace_id, status, demand_content_id))
        logger.info(f"更新任务完成: demand_content_id={demand_content_id}, trace_id={trace_id}, status={status}")


def fetch_trace_ids_created_after(cutoff: datetime) -> List[str]:
    """
    Return the distinct, non-empty trace_ids created strictly after ``cutoff``.

    The comparison against created_at follows the database's own
    DATETIME/TIMESTAMP comparison rules. Results are ordered by trace_id.

    Args:
        cutoff: Lower bound (exclusive) for demand_find_task.created_at.

    Returns:
        trace_id values converted to str; empty list when none match.

    Raises:
        Exception: any database error, after it has been logged.
    """
    sql = """
        SELECT DISTINCT trace_id
        FROM demand_find_task
        WHERE created_at > %s
          AND trace_id IS NOT NULL
          AND trace_id <> ''
        ORDER BY trace_id
    """
    with _db_cursor("fetch_trace_ids_created_after") as cur:
        cur.execute(sql, (cutoff,))
        rows = cur.fetchall() or []
        return [str(r["trace_id"]) for r in rows]


def update_task_status(trace_id: str, demand_content_id: int, status: int) -> None:
    """
    Update the status of one demand_find_task record.

    The row is matched on both trace_id and demand_content_id. trace_id may
    be the empty string (task has not returned yet), in which case the
    placeholder row for that demand_content_id is the one matched.

    Args:
        trace_id: Trace identifier of the row, or "" for the placeholder row.
        demand_content_id: Id of the demand the task belongs to.
        status: New status, one of the STATUS_* constants.

    Raises:
        Exception: any database error, after it has been logged.
    """
    sql = """
        UPDATE demand_find_task
        SET status = %s
        WHERE trace_id = %s AND demand_content_id = %s
    """
    with _db_cursor("update_task_status") as cur:
        cur.execute(sql, (status, trace_id, demand_content_id))
        logger.info(f"更新任务状态: trace_id={trace_id}, demand_content_id={demand_content_id}, status={status}")