Преглед изворни кода

feat:增加历史数据更新的脚本

jihuaqiang пре 4 дана
родитељ
комит
ab5c551e46
1 измењених фајлова са 94 додато и 0 уклоњено
  1. 94 0
      scheduler/change_history_status.py

+ 94 - 0
scheduler/change_history_status.py

@@ -0,0 +1,94 @@
+import sys
+from pathlib import Path
+from typing import Dict, List
+
+# 支持直接以脚本方式运行:python scheduler/change_history_status.py
+_PROJECT_ROOT = Path(__file__).resolve().parent.parent
+if str(_PROJECT_ROOT) not in sys.path:
+    sys.path.insert(0, str(_PROJECT_ROOT))
+
+from scheduler.decode_dispatch_job import (
+    RESULT_POLL_CHUNK,
+    _apply_result_row_to_db,
+    _submit_decode_result_chunk,
+)
+from utils.scheduler_logger import get_scheduler_logger
+from utils.sync_mysql_help import mysql
+
+
+logger = get_scheduler_logger()
+
+
+def _fetch_pending_vids_by_dt() -> Dict[str, List[str]]:
+    sql = """
+        SELECT DISTINCT dt, vid
+        FROM aigc_topic_decode_task_result
+        WHERE status IN (0, 1) AND vid IS NOT NULL AND vid != ''
+        ORDER BY dt, vid
+    """
+    rows = mysql.fetchall(sql)
+    grouped: Dict[str, List[str]] = {}
+    for row in rows:
+        dt = str(row.get("dt") or "").strip()
+        vid = str(row.get("vid") or "").strip()
+        if not dt or not vid:
+            continue
+        grouped.setdefault(dt, []).append(vid)
+    return grouped
+
+
+def _refresh_result_for_dt(dt: str, vids: List[str]) -> None:
+    if not vids:
+        return
+    logger.info("开始回填历史解码结果 dt={} 待查询数量={}", dt, len(vids))
+    for i in range(0, len(vids), RESULT_POLL_CHUNK):
+        chunk = vids[i : i + RESULT_POLL_CHUNK]
+        ok, msg, body = _submit_decode_result_chunk(chunk)
+        if not ok:
+            logger.error(
+                "历史解码结果查询失败 dt={} msg={} body={}",
+                dt,
+                msg,
+                body,
+            )
+            continue
+        data_list = body.get("data")
+        if not isinstance(data_list, list):
+            logger.warning("历史解码结果返回data非列表 dt={} body={}", dt, body)
+            continue
+        returned_ids = {str(x.get("channelContentId") or "").strip() for x in data_list}
+        missing = set(chunk) - returned_ids
+        if missing:
+            logger.warning(
+                "历史解码结果返回缺少{}个vid dt={} 示例={}",
+                len(missing),
+                dt,
+                list(missing)[:5],
+            )
+        for item in data_list:
+            if not isinstance(item, dict):
+                continue
+            _apply_result_row_to_db(dt, item)
+        logger.info(
+            "历史解码结果分片处理完成 dt={} 分片大小={} 返回数量={}",
+            dt,
+            len(chunk),
+            len(data_list),
+        )
+
+
+def run_change_history_status_job() -> None:
+    logger.info("历史状态回填任务开始")
+    try:
+        vids_by_dt = _fetch_pending_vids_by_dt()
+        total_rows = sum(len(v) for v in vids_by_dt.values())
+        logger.info("历史状态回填待处理总量={} 涉及dt数={}", total_rows, len(vids_by_dt))
+        for dt, vids in vids_by_dt.items():
+            _refresh_result_for_dt(dt, vids)
+        logger.info("历史状态回填任务结束")
+    except Exception as exc:
+        logger.exception("历史状态回填任务异常: {}", exc)
+
+
+if __name__ == "__main__":
+    run_change_history_status_job()