schedule.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. """
  2. 定时任务相关数据库操作
  3. demand_content: 原始检索内容库(name→query,suggestion→补充信息)
  4. demand_find_task: 执行记录表,通过 demand_content_id 关联
  5. """
  6. import logging
  7. from datetime import datetime
  8. from decimal import Decimal
  9. from typing import Any, Dict, List, Optional
  10. from .connection import get_connection
  11. logger = logging.getLogger(__name__)
# Status constants (kept consistent with the demand_find_task table)
STATUS_PENDING = 0
STATUS_RUNNING = 1
STATUS_SUCCESS = 2
STATUS_FAILED = 3
  17. def get_next_unprocessed_demand() -> Optional[Dict[str, Any]]:
  18. """
  19. 联表查询 demand_content 和 demand_find_task,找到创建最早且未处理的 demand_content。
  20. 未处理定义:该 demand_content_id 在 demand_find_task 中尚无任何记录。
  21. 已有任务则视为已跑过(含失败),不再被定时任务选中。
  22. Returns:
  23. {"demand_content_id": int, "query": str, "suggestion": Optional[str]} 或 None
  24. """
  25. sql = """
  26. SELECT dc.id AS demand_content_id,
  27. dc.name AS query,
  28. dc.suggestion AS suggestion
  29. FROM demand_content dc
  30. WHERE NOT EXISTS (
  31. SELECT 1 FROM demand_find_task t
  32. WHERE t.demand_content_id = dc.id
  33. )
  34. ORDER BY dc.id ASC
  35. LIMIT 1
  36. """
  37. conn = None
  38. try:
  39. conn = get_connection()
  40. with conn.cursor() as cur:
  41. cur.execute(sql)
  42. row = cur.fetchone()
  43. return dict(row) if row else None
  44. except Exception as e:
  45. logger.error(f"get_next_unprocessed_demand 失败: {e}", exc_info=True)
  46. raise
  47. finally:
  48. if conn:
  49. conn.close()
  50. def get_first_running_task() -> Optional[Dict[str, Any]]:
  51. """
  52. 查找 demand_find_task 中 status=STATUS_RUNNING(1) 的任务(理论上仅一条)。
  53. 用于服务重启后恢复执行中的任务:联表取出 query(demand_content.name)、suggestion。
  54. Returns:
  55. {"demand_content_id": int, "query": str, "suggestion": Optional[str], "trace_id": str} 或 None
  56. """
  57. sql = """
  58. SELECT t.demand_content_id,
  59. t.trace_id,
  60. dc.name AS query,
  61. dc.suggestion AS suggestion
  62. FROM demand_find_task t
  63. INNER JOIN demand_content dc ON dc.id = t.demand_content_id
  64. WHERE t.status = %s
  65. ORDER BY t.id ASC
  66. LIMIT 1
  67. """
  68. conn = None
  69. try:
  70. conn = get_connection()
  71. with conn.cursor() as cur:
  72. cur.execute(sql, (STATUS_RUNNING,))
  73. row = cur.fetchone()
  74. return dict(row) if row else None
  75. except Exception as e:
  76. logger.error(f"get_first_running_task 失败: {e}", exc_info=True)
  77. raise
  78. finally:
  79. if conn:
  80. conn.close()
  81. def get_latest_demand_task_oprate_is_open() -> Optional[int]:
  82. """
  83. 读取 demand_task_oprate 中按 update_time 最新的一行的 is_open。
  84. - 1:允许定时寻找任务继续派发
  85. - 0:关闭定时派发
  86. - 无记录时返回 None,调用方可视为默认打开(与未建表/无数据时的兼容行为)
  87. Returns:
  88. is_open 整数值,或表为空时 None
  89. """
  90. sql = """
  91. SELECT is_open
  92. FROM demand_task_oprate
  93. ORDER BY update_time DESC, id DESC
  94. LIMIT 1
  95. """
  96. conn = None
  97. try:
  98. conn = get_connection()
  99. with conn.cursor() as cur:
  100. cur.execute(sql)
  101. row = cur.fetchone()
  102. if not row:
  103. return None
  104. val = row.get("is_open")
  105. return int(val) if val is not None else None
  106. except Exception as e:
  107. logger.error(f"get_latest_demand_task_oprate_is_open 失败: {e}", exc_info=True)
  108. raise
  109. finally:
  110. if conn:
  111. conn.close()
  112. def get_latest_day_limit_coast() -> Optional[Decimal]:
  113. """
  114. 读取 demand_task_oprate 中按 update_time 最新的一行的 day_limit_coast。
  115. Returns:
  116. Decimal(两位小数) 或 None(无记录或字段为空)
  117. """
  118. sql = """
  119. SELECT day_limit_coast
  120. FROM demand_task_oprate
  121. ORDER BY update_time DESC, id DESC
  122. LIMIT 1
  123. """
  124. conn = None
  125. try:
  126. conn = get_connection()
  127. with conn.cursor() as cur:
  128. cur.execute(sql)
  129. row = cur.fetchone()
  130. if not row:
  131. return None
  132. val = row.get("day_limit_coast")
  133. if val is None:
  134. return None
  135. return Decimal(str(val))
  136. except Exception as e:
  137. logger.error(f"get_latest_day_limit_coast 失败: {e}", exc_info=True)
  138. raise
  139. finally:
  140. if conn:
  141. conn.close()
  142. def get_total_token_coast_between(start_time: datetime, end_time: datetime) -> Decimal:
  143. """
  144. 统计 demand_find_task 在指定时间区间内的 token_coast 总和(基于 created_at 字段)。
  145. start_time <= created_at < end_time
  146. """
  147. sql = """
  148. SELECT COALESCE(SUM(token_coast), 0) AS total_coast
  149. FROM demand_find_task
  150. WHERE created_at >= %s
  151. AND created_at < %s
  152. """
  153. conn = None
  154. try:
  155. conn = get_connection()
  156. with conn.cursor() as cur:
  157. cur.execute(sql, (start_time, end_time))
  158. row = cur.fetchone() or {}
  159. val = row.get("total_coast") if row is not None else None
  160. if val is None:
  161. return Decimal("0")
  162. return Decimal(str(val))
  163. except Exception as e:
  164. logger.error(f"get_total_token_coast_between 失败: {e}", exc_info=True)
  165. raise
  166. finally:
  167. if conn:
  168. conn.close()
  169. def get_one_today_unprocessed_demand(*, dt: int) -> Optional[Dict[str, Any]]:
  170. """
  171. 从 demand_content 中取「当天 dt」且尚未在 demand_find_task 中出现过的 1 条需求,
  172. 采用“分品类分层轮转”策略:
  173. - 先取每个品类分数最高的 1 条(各品类 top1),按 score 降序优先执行
  174. - 当各品类 top1 跑完后,再取各品类 top2,仍按 score 降序优先执行
  175. - 依此类推
  176. - dt 与表字段一致:一般为 YYYYMMDD 整数(如 20260402)
  177. Returns:
  178. {
  179. "demand_content_id": int,
  180. "query": str,
  181. "suggestion": Optional[str],
  182. "score": Any,
  183. "merge_leve2": Optional[str],
  184. "category_rank": int,
  185. } 或 None
  186. """
  187. sql = """
  188. SELECT x.demand_content_id,
  189. x.query,
  190. x.suggestion,
  191. x.score,
  192. x.merge_leve2,
  193. x.rn AS category_rank
  194. FROM (
  195. SELECT dc.id AS demand_content_id,
  196. dc.name AS query,
  197. dc.suggestion AS suggestion,
  198. dc.score AS score,
  199. dc.merge_leve2 AS merge_leve2,
  200. ROW_NUMBER() OVER (
  201. PARTITION BY COALESCE(dc.merge_leve2, '')
  202. ORDER BY dc.score DESC, dc.id DESC
  203. ) AS rn
  204. FROM demand_content dc
  205. WHERE dc.dt = %s
  206. AND NOT EXISTS (
  207. SELECT 1 FROM demand_find_task t
  208. WHERE t.demand_content_id = dc.id
  209. )
  210. ) x
  211. ORDER BY x.rn ASC, x.score DESC, x.demand_content_id DESC
  212. LIMIT 1
  213. """
  214. conn = None
  215. try:
  216. conn = get_connection()
  217. with conn.cursor() as cur:
  218. cur.execute(sql, (int(dt),))
  219. row = cur.fetchone()
  220. return dict(row) if row else None
  221. except Exception as e:
  222. logger.error(f"get_one_today_unprocessed_demand 失败: {e}", exc_info=True)
  223. raise
  224. finally:
  225. if conn:
  226. conn.close()
  227. def get_daily_unprocessed_pool(
  228. *,
  229. total_limit: int = 20,
  230. per_category_limit: int = 5,
  231. ) -> List[Dict[str, Any]]:
  232. """
  233. 生成“当天任务池”:
  234. - 先按 demand_content.merge_leve2 品类去重分组
  235. - 每个品类取 score 最高的 N 条(per_category_limit)
  236. - 全局最多取 M 条(total_limit)
  237. - 过滤已处理:demand_find_task 中存在任意记录则视为已跑过(含失败)
  238. """
  239. sql = """
  240. SELECT x.demand_content_id, x.query, x.suggestion, x.merge_leve2, x.score
  241. FROM (
  242. SELECT
  243. dc.id AS demand_content_id,
  244. dc.name AS query,
  245. dc.suggestion AS suggestion,
  246. dc.merge_leve2 AS merge_leve2,
  247. dc.score AS score,
  248. ROW_NUMBER() OVER (
  249. PARTITION BY COALESCE(dc.merge_leve2, '')
  250. ORDER BY dc.score DESC, dc.id DESC
  251. ) AS rn
  252. FROM demand_content dc
  253. WHERE NOT EXISTS (
  254. SELECT 1 FROM demand_find_task t
  255. WHERE t.demand_content_id = dc.id
  256. )
  257. ) x
  258. WHERE x.rn <= %s
  259. ORDER BY x.score DESC, x.demand_content_id DESC
  260. LIMIT %s
  261. """
  262. conn = None
  263. try:
  264. conn = get_connection()
  265. with conn.cursor() as cur:
  266. cur.execute(sql, (per_category_limit, total_limit))
  267. rows = cur.fetchall() or []
  268. return [dict(r) for r in rows]
  269. except Exception as e:
  270. logger.error(f"get_daily_unprocessed_pool 失败: {e}", exc_info=True)
  271. raise
  272. finally:
  273. if conn:
  274. conn.close()
  275. def create_task_record(demand_content_id: int, trace_id: str = "", status: int = STATUS_PENDING) -> None:
  276. """
  277. 在 demand_find_task 中新增一条记录。
  278. 初始创建时 trace_id 可置为空字符串,任务完成后通过 update_task_on_complete 更新。
  279. 若 uk_trace_demand 冲突:仅当原行 status 为失败时,将 trace_id/status 重置为本次插入值
  280. (用于上次失败且 trace_id 仍为空时,定时任务会再次选中同一需求,不能重复 INSERT 空 trace)。
  281. """
  282. sql = """
  283. INSERT INTO demand_find_task (trace_id, demand_content_id, status)
  284. VALUES (%s, %s, %s)
  285. ON DUPLICATE KEY UPDATE
  286. trace_id = IF(status = %s, VALUES(trace_id), trace_id),
  287. status = IF(status = %s, VALUES(status), status)
  288. """
  289. conn = None
  290. try:
  291. conn = get_connection()
  292. with conn.cursor() as cur:
  293. cur.execute(
  294. sql,
  295. (trace_id, demand_content_id, status, STATUS_FAILED, STATUS_FAILED),
  296. )
  297. logger.info(f"创建任务记录: demand_content_id={demand_content_id}")
  298. except Exception as e:
  299. logger.error(f"create_task_record 失败: {e}", exc_info=True)
  300. raise
  301. finally:
  302. if conn:
  303. conn.close()
  304. def update_task_on_complete(
  305. demand_content_id: int,
  306. trace_id: str,
  307. status: int,
  308. token_coast: Optional[Decimal] = None,
  309. ) -> None:
  310. """
  311. 任务完成后更新 trace_id 和 status。
  312. 匹配 trace_id 为空字符串的记录(初始创建时的占位)。
  313. """
  314. sql = """
  315. UPDATE demand_find_task
  316. SET trace_id = %s,
  317. status = %s
  318. """
  319. params: list[Any] = [trace_id, status]
  320. if token_coast is not None:
  321. sql += ", token_coast = %s\n"
  322. params.append(token_coast)
  323. sql += "WHERE demand_content_id = %s AND trace_id = ''"
  324. params.append(demand_content_id)
  325. conn = None
  326. try:
  327. conn = get_connection()
  328. with conn.cursor() as cur:
  329. cur.execute(sql, tuple(params))
  330. logger.info(
  331. "更新任务完成: demand_content_id=%s, trace_id=%s, status=%s, token_coast=%s",
  332. demand_content_id,
  333. trace_id,
  334. status,
  335. token_coast,
  336. )
  337. except Exception as e:
  338. logger.error("update_task_on_complete 失败: %s", e, exc_info=True)
  339. raise
  340. finally:
  341. if conn:
  342. conn.close()
  343. def fetch_trace_ids_created_after(cutoff: datetime) -> list[str]:
  344. """
  345. 查询 demand_find_task 中 created_at 晚于 cutoff 的去重 trace_id(排除空串)。
  346. cutoff 与表字段比较规则与 MySQL DATETIME/TIMESTAMP 一致。
  347. """
  348. sql = """
  349. SELECT DISTINCT trace_id
  350. FROM demand_find_task
  351. WHERE created_at > %s
  352. AND trace_id IS NOT NULL
  353. AND trace_id <> ''
  354. ORDER BY trace_id
  355. """
  356. conn = None
  357. try:
  358. conn = get_connection()
  359. with conn.cursor() as cur:
  360. cur.execute(sql, (cutoff,))
  361. rows = cur.fetchall() or []
  362. return [str(r["trace_id"]) for r in rows]
  363. except Exception as e:
  364. logger.error(f"fetch_trace_ids_created_after 失败: {e}", exc_info=True)
  365. raise
  366. finally:
  367. if conn:
  368. conn.close()
  369. def update_task_status(trace_id: str, demand_content_id: int, status: int) -> None:
  370. """
  371. 更新 demand_find_task 中指定记录的状态。
  372. trace_id 可为空字符串(任务尚未返回时,通过 demand_content_id 定位记录)。
  373. """
  374. sql = """
  375. UPDATE demand_find_task
  376. SET status = %s
  377. WHERE trace_id = %s AND demand_content_id = %s
  378. """
  379. conn = None
  380. try:
  381. conn = get_connection()
  382. with conn.cursor() as cur:
  383. cur.execute(sql, (status, trace_id, demand_content_id))
  384. logger.info(f"更新任务状态: trace_id={trace_id}, demand_content_id={demand_content_id}, status={status}")
  385. except Exception as e:
  386. logger.error(f"update_task_status 失败: {e}", exc_info=True)
  387. raise
  388. finally:
  389. if conn:
  390. conn.close()