| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
import sys
from pathlib import Path
from typing import Dict, List

# Allow running this file directly as a script:
#   python scheduler/change_history_status.py
# The project root (two levels above this file) is put at the front of
# sys.path so the absolute `scheduler.*` / `utils.*` imports below resolve.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# These imports must stay below the sys.path bootstrap above.
from scheduler.decode_dispatch_job import (
    RESULT_POLL_CHUNK,
    _apply_result_row_to_db,
    _submit_decode_result_chunk,
)
from utils.scheduler_logger import get_scheduler_logger
from utils.sync_mysql_help import mysql

# Module-wide logger shared by every function in this file.
logger = get_scheduler_logger()
def _fetch_pending_vids_by_dt() -> Dict[str, List[str]]:
    """Collect the vids of pending historical decode rows, keyed by ``dt``.

    Selects distinct ``(dt, vid)`` pairs from
    ``aigc_topic_decode_task_result`` where ``status`` is 0 or 1, ``dt`` is
    less than today's date rendered as ``%Y%m%d``, and ``vid`` is neither
    NULL nor the empty string.
    NOTE(review): status 0/1 presumably mean "not finished yet" - confirm
    against the table definition.

    Returns:
        Mapping of ``dt`` to the list of its vids, in the order the query
        returned them (``ORDER BY dt, vid``). Rows whose ``dt`` or ``vid``
        is blank after stripping are left out.
    """
    query = """
        SELECT DISTINCT dt, vid
        FROM aigc_topic_decode_task_result
        WHERE status IN (0, 1)
        AND dt < DATE_FORMAT(CURDATE(), '%Y%m%d')
        AND vid IS NOT NULL
        AND vid != ''
        ORDER BY dt, vid
    """
    pending: Dict[str, List[str]] = {}
    for record in mysql.fetchall(query):
        # Normalise both columns to stripped strings; None becomes "".
        day = str(record.get("dt") or "").strip()
        video_id = str(record.get("vid") or "").strip()
        if day and video_id:
            if day not in pending:
                pending[day] = []
            pending[day].append(video_id)
    return pending
def _refresh_result_for_dt(dt: str, vids: List[str]) -> None:
    """Re-query decode results for one ``dt`` and write them back to the DB.

    ``vids`` is split into slices of ``RESULT_POLL_CHUNK`` ids. Each slice is
    sent through ``_submit_decode_result_chunk``; every dict entry of the
    returned ``data`` list is handed to ``_apply_result_row_to_db``. A slice
    whose query fails, or whose response carries no list under ``data``, is
    logged and skipped so the remaining slices are still processed.

    Args:
        dt: Value the vids were grouped under; forwarded unchanged to
            ``_apply_result_row_to_db`` and used in log messages.
        vids: Ids to look up. An empty list makes the call a no-op.
    """
    if not vids:
        return

    logger.info("开始回填历史解码结果 dt={} 待查询数量={}", dt, len(vids))
    for i in range(0, len(vids), RESULT_POLL_CHUNK):
        chunk = vids[i : i + RESULT_POLL_CHUNK]
        ok, msg, body = _submit_decode_result_chunk(chunk)
        if not ok:
            logger.error(
                "历史解码结果查询失败 dt={} msg={} body={}",
                dt,
                msg,
                body,
            )
            continue

        # A "successful" call may still hand back a body that is not a dict;
        # treat that like a missing data list instead of raising.
        data_list = body.get("data") if isinstance(body, dict) else None
        if not isinstance(data_list, list):
            logger.warning("历史解码结果返回data非列表 dt={} body={}", dt, body)
            continue

        # Filter once so the id bookkeeping and the write-back loop see the
        # same entries; calling .get() on a non-dict entry would otherwise
        # raise and abort the whole job.
        items = [item for item in data_list if isinstance(item, dict)]

        returned_ids = {
            str(item.get("channelContentId") or "").strip() for item in items
        }
        missing = set(chunk) - returned_ids
        if missing:
            logger.warning(
                "历史解码结果返回缺少{}个vid dt={} 示例={}",
                len(missing),
                dt,
                list(missing)[:5],
            )

        for item in items:
            _apply_result_row_to_db(dt, item)

        logger.info(
            "历史解码结果分片处理完成 dt={} 分片大小={} 返回数量={}",
            dt,
            len(chunk),
            len(data_list),
        )
def run_change_history_status_job() -> None:
    """Run one pass of the historical status backfill.

    Loads the pending vids grouped by ``dt`` and refreshes each group in
    turn. Any exception raised along the way is reported through
    ``logger.exception`` and not re-raised, so the caller never sees it.
    """
    logger.info("历史状态回填任务开始")
    try:
        pending = _fetch_pending_vids_by_dt()
        vid_count = sum(map(len, pending.values()))
        logger.info("历史状态回填待处理总量={} 涉及dt数={}", vid_count, len(pending))
        for day in pending:
            _refresh_result_for_dt(day, pending[day])
        logger.info("历史状态回填任务结束")
    except Exception as err:
        # Top-level boundary of the job: log and swallow.
        logger.exception("历史状态回填任务异常: {}", err)
# Script entry point: run the backfill once when executed directly.
if __name__ == "__main__":
    run_change_history_status_job()
|