schedule.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. """
  2. 定时任务相关数据库操作
  3. demand_content: 原始检索内容库
  4. demand_find_task: 执行记录表,通过 demand_content_id 关联
  5. """
  6. import logging
  7. from typing import Any, Dict, Optional
  8. from .connection import get_connection
  9. logger = logging.getLogger(__name__)
  10. # 状态常量(与 demand_find_task 表一致)
  11. STATUS_PENDING = 0
  12. STATUS_RUNNING = 1
  13. STATUS_SUCCESS = 2
  14. STATUS_FAILED = 3
  15. def get_next_unprocessed_demand() -> Optional[Dict[str, Any]]:
  16. """
  17. 联表查询 demand_content 和 demand_find_task,找到创建最早且未处理的 demand_content。
  18. 未处理定义:该 demand_content_id 在 demand_find_task 中尚无任何记录。
  19. 已有任务则视为已跑过(含失败),不再被定时任务选中。
  20. Returns:
  21. {"demand_content_id": int, "query": str} 或 None
  22. """
  23. sql = """
  24. SELECT dc.id AS demand_content_id,
  25. dc.name AS query
  26. FROM demand_content dc
  27. WHERE NOT EXISTS (
  28. SELECT 1 FROM demand_find_task t
  29. WHERE t.demand_content_id = dc.id
  30. )
  31. ORDER BY dc.id ASC
  32. LIMIT 1
  33. """
  34. conn = None
  35. try:
  36. conn = get_connection()
  37. with conn.cursor() as cur:
  38. cur.execute(sql)
  39. row = cur.fetchone()
  40. return dict(row) if row else None
  41. except Exception as e:
  42. logger.error(f"get_next_unprocessed_demand 失败: {e}", exc_info=True)
  43. raise
  44. finally:
  45. if conn:
  46. conn.close()
  47. def create_task_record(demand_content_id: int, trace_id: str = "", status: int = STATUS_PENDING) -> None:
  48. """
  49. 在 demand_find_task 中新增一条记录。
  50. 初始创建时 trace_id 可置为空字符串,任务完成后通过 update_task_on_complete 更新。
  51. 若 uk_trace_demand 冲突:仅当原行 status 为失败时,将 trace_id/status 重置为本次插入值
  52. (用于上次失败且 trace_id 仍为空时,定时任务会再次选中同一需求,不能重复 INSERT 空 trace)。
  53. """
  54. sql = """
  55. INSERT INTO demand_find_task (trace_id, demand_content_id, status)
  56. VALUES (%s, %s, %s)
  57. ON DUPLICATE KEY UPDATE
  58. trace_id = IF(status = %s, VALUES(trace_id), trace_id),
  59. status = IF(status = %s, VALUES(status), status)
  60. """
  61. conn = None
  62. try:
  63. conn = get_connection()
  64. with conn.cursor() as cur:
  65. cur.execute(
  66. sql,
  67. (trace_id, demand_content_id, status, STATUS_FAILED, STATUS_FAILED),
  68. )
  69. logger.info(f"创建任务记录: demand_content_id={demand_content_id}")
  70. except Exception as e:
  71. logger.error(f"create_task_record 失败: {e}", exc_info=True)
  72. raise
  73. finally:
  74. if conn:
  75. conn.close()
  76. def update_task_on_complete(demand_content_id: int, trace_id: str, status: int) -> None:
  77. """
  78. 任务完成后更新 trace_id 和 status。
  79. 匹配 trace_id 为空字符串的记录(初始创建时的占位)。
  80. """
  81. sql = """
  82. UPDATE demand_find_task
  83. SET trace_id = %s, status = %s
  84. WHERE demand_content_id = %s AND trace_id = ''
  85. """
  86. conn = None
  87. try:
  88. conn = get_connection()
  89. with conn.cursor() as cur:
  90. cur.execute(sql, (trace_id, status, demand_content_id))
  91. logger.info(f"更新任务完成: demand_content_id={demand_content_id}, trace_id={trace_id}, status={status}")
  92. except Exception as e:
  93. logger.error(f"update_task_on_complete 失败: {e}", exc_info=True)
  94. raise
  95. finally:
  96. if conn:
  97. conn.close()
  98. def update_task_status(trace_id: str, demand_content_id: int, status: int) -> None:
  99. """
  100. 更新 demand_find_task 中指定记录的状态。
  101. trace_id 可为空字符串(任务尚未返回时,通过 demand_content_id 定位记录)。
  102. """
  103. sql = """
  104. UPDATE demand_find_task
  105. SET status = %s
  106. WHERE trace_id = %s AND demand_content_id = %s
  107. """
  108. conn = None
  109. try:
  110. conn = get_connection()
  111. with conn.cursor() as cur:
  112. cur.execute(sql, (status, trace_id, demand_content_id))
  113. logger.info(f"更新任务状态: trace_id={trace_id}, demand_content_id={demand_content_id}, status={status}")
  114. except Exception as e:
  115. logger.error(f"update_task_status 失败: {e}", exc_info=True)
  116. raise
  117. finally:
  118. if conn:
  119. conn.close()