schedule.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
"""
Database operations used by the scheduled tasks.

demand_content: the library of raw search/query content.
demand_find_task: the execution-record table, linked to demand_content
    through demand_content_id.
"""
  6. import logging
  7. from typing import Any, Dict, List, Optional
  8. from .connection import get_connection
  9. logger = logging.getLogger(__name__)
  10. # 状态常量(与 demand_find_task 表一致)
  11. STATUS_PENDING = 0
  12. STATUS_RUNNING = 1
  13. STATUS_SUCCESS = 2
  14. STATUS_FAILED = 3
  15. def get_next_unprocessed_demand() -> Optional[Dict[str, Any]]:
  16. """
  17. 联表查询 demand_content 和 demand_find_task,找到创建最早且未处理的 demand_content。
  18. 未处理定义:该 demand_content_id 在 demand_find_task 中尚无任何记录。
  19. 已有任务则视为已跑过(含失败),不再被定时任务选中。
  20. Returns:
  21. {"demand_content_id": int, "query": str} 或 None
  22. """
  23. sql = """
  24. SELECT dc.id AS demand_content_id,
  25. dc.name AS query
  26. FROM demand_content dc
  27. WHERE NOT EXISTS (
  28. SELECT 1 FROM demand_find_task t
  29. WHERE t.demand_content_id = dc.id
  30. )
  31. ORDER BY dc.id ASC
  32. LIMIT 1
  33. """
  34. conn = None
  35. try:
  36. conn = get_connection()
  37. with conn.cursor() as cur:
  38. cur.execute(sql)
  39. row = cur.fetchone()
  40. return dict(row) if row else None
  41. except Exception as e:
  42. logger.error(f"get_next_unprocessed_demand 失败: {e}", exc_info=True)
  43. raise
  44. finally:
  45. if conn:
  46. conn.close()
  47. def get_daily_unprocessed_pool(
  48. *,
  49. total_limit: int = 20,
  50. per_category_limit: int = 5,
  51. ) -> List[Dict[str, Any]]:
  52. """
  53. 生成“当天任务池”:
  54. - 先按 demand_content.merge_leve2 品类去重分组
  55. - 每个品类取 score 最高的 N 条(per_category_limit)
  56. - 全局最多取 M 条(total_limit)
  57. - 过滤已处理:demand_find_task 中存在任意记录则视为已跑过(含失败)
  58. """
  59. sql = """
  60. SELECT x.demand_content_id, x.query, x.merge_leve2, x.score
  61. FROM (
  62. SELECT
  63. dc.id AS demand_content_id,
  64. dc.name AS query,
  65. dc.merge_leve2 AS merge_leve2,
  66. dc.score AS score,
  67. ROW_NUMBER() OVER (
  68. PARTITION BY COALESCE(dc.merge_leve2, '')
  69. ORDER BY dc.score DESC, dc.id DESC
  70. ) AS rn
  71. FROM demand_content dc
  72. WHERE NOT EXISTS (
  73. SELECT 1 FROM demand_find_task t
  74. WHERE t.demand_content_id = dc.id
  75. )
  76. ) x
  77. WHERE x.rn <= %s
  78. ORDER BY x.score DESC, x.demand_content_id DESC
  79. LIMIT %s
  80. """
  81. conn = None
  82. try:
  83. conn = get_connection()
  84. with conn.cursor() as cur:
  85. cur.execute(sql, (per_category_limit, total_limit))
  86. rows = cur.fetchall() or []
  87. return [dict(r) for r in rows]
  88. except Exception as e:
  89. logger.error(f"get_daily_unprocessed_pool 失败: {e}", exc_info=True)
  90. raise
  91. finally:
  92. if conn:
  93. conn.close()
  94. def create_task_record(demand_content_id: int, trace_id: str = "", status: int = STATUS_PENDING) -> None:
  95. """
  96. 在 demand_find_task 中新增一条记录。
  97. 初始创建时 trace_id 可置为空字符串,任务完成后通过 update_task_on_complete 更新。
  98. 若 uk_trace_demand 冲突:仅当原行 status 为失败时,将 trace_id/status 重置为本次插入值
  99. (用于上次失败且 trace_id 仍为空时,定时任务会再次选中同一需求,不能重复 INSERT 空 trace)。
  100. """
  101. sql = """
  102. INSERT INTO demand_find_task (trace_id, demand_content_id, status)
  103. VALUES (%s, %s, %s)
  104. ON DUPLICATE KEY UPDATE
  105. trace_id = IF(status = %s, VALUES(trace_id), trace_id),
  106. status = IF(status = %s, VALUES(status), status)
  107. """
  108. conn = None
  109. try:
  110. conn = get_connection()
  111. with conn.cursor() as cur:
  112. cur.execute(
  113. sql,
  114. (trace_id, demand_content_id, status, STATUS_FAILED, STATUS_FAILED),
  115. )
  116. logger.info(f"创建任务记录: demand_content_id={demand_content_id}")
  117. except Exception as e:
  118. logger.error(f"create_task_record 失败: {e}", exc_info=True)
  119. raise
  120. finally:
  121. if conn:
  122. conn.close()
  123. def update_task_on_complete(demand_content_id: int, trace_id: str, status: int) -> None:
  124. """
  125. 任务完成后更新 trace_id 和 status。
  126. 匹配 trace_id 为空字符串的记录(初始创建时的占位)。
  127. """
  128. sql = """
  129. UPDATE demand_find_task
  130. SET trace_id = %s, status = %s
  131. WHERE demand_content_id = %s AND trace_id = ''
  132. """
  133. conn = None
  134. try:
  135. conn = get_connection()
  136. with conn.cursor() as cur:
  137. cur.execute(sql, (trace_id, status, demand_content_id))
  138. logger.info(f"更新任务完成: demand_content_id={demand_content_id}, trace_id={trace_id}, status={status}")
  139. except Exception as e:
  140. logger.error(f"update_task_on_complete 失败: {e}", exc_info=True)
  141. raise
  142. finally:
  143. if conn:
  144. conn.close()
  145. def update_task_status(trace_id: str, demand_content_id: int, status: int) -> None:
  146. """
  147. 更新 demand_find_task 中指定记录的状态。
  148. trace_id 可为空字符串(任务尚未返回时,通过 demand_content_id 定位记录)。
  149. """
  150. sql = """
  151. UPDATE demand_find_task
  152. SET status = %s
  153. WHERE trace_id = %s AND demand_content_id = %s
  154. """
  155. conn = None
  156. try:
  157. conn = get_connection()
  158. with conn.cursor() as cur:
  159. cur.execute(sql, (status, trace_id, demand_content_id))
  160. logger.info(f"更新任务状态: trace_id={trace_id}, demand_content_id={demand_content_id}, status={status}")
  161. except Exception as e:
  162. logger.error(f"update_task_status 失败: {e}", exc_info=True)
  163. raise
  164. finally:
  165. if conn:
  166. conn.close()