import json
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from zoneinfo import ZoneInfo

import requests

from scheduler.odps_fetch import fetch_priority_posts
from utils.scheduler_logger import get_scheduler_logger
from utils.sync_mysql_help import mysql

logger = get_scheduler_logger()

CONFIG_ID = "57"
DECODE_URL = "https://aigc-api.aiddit.com/aigc/api/task/decode"
DECODE_RESULT_URL = "https://aigc-api.aiddit.com/aigc/api/task/decode/result"
# Dynamic window: try to keep the number of today's rows with status IN (0, 1)
# at this value (when topping up, only fetch enough to fill the gap).
BATCH_SIZE = 40
ODPS_PAGE_SIZE = 200
RESULT_POLL_CHUNK = 50


def _map_api_status_to_int(api_status: str, vid: str) -> int:
    """Map upstream status string to DB status: 0 pending, 1 running, 2 success, 3 failed."""
    s = (api_status or "").strip().upper()
    if s == "SUCCESS":
        return 2
    if s in ("FAILED", "FAILURE", "ERROR", "FAIL"):
        return 3
    if s in ("RUNNING", "PROCESSING", "DOING"):
        return 1
    if s in ("PENDING", "WAITING", "INIT", "QUEUED"):
        return 0
    if not s:
        return 0
    logger.warning("未知解码状态,按执行中处理 status={} vid={}", api_status, vid)
    return 1


def _safe_json_loads(text: Optional[str]) -> Dict[str, Any]:
    if not text:
        return {}
    try:
        data = json.loads(text)
        return data if isinstance(data, dict) else {}
    except Exception:
        return {}


def _today_dt() -> str:
    return datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")


def _is_allowed_level(level_value: Any) -> bool:
    try:
        return int(level_value) in (0, 1, 2, 3)
    except (TypeError, ValueError):
        return False
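# Illustrative behaviour of the helpers above (values are hypothetical and shown
# for documentation only; each result follows directly from the code above):
#   _map_api_status_to_int("SUCCESS", "v123")   -> 2
#   _map_api_status_to_int("QUEUED", "v123")    -> 0
#   _map_api_status_to_int("", "v123")          -> 0   (empty status treated as pending)
#   _map_api_status_to_int("UNKNOWN", "v123")   -> 1   (logged, treated as running)
#   _is_allowed_level("2")                      -> True
#   _is_allowed_level(None)                     -> False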
logger.exception("查询解构开关失败,本轮不发起新解构任务: {}", exc) return False def _fetch_today_pending_vids(dt: str) -> List[str]: sql = """ SELECT DISTINCT vid FROM aigc_topic_decode_task_result WHERE dt = %s AND status IN (0, 1) AND vid IS NOT NULL AND vid != '' ORDER BY vid """ rows = mysql.fetchall(sql, (dt,)) return [str(row["vid"]) for row in rows if row.get("vid")] def _fetch_history_pending_vids_by_dt(today_dt: str) -> Dict[str, List[str]]: sql = """ SELECT DISTINCT dt, vid FROM aigc_topic_decode_task_result WHERE status IN (0, 1) AND dt < %s AND vid IS NOT NULL AND vid != '' ORDER BY dt, vid """ rows = mysql.fetchall(sql, (today_dt,)) grouped: Dict[str, List[str]] = {} for row in rows: dt = str(row.get("dt") or "").strip() vid = str(row.get("vid") or "").strip() if not dt or not vid: continue grouped.setdefault(dt, []).append(vid) return grouped def _count_today_non_terminal(dt: str) -> int: sql = """ SELECT COUNT(1) AS total FROM aigc_topic_decode_task_result WHERE dt = %s AND status IN (0, 1) """ result = mysql.fetchone(sql, (dt,)) return int((result or {}).get("total", 0)) def _count_today_total(dt: str) -> int: sql = """ SELECT COUNT(1) AS total FROM aigc_topic_decode_task_result WHERE dt = %s """ result = mysql.fetchone(sql, (dt,)) return int((result or {}).get("total", 0)) def _fetch_decode_daily_limit() -> int: sql = """ SELECT `max` AS daily_limit FROM aigc_topic_decode_task_oprate ORDER BY id DESC LIMIT 1 """ row = mysql.fetchone(sql) return int((row or {}).get("daily_limit") or 0) def _submit_decode_result_chunk( channel_content_ids: List[str], ) -> Tuple[bool, str, Dict[str, Any]]: payload = {"params": {"configId": CONFIG_ID, "channelContentIds": channel_content_ids}} try: resp = requests.post(DECODE_RESULT_URL, json=payload, timeout=60) if resp.status_code != 200: return False, f"http_status_{resp.status_code}", {} body = resp.json() ok = body.get("code") == 0 return ok, body.get("msg") or "", body except Exception as exc: return False, str(exc), {} def _apply_result_row_to_db(dt: str, item: Dict[str, Any]) -> None: vid = str(item.get("channelContentId") or "").strip() if not vid: return api_status_raw = item.get("status") or "" err_msg = (item.get("err_msg") or item.get("errorMessage") or "") or "" data_content = item.get("dataContent") if data_content is not None and not isinstance(data_content, str): data_content = json.dumps(data_content, ensure_ascii=False) html = item.get("html") base_status = _map_api_status_to_int(str(api_status_raw), vid) sql = """ UPDATE aigc_topic_decode_task_result SET status = %s, err_msg = %s, data_content = %s, html = %s WHERE dt = %s AND vid = %s """ mysql.execute( sql, ( base_status, err_msg[:512] if err_msg else "", data_content if data_content is not None else "", html if html is not None else None, dt, vid, ), ) def _poll_decode_results_for_today(dt: str, vids: List[str]) -> None: if not vids: return total = len(vids) logger.info("开始查询解码结果 dt={} 总vid数={}", dt, total) overall_success = 0 overall_returned = 0 for i in range(0, total, RESULT_POLL_CHUNK): chunk = vids[i : i + RESULT_POLL_CHUNK] logger.info( "查询解码结果 dt={} 分片序号={} 分片大小={} 总数={}", dt, i // RESULT_POLL_CHUNK, len(chunk), total, ) ok, msg, body = _submit_decode_result_chunk(chunk) if not ok: logger.error( "查询解码结果接口失败 dt={} msg={} body={}", dt, msg, body, ) continue data_list = body.get("data") if not isinstance(data_list, list): logger.warning("查询解码结果返回中缺少data列表 body={}", body) continue chunk_success = 0 returned_ids = {str(x.get("channelContentId") or "") for x in data_list} missing = 
def _poll_decode_results_for_today(dt: str, vids: List[str]) -> None:
    if not vids:
        return
    total = len(vids)
    logger.info("开始查询解码结果 dt={} 总vid数={}", dt, total)
    overall_success = 0
    overall_returned = 0
    for i in range(0, total, RESULT_POLL_CHUNK):
        chunk = vids[i : i + RESULT_POLL_CHUNK]
        logger.info(
            "查询解码结果 dt={} 分片序号={} 分片大小={} 总数={}",
            dt,
            i // RESULT_POLL_CHUNK,
            len(chunk),
            total,
        )
        ok, msg, body = _submit_decode_result_chunk(chunk)
        if not ok:
            logger.error(
                "查询解码结果接口失败 dt={} msg={} body={}",
                dt,
                msg,
                body,
            )
            continue
        data_list = body.get("data")
        if not isinstance(data_list, list):
            logger.warning("查询解码结果返回中缺少data列表 body={}", body)
            continue
        chunk_success = 0
        # Guard against non-dict entries, consistent with the item loop below.
        returned_ids = {
            str(x.get("channelContentId") or "") for x in data_list if isinstance(x, dict)
        }
        missing = set(chunk) - returned_ids
        if missing:
            logger.warning(
                "查询解码结果返回缺少{}个vid,示例={}",
                len(missing),
                list(missing)[:5],
            )
        for item in data_list:
            if not isinstance(item, dict):
                continue
            vid = str(item.get("channelContentId") or "").strip()
            api_status = str(item.get("status") or "")
            mapped_status = _map_api_status_to_int(api_status, vid)
            if mapped_status == 2:
                chunk_success += 1
            err_msg = (item.get("err_msg") or item.get("errorMessage") or "") or ""
            logger.info(
                "解码结果明细 dt={} vid={} 接口状态={} 映射状态={} 错误信息={}",
                dt,
                vid,
                api_status,
                mapped_status,
                err_msg[:512] if err_msg else "",
            )
            _apply_result_row_to_db(dt, item)
        overall_success += chunk_success
        overall_returned += len(data_list)
        logger.info(
            "解码结果分片处理完成 dt={} 查询数={} 返回数={} 成功数={}",
            dt,
            len(chunk),
            len(data_list),
            chunk_success,
        )
    logger.info(
        "解码结果查询完成 dt={} 查询总数={} 返回总数={} 成功总数={}",
        dt,
        total,
        overall_returned,
        overall_success,
    )


def _poll_decode_results_for_history(today_dt: str) -> None:
    vids_by_dt = _fetch_history_pending_vids_by_dt(today_dt)
    if not vids_by_dt:
        return
    total = sum(len(v) for v in vids_by_dt.values())
    logger.info(
        "开始查询历史未完成解码结果 today_dt={} 涉及dt数={} 总vid数={}",
        today_dt,
        len(vids_by_dt),
        total,
    )
    for dt, vids in vids_by_dt.items():
        _poll_decode_results_for_today(dt, vids)
    logger.info(
        "历史未完成解码结果查询结束 today_dt={} 涉及dt数={} 总vid数={}",
        today_dt,
        len(vids_by_dt),
        total,
    )


def _build_posts_payload(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    posts: List[Dict[str, Any]] = []
    for item in records:
        extend_raw = item.get("extend")
        extend_obj: Dict[str, Any]
        if isinstance(extend_raw, dict):
            extend_obj = extend_raw
        else:
            extend_obj = _safe_json_loads(str(extend_raw)) if extend_raw is not None else {}
        # extend_obj is currently unused; kept for the disabled cover logic below.
        # cover_url = extend_obj.get("cover_url") or ""
        # images = [cover_url] if cover_url else []
        posts.append(
            {
                "channelContentId": item.get("vid") or "",
                "title": item.get("title") or "",
                "video": item.get("url") or "",
                "images": [],
                "contentModal": 4,
                "channel": 10,
            }
        )
    return posts


def _submit_decode(posts: List[Dict[str, Any]]) -> Tuple[bool, str, Dict[str, Any]]:
    payload = {"params": {"configId": CONFIG_ID, "posts": posts}}
    try:
        resp = requests.post(DECODE_URL, json=payload, timeout=60)
        if resp.status_code != 200:
            return False, f"http_status_{resp.status_code}", {}
        body = resp.json()
        ok = body.get("code") == 0
        return ok, body.get("msg") or "", body
    except Exception as exc:
        return False, str(exc), {}


def _load_existing_vids(dt: str) -> set[str]:
    sql = """
        SELECT DISTINCT vid
        FROM aigc_topic_decode_task_result
        WHERE dt = %s
          AND vid IS NOT NULL
          AND vid != ''
    """
    rows = mysql.fetchall(sql, (dt,))
    return {str(row["vid"]) for row in rows if row.get("vid")}


def _pick_candidate_records(dt: str, batch_size: int = BATCH_SIZE) -> List[Dict[str, Any]]:
    existing_vids = _load_existing_vids(dt)
    selected: List[Dict[str, Any]] = []
    selected_vids: set[str] = set()
    offset = 0
    while len(selected) < batch_size:
        page = fetch_priority_posts(limit=ODPS_PAGE_SIZE, offset=offset, dt=dt)
        if not page:
            break
        for item in page:
            vid = str(item.get("vid") or "")
            if (
                not vid
                or not _is_allowed_level(item.get("level"))
                or vid in existing_vids
                or vid in selected_vids
            ):
                continue
            selected.append(item)
            selected_vids.add(vid)
            if len(selected) >= batch_size:
                break
        offset += ODPS_PAGE_SIZE
    logger.info(
        "候选数据筛选完成 dt={} 已选数量={} 扫描offset={}",
        dt,
        len(selected),
        offset,
    )
    if selected:
        vid_title_pairs = [
            {"vid": str(item.get("vid") or ""), "title": item.get("title") or ""}
            for item in selected
        ]
        logger.info("已选候选数据 dt={} items={}", dt, vid_title_pairs)
    return selected
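# Shape of an ODPS record consumed above and in _insert_task_result_row below,
# inferred from the fields this module reads (the record layout itself is owned
# by scheduler.odps_fetch; all values here are hypothetical):
#
#   {
#       "vid": "7456789012345678901",
#       "title": "...",
#       "url": "https://example.com/video.mp4",
#       "level": 1,                  # only levels 0-3 pass _is_allowed_level
#       "extend": '{"cover_url": "https://example.com/cover.jpg"}',
#       "type": "...",
#       "channel": "...",
#       "cate1": "...",
#       "cate2": "...",
#       "dt": "20250101",
#   }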
def _row_status_after_decode_submit(
    vid: str, row_in_resp: Optional[Dict[str, Any]], full_body: Dict[str, Any]
) -> Tuple[int, str, str, Optional[str]]:
    """Returns (status, err_msg, data_content, html) for INSERT."""
    if not row_in_resp:
        payload = json.dumps({"decode_submit_response": full_body}, ensure_ascii=False)
        return 1, "", payload, None
    api_status_raw = row_in_resp.get("status") or ""
    err_msg = (row_in_resp.get("err_msg") or row_in_resp.get("errorMessage") or "") or ""
    mapped = _map_api_status_to_int(str(api_status_raw), vid)
    payload = json.dumps(
        {"decode_submit_item": row_in_resp, "decode_submit_response": full_body},
        ensure_ascii=False,
    )
    if mapped == 3:
        return 3, err_msg[:512], payload, None
    if mapped == 2:
        # New submit API only returns status/errorMessage.
        # Keep SUCCESS as terminal success; detailed result is queried via decode/result.
        return 2, "", payload, None
    if mapped == 0:
        return 0, err_msg[:512], payload, None
    return 1, err_msg[:512], payload, None


def _insert_task_result_row(
    source: Dict[str, Any],
    status: int,
    err_msg: str,
    data_content: str,
    html: Optional[str],
) -> None:
    extend = _safe_json_loads(source.get("extend"))
    cover_url = extend.get("cover_url", "")
    cover_text = cover_url if isinstance(cover_url, str) else ""
    images_text = source.get("url") or ""
    sql = """
        INSERT INTO aigc_topic_decode_task_result
            (task_id, status, err_msg, vid, title, cover, video_url, images,
             type, channel, cate1, cate2, dt, data_content, html)
        VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    params = (
        None,
        status,
        err_msg or "",
        str(source.get("vid") or ""),
        source.get("title") or "",
        cover_text,
        source.get("url") or "",
        images_text,
        source.get("type") or "",
        source.get("channel") or "",
        source.get("cate1") or "",
        source.get("cate2") or "",
        source.get("dt") or _today_dt(),
        data_content,
        html,
    )
    mysql.execute(sql, params)


def _insert_rows_after_decode_submit(records: List[Dict[str, Any]], body: Dict[str, Any]) -> None:
    data_list = body.get("data") if isinstance(body.get("data"), list) else []
    by_vid = {str(x.get("channelContentId") or ""): x for x in data_list if isinstance(x, dict)}
    for item in records:
        vid = str(item.get("vid") or "")
        row = by_vid.get(vid)
        status, err_msg, data_content, html = _row_status_after_decode_submit(vid, row, body)
        _insert_task_result_row(item, status, err_msg, data_content, html)
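# Per-cycle flow of the entry point below:
#   1. Poll decode results for historical (dt < today) rows still in status 0/1.
#   2. Poll decode results for today's rows in status 0/1.
#   3. Check the submit switch (is_open); if closed, stop after polling.
#   4. Compute the window gap (BATCH_SIZE minus today's non-terminal count) and
#      the remaining daily quota, and take the smaller of the two.
#   5. Pick new ODPS candidates, submit them to the decode API, and insert one
#      result row per candidate.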
def run_decode_dispatch_job() -> None:
    logger.info("解码调度任务开始执行")
    try:
        dt = _today_dt()
        _poll_decode_results_for_history(dt)

        before_poll_non_terminal = _count_today_non_terminal(dt)
        if before_poll_non_terminal > 0:
            logger.info(
                "当天存在待执行/执行中任务,先拉取解码结果 dt={} count={}",
                dt,
                before_poll_non_terminal,
            )
        pending_vids = _fetch_today_pending_vids(dt)
        if pending_vids:
            logger.info("查询当天待执行/执行中记录 dt={} count={}", dt, len(pending_vids))
            _poll_decode_results_for_today(dt, pending_vids)
        elif before_poll_non_terminal > 0:
            logger.warning(
                "存在非终态记录但未获取到可查询vid dt={} count={}",
                dt,
                before_poll_non_terminal,
            )
        after_poll_non_terminal = _count_today_non_terminal(dt)
        if pending_vids or before_poll_non_terminal > 0:
            logger.info(
                "解码结果查询阶段结束 dt={} 查询前非终态={} 查询后非终态={}",
                dt,
                before_poll_non_terminal,
                after_poll_non_terminal,
            )

        if not _is_decode_submit_open():
            logger.info("解构开关关闭(is_open!=1),跳过本轮新批次发起 dt={}", dt)
            logger.info("解码调度任务结束(开关关闭:不发起新任务)")
            return

        window_need = max(0, BATCH_SIZE - after_poll_non_terminal)
        if window_need == 0:
            logger.info(
                "解构中已满{}条,本轮无需补充 dt={} non_terminal={}",
                BATCH_SIZE,
                dt,
                after_poll_non_terminal,
            )
            logger.info("解码调度任务结束(窗口已满)")
            return

        daily_limit = _fetch_decode_daily_limit()
        today_total = _count_today_total(dt)
        daily_remaining = max(0, daily_limit - today_total)
        if daily_remaining == 0:
            logger.info(
                "当日解构已达到上限,不再发起新任务 dt={} daily_limit={} today_total={}",
                dt,
                daily_limit,
                today_total,
            )
            logger.info("解码调度任务结束(达到当日上限)")
            return

        need = min(window_need, daily_remaining)
        logger.info(
            "动态窗口补充 dt={} 当前解构中={} 目标={} 窗口缺口={} 日上限={} 当日已发起={} 当日剩余额度={} 本次补充={}",
            dt,
            after_poll_non_terminal,
            BATCH_SIZE,
            window_need,
            daily_limit,
            today_total,
            daily_remaining,
            need,
        )

        records = _pick_candidate_records(dt=dt, batch_size=need)
        if not records:
            logger.info("无可发起的新批次候选数据 dt={} need={}", dt, need)
            logger.info("解码调度任务结束(无新增任务)")
            return

        posts = _build_posts_payload(records)
        logger.info("解码提交接口执行开始 posts={}", posts)
        ok, err_msg, body = _submit_decode(posts)
        logger.info(
            "解码提交接口执行完成 success={} records={} msg={} body={}",
            ok,
            len(records),
            err_msg,
            body,
        )

        if not ok:
            fail_body = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
            for item in records:
                _insert_task_result_row(
                    item,
                    status=3,
                    err_msg=err_msg or "解码提交失败",
                    data_content=fail_body,
                    html=None,
                )
        else:
            if isinstance(body.get("data"), list) and body["data"]:
                _insert_rows_after_decode_submit(records, body)
            else:
                payload = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
                for item in records:
                    _insert_task_result_row(
                        item,
                        status=1,
                        err_msg="",
                        data_content=payload,
                        html=None,
                    )

        logger.info("解码调度任务结束,本轮新发起数量={}", len(records))
    except Exception as exc:
        logger.exception("解码调度任务异常退出: {}", exc)
        return
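# Optional manual entry point. This is an illustrative addition: it assumes the
# module may be executed directly for a one-off run; in normal operation the
# scheduler presumably imports and invokes run_decode_dispatch_job() itself.
if __name__ == "__main__":
    run_decode_dispatch_job()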