change_history_status.py 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import sys
  2. from pathlib import Path
  3. from typing import Dict, List
  4. # 支持直接以脚本方式运行:python scheduler/change_history_status.py
  5. _PROJECT_ROOT = Path(__file__).resolve().parent.parent
  6. if str(_PROJECT_ROOT) not in sys.path:
  7. sys.path.insert(0, str(_PROJECT_ROOT))
  8. from scheduler.decode_dispatch_job import (
  9. RESULT_POLL_CHUNK,
  10. _apply_result_row_to_db,
  11. _submit_decode_result_chunk,
  12. )
  13. from utils.scheduler_logger import get_scheduler_logger
  14. from utils.sync_mysql_help import mysql
  15. logger = get_scheduler_logger()
  16. def _fetch_pending_vids_by_dt() -> Dict[str, List[str]]:
  17. sql = """
  18. SELECT DISTINCT dt, vid
  19. FROM aigc_topic_decode_task_result
  20. WHERE status IN (0, 1)
  21. AND dt < DATE_FORMAT(CURDATE(), '%Y%m%d')
  22. AND vid IS NOT NULL
  23. AND vid != ''
  24. ORDER BY dt, vid
  25. """
  26. rows = mysql.fetchall(sql)
  27. grouped: Dict[str, List[str]] = {}
  28. for row in rows:
  29. dt = str(row.get("dt") or "").strip()
  30. vid = str(row.get("vid") or "").strip()
  31. if not dt or not vid:
  32. continue
  33. grouped.setdefault(dt, []).append(vid)
  34. return grouped
  35. def _refresh_result_for_dt(dt: str, vids: List[str]) -> None:
  36. if not vids:
  37. return
  38. logger.info("开始回填历史解码结果 dt={} 待查询数量={}", dt, len(vids))
  39. for i in range(0, len(vids), RESULT_POLL_CHUNK):
  40. chunk = vids[i : i + RESULT_POLL_CHUNK]
  41. ok, msg, body = _submit_decode_result_chunk(chunk)
  42. if not ok:
  43. logger.error(
  44. "历史解码结果查询失败 dt={} msg={} body={}",
  45. dt,
  46. msg,
  47. body,
  48. )
  49. continue
  50. data_list = body.get("data")
  51. if not isinstance(data_list, list):
  52. logger.warning("历史解码结果返回data非列表 dt={} body={}", dt, body)
  53. continue
  54. returned_ids = {str(x.get("channelContentId") or "").strip() for x in data_list}
  55. missing = set(chunk) - returned_ids
  56. if missing:
  57. logger.warning(
  58. "历史解码结果返回缺少{}个vid dt={} 示例={}",
  59. len(missing),
  60. dt,
  61. list(missing)[:5],
  62. )
  63. for item in data_list:
  64. if not isinstance(item, dict):
  65. continue
  66. _apply_result_row_to_db(dt, item)
  67. logger.info(
  68. "历史解码结果分片处理完成 dt={} 分片大小={} 返回数量={}",
  69. dt,
  70. len(chunk),
  71. len(data_list),
  72. )
  73. def run_change_history_status_job() -> None:
  74. logger.info("历史状态回填任务开始")
  75. try:
  76. vids_by_dt = _fetch_pending_vids_by_dt()
  77. total_rows = sum(len(v) for v in vids_by_dt.values())
  78. logger.info("历史状态回填待处理总量={} 涉及dt数={}", total_rows, len(vids_by_dt))
  79. for dt, vids in vids_by_dt.items():
  80. _refresh_result_for_dt(dt, vids)
  81. logger.info("历史状态回填任务结束")
  82. except Exception as exc:
  83. logger.exception("历史状态回填任务异常: {}", exc)
# Script entry point: run the backfill job once when executed directly.
if __name__ == "__main__":
    run_change_history_status_job()