""" 定时任务相关数据库操作 demand_content: 原始检索内容库 demand_find_task: 执行记录表,通过 demand_content_id 关联 """ import logging from typing import Any, Dict, Optional from .connection import get_connection logger = logging.getLogger(__name__) # 状态常量(与 demand_find_task 表一致) STATUS_PENDING = 0 STATUS_RUNNING = 1 STATUS_SUCCESS = 2 STATUS_FAILED = 3 def get_next_unprocessed_demand() -> Optional[Dict[str, Any]]: """ 联表查询 demand_content 和 demand_find_task,找到创建最早且未处理的 demand_content。 未处理定义:该 demand_content_id 在 demand_find_task 中尚无任何记录。 已有任务则视为已跑过(含失败),不再被定时任务选中。 Returns: {"demand_content_id": int, "query": str} 或 None """ sql = """ SELECT dc.id AS demand_content_id, dc.name AS query FROM demand_content dc WHERE NOT EXISTS ( SELECT 1 FROM demand_find_task t WHERE t.demand_content_id = dc.id ) ORDER BY dc.id ASC LIMIT 1 """ conn = None try: conn = get_connection() with conn.cursor() as cur: cur.execute(sql) row = cur.fetchone() return dict(row) if row else None except Exception as e: logger.error(f"get_next_unprocessed_demand 失败: {e}", exc_info=True) raise finally: if conn: conn.close() def create_task_record(demand_content_id: int, trace_id: str = "", status: int = STATUS_PENDING) -> None: """ 在 demand_find_task 中新增一条记录。 初始创建时 trace_id 可置为空字符串,任务完成后通过 update_task_on_complete 更新。 若 uk_trace_demand 冲突:仅当原行 status 为失败时,将 trace_id/status 重置为本次插入值 (用于上次失败且 trace_id 仍为空时,定时任务会再次选中同一需求,不能重复 INSERT 空 trace)。 """ sql = """ INSERT INTO demand_find_task (trace_id, demand_content_id, status) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE trace_id = IF(status = %s, VALUES(trace_id), trace_id), status = IF(status = %s, VALUES(status), status) """ conn = None try: conn = get_connection() with conn.cursor() as cur: cur.execute( sql, (trace_id, demand_content_id, status, STATUS_FAILED, STATUS_FAILED), ) logger.info(f"创建任务记录: demand_content_id={demand_content_id}") except Exception as e: logger.error(f"create_task_record 失败: {e}", exc_info=True) raise finally: if conn: conn.close() def update_task_on_complete(demand_content_id: int, trace_id: str, status: int) -> None: """ 任务完成后更新 trace_id 和 status。 匹配 trace_id 为空字符串的记录(初始创建时的占位)。 """ sql = """ UPDATE demand_find_task SET trace_id = %s, status = %s WHERE demand_content_id = %s AND trace_id = '' """ conn = None try: conn = get_connection() with conn.cursor() as cur: cur.execute(sql, (trace_id, status, demand_content_id)) logger.info(f"更新任务完成: demand_content_id={demand_content_id}, trace_id={trace_id}, status={status}") except Exception as e: logger.error(f"update_task_on_complete 失败: {e}", exc_info=True) raise finally: if conn: conn.close() def update_task_status(trace_id: str, demand_content_id: int, status: int) -> None: """ 更新 demand_find_task 中指定记录的状态。 trace_id 可为空字符串(任务尚未返回时,通过 demand_content_id 定位记录)。 """ sql = """ UPDATE demand_find_task SET status = %s WHERE trace_id = %s AND demand_content_id = %s """ conn = None try: conn = get_connection() with conn.cursor() as cur: cur.execute(sql, (status, trace_id, demand_content_id)) logger.info(f"更新任务状态: trace_id={trace_id}, demand_content_id={demand_content_id}, status={status}") except Exception as e: logger.error(f"update_task_status 失败: {e}", exc_info=True) raise finally: if conn: conn.close()