# schedule.py
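"""
Database operations used by the scheduled (cron-style) jobs.

demand_content: library of the original search/retrieval content.
demand_find_task: execution-record table, linked via demand_content_id.
"""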
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

from .connection import get_connection
  10. logger = logging.getLogger(__name__)
  11. # 状态常量(与 demand_find_task 表一致)
  12. STATUS_PENDING = 0
  13. STATUS_RUNNING = 1
  14. STATUS_SUCCESS = 2
  15. STATUS_FAILED = 3
  16. def get_next_unprocessed_demand() -> Optional[Dict[str, Any]]:
  17. """
  18. 联表查询 demand_content 和 demand_find_task,找到创建最早且未处理的 demand_content。
  19. 未处理定义:该 demand_content_id 在 demand_find_task 中尚无任何记录。
  20. 已有任务则视为已跑过(含失败),不再被定时任务选中。
  21. Returns:
  22. {"demand_content_id": int, "query": str} 或 None
  23. """
  24. sql = """
  25. SELECT dc.id AS demand_content_id,
  26. dc.name AS query
  27. FROM demand_content dc
  28. WHERE NOT EXISTS (
  29. SELECT 1 FROM demand_find_task t
  30. WHERE t.demand_content_id = dc.id
  31. )
  32. ORDER BY dc.id ASC
  33. LIMIT 1
  34. """
  35. conn = None
  36. try:
  37. conn = get_connection()
  38. with conn.cursor() as cur:
  39. cur.execute(sql)
  40. row = cur.fetchone()
  41. return dict(row) if row else None
  42. except Exception as e:
  43. logger.error(f"get_next_unprocessed_demand 失败: {e}", exc_info=True)
  44. raise
  45. finally:
  46. if conn:
  47. conn.close()
  48. def get_daily_unprocessed_pool(
  49. *,
  50. total_limit: int = 20,
  51. per_category_limit: int = 5,
  52. ) -> List[Dict[str, Any]]:
  53. """
  54. 生成“当天任务池”:
  55. - 先按 demand_content.merge_leve2 品类去重分组
  56. - 每个品类取 score 最高的 N 条(per_category_limit)
  57. - 全局最多取 M 条(total_limit)
  58. - 过滤已处理:demand_find_task 中存在任意记录则视为已跑过(含失败)
  59. """
  60. sql = """
  61. SELECT x.demand_content_id, x.query, x.merge_leve2, x.score
  62. FROM (
  63. SELECT
  64. dc.id AS demand_content_id,
  65. dc.name AS query,
  66. dc.merge_leve2 AS merge_leve2,
  67. dc.score AS score,
  68. ROW_NUMBER() OVER (
  69. PARTITION BY COALESCE(dc.merge_leve2, '')
  70. ORDER BY dc.score DESC, dc.id DESC
  71. ) AS rn
  72. FROM demand_content dc
  73. WHERE NOT EXISTS (
  74. SELECT 1 FROM demand_find_task t
  75. WHERE t.demand_content_id = dc.id
  76. )
  77. ) x
  78. WHERE x.rn <= %s
  79. ORDER BY x.score DESC, x.demand_content_id DESC
  80. LIMIT %s
  81. """
  82. conn = None
  83. try:
  84. conn = get_connection()
  85. with conn.cursor() as cur:
  86. cur.execute(sql, (per_category_limit, total_limit))
  87. rows = cur.fetchall() or []
  88. return [dict(r) for r in rows]
  89. except Exception as e:
  90. logger.error(f"get_daily_unprocessed_pool 失败: {e}", exc_info=True)
  91. raise
  92. finally:
  93. if conn:
  94. conn.close()
  95. def create_task_record(demand_content_id: int, trace_id: str = "", status: int = STATUS_PENDING) -> None:
  96. """
  97. 在 demand_find_task 中新增一条记录。
  98. 初始创建时 trace_id 可置为空字符串,任务完成后通过 update_task_on_complete 更新。
  99. 若 uk_trace_demand 冲突:仅当原行 status 为失败时,将 trace_id/status 重置为本次插入值
  100. (用于上次失败且 trace_id 仍为空时,定时任务会再次选中同一需求,不能重复 INSERT 空 trace)。
  101. """
  102. sql = """
  103. INSERT INTO demand_find_task (trace_id, demand_content_id, status)
  104. VALUES (%s, %s, %s)
  105. ON DUPLICATE KEY UPDATE
  106. trace_id = IF(status = %s, VALUES(trace_id), trace_id),
  107. status = IF(status = %s, VALUES(status), status)
  108. """
  109. conn = None
  110. try:
  111. conn = get_connection()
  112. with conn.cursor() as cur:
  113. cur.execute(
  114. sql,
  115. (trace_id, demand_content_id, status, STATUS_FAILED, STATUS_FAILED),
  116. )
  117. logger.info(f"创建任务记录: demand_content_id={demand_content_id}")
  118. except Exception as e:
  119. logger.error(f"create_task_record 失败: {e}", exc_info=True)
  120. raise
  121. finally:
  122. if conn:
  123. conn.close()
  124. def update_task_on_complete(demand_content_id: int, trace_id: str, status: int) -> None:
  125. """
  126. 任务完成后更新 trace_id 和 status。
  127. 匹配 trace_id 为空字符串的记录(初始创建时的占位)。
  128. """
  129. sql = """
  130. UPDATE demand_find_task
  131. SET trace_id = %s, status = %s
  132. WHERE demand_content_id = %s AND trace_id = ''
  133. """
  134. conn = None
  135. try:
  136. conn = get_connection()
  137. with conn.cursor() as cur:
  138. cur.execute(sql, (trace_id, status, demand_content_id))
  139. logger.info(f"更新任务完成: demand_content_id={demand_content_id}, trace_id={trace_id}, status={status}")
  140. except Exception as e:
  141. logger.error(f"update_task_on_complete 失败: {e}", exc_info=True)
  142. raise
  143. finally:
  144. if conn:
  145. conn.close()
  146. def fetch_trace_ids_created_after(cutoff: datetime) -> list[str]:
  147. """
  148. 查询 demand_find_task 中 created_at 晚于 cutoff 的去重 trace_id(排除空串)。
  149. cutoff 与表字段比较规则与 MySQL DATETIME/TIMESTAMP 一致。
  150. """
  151. sql = """
  152. SELECT DISTINCT trace_id
  153. FROM demand_find_task
  154. WHERE created_at > %s
  155. AND trace_id IS NOT NULL
  156. AND trace_id <> ''
  157. ORDER BY trace_id
  158. """
  159. conn = None
  160. try:
  161. conn = get_connection()
  162. with conn.cursor() as cur:
  163. cur.execute(sql, (cutoff,))
  164. rows = cur.fetchall() or []
  165. return [str(r["trace_id"]) for r in rows]
  166. except Exception as e:
  167. logger.error(f"fetch_trace_ids_created_after 失败: {e}", exc_info=True)
  168. raise
  169. finally:
  170. if conn:
  171. conn.close()
  172. def update_task_status(trace_id: str, demand_content_id: int, status: int) -> None:
  173. """
  174. 更新 demand_find_task 中指定记录的状态。
  175. trace_id 可为空字符串(任务尚未返回时,通过 demand_content_id 定位记录)。
  176. """
  177. sql = """
  178. UPDATE demand_find_task
  179. SET status = %s
  180. WHERE trace_id = %s AND demand_content_id = %s
  181. """
  182. conn = None
  183. try:
  184. conn = get_connection()
  185. with conn.cursor() as cur:
  186. cur.execute(sql, (status, trace_id, demand_content_id))
  187. logger.info(f"更新任务状态: trace_id={trace_id}, demand_content_id={demand_content_id}, status={status}")
  188. except Exception as e:
  189. logger.error(f"update_task_status 失败: {e}", exc_info=True)
  190. raise
  191. finally:
  192. if conn:
  193. conn.close()