schedule.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. """
  2. 定时任务相关数据库操作
  3. demand_content: 原始检索内容库(name→query,suggestion→补充信息)
  4. demand_find_task: 执行记录表,通过 demand_content_id 关联
  5. """
  6. import logging
  7. from datetime import datetime
  8. from typing import Any, Dict, List, Optional
  9. from .connection import get_connection
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Status codes (kept consistent with the demand_find_task table).
STATUS_PENDING = 0
STATUS_RUNNING = 1
STATUS_SUCCESS = 2
STATUS_FAILED = 3
  16. def get_next_unprocessed_demand() -> Optional[Dict[str, Any]]:
  17. """
  18. 联表查询 demand_content 和 demand_find_task,找到创建最早且未处理的 demand_content。
  19. 未处理定义:该 demand_content_id 在 demand_find_task 中尚无任何记录。
  20. 已有任务则视为已跑过(含失败),不再被定时任务选中。
  21. Returns:
  22. {"demand_content_id": int, "query": str, "suggestion": Optional[str]} 或 None
  23. """
  24. sql = """
  25. SELECT dc.id AS demand_content_id,
  26. dc.name AS query,
  27. dc.suggestion AS suggestion
  28. FROM demand_content dc
  29. WHERE NOT EXISTS (
  30. SELECT 1 FROM demand_find_task t
  31. WHERE t.demand_content_id = dc.id
  32. )
  33. ORDER BY dc.id ASC
  34. LIMIT 1
  35. """
  36. conn = None
  37. try:
  38. conn = get_connection()
  39. with conn.cursor() as cur:
  40. cur.execute(sql)
  41. row = cur.fetchone()
  42. return dict(row) if row else None
  43. except Exception as e:
  44. logger.error(f"get_next_unprocessed_demand 失败: {e}", exc_info=True)
  45. raise
  46. finally:
  47. if conn:
  48. conn.close()
  49. def get_first_running_task() -> Optional[Dict[str, Any]]:
  50. """
  51. 查找 demand_find_task 中 status=STATUS_RUNNING(1) 的任务(理论上仅一条)。
  52. 用于服务重启后恢复执行中的任务:联表取出 query(demand_content.name)、suggestion。
  53. Returns:
  54. {"demand_content_id": int, "query": str, "suggestion": Optional[str], "trace_id": str} 或 None
  55. """
  56. sql = """
  57. SELECT t.demand_content_id,
  58. t.trace_id,
  59. dc.name AS query,
  60. dc.suggestion AS suggestion
  61. FROM demand_find_task t
  62. INNER JOIN demand_content dc ON dc.id = t.demand_content_id
  63. WHERE t.status = %s
  64. ORDER BY t.id ASC
  65. LIMIT 1
  66. """
  67. conn = None
  68. try:
  69. conn = get_connection()
  70. with conn.cursor() as cur:
  71. cur.execute(sql, (STATUS_RUNNING,))
  72. row = cur.fetchone()
  73. return dict(row) if row else None
  74. except Exception as e:
  75. logger.error(f"get_first_running_task 失败: {e}", exc_info=True)
  76. raise
  77. finally:
  78. if conn:
  79. conn.close()
  80. def get_latest_demand_task_oprate_is_open() -> Optional[int]:
  81. """
  82. 读取 demand_task_oprate 中按 update_time 最新的一行的 is_open。
  83. - 1:允许定时寻找任务继续派发
  84. - 0:关闭定时派发
  85. - 无记录时返回 None,调用方可视为默认打开(与未建表/无数据时的兼容行为)
  86. Returns:
  87. is_open 整数值,或表为空时 None
  88. """
  89. sql = """
  90. SELECT is_open
  91. FROM demand_task_oprate
  92. ORDER BY update_time DESC, id DESC
  93. LIMIT 1
  94. """
  95. conn = None
  96. try:
  97. conn = get_connection()
  98. with conn.cursor() as cur:
  99. cur.execute(sql)
  100. row = cur.fetchone()
  101. if not row:
  102. return None
  103. val = row.get("is_open")
  104. return int(val) if val is not None else None
  105. except Exception as e:
  106. logger.error(f"get_latest_demand_task_oprate_is_open 失败: {e}", exc_info=True)
  107. raise
  108. finally:
  109. if conn:
  110. conn.close()
  111. def get_one_today_unprocessed_demand(*, dt: int) -> Optional[Dict[str, Any]]:
  112. """
  113. 从 demand_content 中取「当天 dt」且尚未在 demand_find_task 中出现过的 1 条需求。
  114. - 不按品类分组(不再使用 merge_leve2)
  115. - dt 与表字段一致:一般为 YYYYMMDD 整数(如 20260402)
  116. - 同 dt 下按 score 降序取第一条(最高分优先)
  117. Returns:
  118. {"demand_content_id": int, "query": str, "suggestion": Optional[str], "score": Any} 或 None
  119. """
  120. sql = """
  121. SELECT dc.id AS demand_content_id,
  122. dc.name AS query,
  123. dc.suggestion AS suggestion,
  124. dc.score AS score
  125. FROM demand_content dc
  126. WHERE dc.dt = %s
  127. AND NOT EXISTS (
  128. SELECT 1 FROM demand_find_task t
  129. WHERE t.demand_content_id = dc.id
  130. )
  131. ORDER BY dc.score DESC, dc.id DESC
  132. LIMIT 1
  133. """
  134. conn = None
  135. try:
  136. conn = get_connection()
  137. with conn.cursor() as cur:
  138. cur.execute(sql, (int(dt),))
  139. row = cur.fetchone()
  140. return dict(row) if row else None
  141. except Exception as e:
  142. logger.error(f"get_one_today_unprocessed_demand 失败: {e}", exc_info=True)
  143. raise
  144. finally:
  145. if conn:
  146. conn.close()
  147. def get_daily_unprocessed_pool(
  148. *,
  149. total_limit: int = 20,
  150. per_category_limit: int = 5,
  151. ) -> List[Dict[str, Any]]:
  152. """
  153. 生成“当天任务池”:
  154. - 先按 demand_content.merge_leve2 品类去重分组
  155. - 每个品类取 score 最高的 N 条(per_category_limit)
  156. - 全局最多取 M 条(total_limit)
  157. - 过滤已处理:demand_find_task 中存在任意记录则视为已跑过(含失败)
  158. """
  159. sql = """
  160. SELECT x.demand_content_id, x.query, x.suggestion, x.merge_leve2, x.score
  161. FROM (
  162. SELECT
  163. dc.id AS demand_content_id,
  164. dc.name AS query,
  165. dc.suggestion AS suggestion,
  166. dc.merge_leve2 AS merge_leve2,
  167. dc.score AS score,
  168. ROW_NUMBER() OVER (
  169. PARTITION BY COALESCE(dc.merge_leve2, '')
  170. ORDER BY dc.score DESC, dc.id DESC
  171. ) AS rn
  172. FROM demand_content dc
  173. WHERE NOT EXISTS (
  174. SELECT 1 FROM demand_find_task t
  175. WHERE t.demand_content_id = dc.id
  176. )
  177. ) x
  178. WHERE x.rn <= %s
  179. ORDER BY x.score DESC, x.demand_content_id DESC
  180. LIMIT %s
  181. """
  182. conn = None
  183. try:
  184. conn = get_connection()
  185. with conn.cursor() as cur:
  186. cur.execute(sql, (per_category_limit, total_limit))
  187. rows = cur.fetchall() or []
  188. return [dict(r) for r in rows]
  189. except Exception as e:
  190. logger.error(f"get_daily_unprocessed_pool 失败: {e}", exc_info=True)
  191. raise
  192. finally:
  193. if conn:
  194. conn.close()
  195. def create_task_record(demand_content_id: int, trace_id: str = "", status: int = STATUS_PENDING) -> None:
  196. """
  197. 在 demand_find_task 中新增一条记录。
  198. 初始创建时 trace_id 可置为空字符串,任务完成后通过 update_task_on_complete 更新。
  199. 若 uk_trace_demand 冲突:仅当原行 status 为失败时,将 trace_id/status 重置为本次插入值
  200. (用于上次失败且 trace_id 仍为空时,定时任务会再次选中同一需求,不能重复 INSERT 空 trace)。
  201. """
  202. sql = """
  203. INSERT INTO demand_find_task (trace_id, demand_content_id, status)
  204. VALUES (%s, %s, %s)
  205. ON DUPLICATE KEY UPDATE
  206. trace_id = IF(status = %s, VALUES(trace_id), trace_id),
  207. status = IF(status = %s, VALUES(status), status)
  208. """
  209. conn = None
  210. try:
  211. conn = get_connection()
  212. with conn.cursor() as cur:
  213. cur.execute(
  214. sql,
  215. (trace_id, demand_content_id, status, STATUS_FAILED, STATUS_FAILED),
  216. )
  217. logger.info(f"创建任务记录: demand_content_id={demand_content_id}")
  218. except Exception as e:
  219. logger.error(f"create_task_record 失败: {e}", exc_info=True)
  220. raise
  221. finally:
  222. if conn:
  223. conn.close()
  224. def update_task_on_complete(demand_content_id: int, trace_id: str, status: int) -> None:
  225. """
  226. 任务完成后更新 trace_id 和 status。
  227. 匹配 trace_id 为空字符串的记录(初始创建时的占位)。
  228. """
  229. sql = """
  230. UPDATE demand_find_task
  231. SET trace_id = %s, status = %s
  232. WHERE demand_content_id = %s AND trace_id = ''
  233. """
  234. conn = None
  235. try:
  236. conn = get_connection()
  237. with conn.cursor() as cur:
  238. cur.execute(sql, (trace_id, status, demand_content_id))
  239. logger.info(f"更新任务完成: demand_content_id={demand_content_id}, trace_id={trace_id}, status={status}")
  240. except Exception as e:
  241. logger.error(f"update_task_on_complete 失败: {e}", exc_info=True)
  242. raise
  243. finally:
  244. if conn:
  245. conn.close()
  246. def fetch_trace_ids_created_after(cutoff: datetime) -> list[str]:
  247. """
  248. 查询 demand_find_task 中 created_at 晚于 cutoff 的去重 trace_id(排除空串)。
  249. cutoff 与表字段比较规则与 MySQL DATETIME/TIMESTAMP 一致。
  250. """
  251. sql = """
  252. SELECT DISTINCT trace_id
  253. FROM demand_find_task
  254. WHERE created_at > %s
  255. AND trace_id IS NOT NULL
  256. AND trace_id <> ''
  257. ORDER BY trace_id
  258. """
  259. conn = None
  260. try:
  261. conn = get_connection()
  262. with conn.cursor() as cur:
  263. cur.execute(sql, (cutoff,))
  264. rows = cur.fetchall() or []
  265. return [str(r["trace_id"]) for r in rows]
  266. except Exception as e:
  267. logger.error(f"fetch_trace_ids_created_after 失败: {e}", exc_info=True)
  268. raise
  269. finally:
  270. if conn:
  271. conn.close()
  272. def update_task_status(trace_id: str, demand_content_id: int, status: int) -> None:
  273. """
  274. 更新 demand_find_task 中指定记录的状态。
  275. trace_id 可为空字符串(任务尚未返回时,通过 demand_content_id 定位记录)。
  276. """
  277. sql = """
  278. UPDATE demand_find_task
  279. SET status = %s
  280. WHERE trace_id = %s AND demand_content_id = %s
  281. """
  282. conn = None
  283. try:
  284. conn = get_connection()
  285. with conn.cursor() as cur:
  286. cur.execute(sql, (status, trace_id, demand_content_id))
  287. logger.info(f"更新任务状态: trace_id={trace_id}, demand_content_id={demand_content_id}, status={status}")
  288. except Exception as e:
  289. logger.error(f"update_task_status 失败: {e}", exc_info=True)
  290. raise
  291. finally:
  292. if conn:
  293. conn.close()