"""Backfill decode results for historical (pre-today) task-result rows.

Selects distinct ``(dt, vid)`` pairs from ``aigc_topic_decode_task_result``
whose ``status`` is 0 or 1 and whose ``dt`` is earlier than today, re-queries
the decode result service in chunks of ``RESULT_POLL_CHUNK`` vids, and writes
each returned row back through ``_apply_result_row_to_db``.
"""

import sys
from collections import defaultdict
from pathlib import Path
from typing import Dict, List

# Allow running this module directly as a script:
#   python scheduler/change_history_status.py
# by putting the project root (parent of this file's directory) on sys.path
# before the project-local imports below.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

from scheduler.decode_dispatch_job import (  # noqa: E402
    RESULT_POLL_CHUNK,
    _apply_result_row_to_db,
    _submit_decode_result_chunk,
)
from utils.scheduler_logger import get_scheduler_logger  # noqa: E402
from utils.sync_mysql_help import mysql  # noqa: E402

logger = get_scheduler_logger()


def _fetch_pending_vids_by_dt() -> Dict[str, List[str]]:
    """Return vids with status 0/1 from days before today, grouped by ``dt``.

    Only rows with a non-empty ``vid`` are selected in SQL; rows whose ``dt``
    or ``vid`` is still empty after ``str(...).strip()`` are dropped here.

    Returns:
        Mapping ``dt -> [vid, ...]``. Key insertion order and the order of
        each list follow the query's ``ORDER BY dt, vid``.
    """
    # NOTE(review): ``dt`` is compared against DATE_FORMAT(CURDATE(), '%Y%m%d'),
    # which assumes dt is stored as a zero-padded 'YYYYMMDD' string — confirm
    # against the table schema. The literal '%' characters are only safe if
    # ``mysql.fetchall`` does not %-format the query when no params are given
    # — TODO confirm against utils.sync_mysql_help.
    sql = """
        SELECT DISTINCT dt, vid
        FROM aigc_topic_decode_task_result
        WHERE status IN (0, 1)
          AND dt < DATE_FORMAT(CURDATE(), '%Y%m%d')
          AND vid IS NOT NULL
          AND vid != ''
        ORDER BY dt, vid
    """
    rows = mysql.fetchall(sql)
    grouped: Dict[str, List[str]] = defaultdict(list)
    # Tolerate a helper that returns None instead of an empty sequence.
    for row in rows or []:
        dt = str(row.get("dt") or "").strip()
        vid = str(row.get("vid") or "").strip()
        if not dt or not vid:
            continue
        grouped[dt].append(vid)
    # Return a plain dict so callers don't get defaultdict auto-insert behavior.
    return dict(grouped)


def _refresh_result_for_dt(dt: str, vids: List[str]) -> None:
    """Re-query decode results for one ``dt`` and persist every returned row.

    vids are sent to ``_submit_decode_result_chunk`` in slices of
    ``RESULT_POLL_CHUNK``. A slice whose request fails, or whose response has
    no list under ``"data"``, is logged and skipped; later slices still run.
    Each dict item in ``data`` is written with ``_apply_result_row_to_db``.
    Requested vids absent from the response (by ``channelContentId``) are
    reported with a warning.

    Args:
        dt: Date partition string that all ``vids`` belong to.
        vids: vids to query; an empty list is a no-op.
    """
    if not vids:
        return
    logger.info("开始回填历史解码结果 dt={} 待查询数量={}", dt, len(vids))
    for start in range(0, len(vids), RESULT_POLL_CHUNK):
        chunk = vids[start : start + RESULT_POLL_CHUNK]
        ok, msg, body = _submit_decode_result_chunk(chunk)
        if not ok:
            logger.error(
                "历史解码结果查询失败 dt={} msg={} body={}",
                dt,
                msg,
                body,
            )
            continue

        # A non-dict body (even with ok=True) used to raise AttributeError on
        # ``body.get`` and abort the whole job; treat it as "data not a list".
        data_list = body.get("data") if isinstance(body, dict) else None
        if not isinstance(data_list, list):
            logger.warning("历史解码结果返回data非列表 dt={} body={}", dt, body)
            continue

        # Filter once so that a stray non-dict element can't crash the id set
        # below; the same filtered list is used for the DB writes.
        items = [x for x in data_list if isinstance(x, dict)]
        returned_ids = {str(x.get("channelContentId") or "").strip() for x in items}
        missing = set(chunk) - returned_ids
        if missing:
            logger.warning(
                "历史解码结果返回缺少{}个vid dt={} 示例={}",
                len(missing),
                dt,
                # Sorted so the logged sample is deterministic across runs.
                sorted(missing)[:5],
            )

        for item in items:
            _apply_result_row_to_db(dt, item)

        logger.info(
            "历史解码结果分片处理完成 dt={} 分片大小={} 返回数量={}",
            dt,
            len(chunk),
            len(data_list),
        )


def run_change_history_status_job() -> None:
    """Backfill decode results for all historical rows with status 0/1.

    Each ``dt`` is processed independently: an ``Exception`` raised while
    handling one dt is logged and the job continues with the next dt. Any
    other ``Exception`` (e.g. from the initial query) is logged and not
    re-raised.
    """
    logger.info("历史状态回填任务开始")
    try:
        vids_by_dt = _fetch_pending_vids_by_dt()
        total_rows = sum(len(v) for v in vids_by_dt.values())
        logger.info("历史状态回填待处理总量={} 涉及dt数={}", total_rows, len(vids_by_dt))
        for dt, vids in vids_by_dt.items():
            try:
                _refresh_result_for_dt(dt, vids)
            except Exception as exc:
                # Isolate failures per dt so one bad partition does not stop
                # the remaining dts from being backfilled.
                logger.exception("历史状态回填dt处理异常 dt={}: {}", dt, exc)
        logger.info("历史状态回填任务结束")
    except Exception as exc:
        logger.exception("历史状态回填任务异常: {}", exc)


if __name__ == "__main__":
    run_change_history_status_job()