| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493 |
- import json
- from datetime import datetime
- from typing import Any, Dict, List, Optional, Tuple
- from zoneinfo import ZoneInfo
- import requests
- from scheduler.odps_fetch import fetch_priority_posts
- from utils.scheduler_logger import get_scheduler_logger
- from utils.sync_mysql_help import mysql
logger = get_scheduler_logger()

# Decode configuration id, sent as ``configId`` in both decode API payloads.
CONFIG_ID = "57"

_DECODE_API_BASE = "https://aigc-api.aiddit.com/aigc/api/task/decode"
# Endpoint used to submit new decode tasks.
DECODE_URL = _DECODE_API_BASE
# Endpoint used to query results of already-submitted decode tasks.
DECODE_RESULT_URL = f"{_DECODE_API_BASE}/result"

# Dynamic window: try to keep the number of today's rows with status IN (0, 1)
# at this value; each top-up only fetches the shortfall.
BATCH_SIZE = 20
# Rows requested per page when scanning ODPS for new candidates.
ODPS_PAGE_SIZE = 200
# Maximum number of vids sent in a single decode-result query.
RESULT_POLL_CHUNK = 50
- def _map_api_status_to_int(api_status: str, vid: str) -> int:
- """Map upstream status string to DB status: 0待执行 1执行中 2成功 3失败."""
- s = (api_status or "").strip().upper()
- if s == "SUCCESS":
- return 2
- if s in ("FAILED", "FAILURE", "ERROR", "FAIL"):
- return 3
- if s in ("RUNNING", "PROCESSING", "DOING"):
- return 1
- if s in ("PENDING", "WAITING", "INIT", "QUEUED"):
- return 0
- if not s:
- return 0
- logger.warning("未知解码状态,按执行中处理 status={} vid={}", api_status, vid)
- return 1
- def _safe_json_loads(text: Optional[str]) -> Dict[str, Any]:
- if not text:
- return {}
- try:
- data = json.loads(text)
- return data if isinstance(data, dict) else {}
- except Exception:
- return {}
- def _today_dt() -> str:
- return datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
- def _is_allowed_level(level_value: Any) -> bool:
- try:
- return int(level_value) in (0, 1, 2)
- except (TypeError, ValueError):
- return False
def _is_decode_submit_open() -> bool:
    """Report whether NEW decode tasks may be submitted this cycle.

    Reads ``is_open`` from the newest row (highest ``id``) of
    ``aigc_topic_decode_task_oprate``. This only gates submission; polling of
    already-submitted tasks is not affected.

    Returns:
        True when the table is empty (fail-open default) or ``is_open == 1``.
        False otherwise, including when the query fails: an unconfirmed switch
        never triggers submissions.
    """
    sql = """
    SELECT is_open
    FROM aigc_topic_decode_task_oprate
    ORDER BY id DESC
    LIMIT 1
    """
    try:
        latest = mysql.fetchone(sql)
        if latest:
            return int(latest.get("is_open") or 0) == 1
        # No switch row configured: fail open so submissions are not blocked by default.
        return True
    except Exception as exc:
        logger.exception("查询解构开关失败,本轮不发起新解构任务: {}", exc)
        return False
def _fetch_today_pending_vids(dt: str) -> List[str]:
    """Return the distinct non-empty vids of *dt* rows whose status is 0 or 1.

    The list follows the SQL ``ORDER BY vid`` and every vid is stringified.
    """
    sql = """
    SELECT DISTINCT vid
    FROM aigc_topic_decode_task_result
    WHERE dt = %s AND status IN (0, 1) AND vid IS NOT NULL AND vid != ''
    ORDER BY vid
    """
    rows = mysql.fetchall(sql, (dt,))
    raw_vids = (row.get("vid") for row in rows)
    return [str(vid) for vid in raw_vids if vid]
def _count_today_non_terminal(dt: str) -> int:
    """Count *dt* rows whose status is 0 (pending) or 1 (running).

    This counts rows rather than distinct vids, and it does not filter out rows
    with an empty vid.
    """
    sql = """
    SELECT COUNT(1) AS total
    FROM aigc_topic_decode_task_result
    WHERE dt = %s AND status IN (0, 1)
    """
    row = mysql.fetchone(sql, (dt,)) or {}
    return int(row.get("total", 0))
def _submit_decode_result_chunk(
    channel_content_ids: List[str],
) -> Tuple[bool, str, Dict[str, Any]]:
    """Query decode results for one chunk of vids.

    POSTs ``{"params": {"configId": CONFIG_ID, "channelContentIds": [...]}}``
    to ``DECODE_RESULT_URL`` with a 60 second timeout.

    Returns:
        ``(ok, msg, body)``. ``ok`` is True only for HTTP 200 with a JSON body
        whose ``code`` is 0. On a non-200 status ``msg`` is
        ``"http_status_<code>"``. On any exception (network error, invalid
        JSON, non-dict body) ``msg`` is the exception text. ``body`` is ``{}``
        in those two cases and otherwise the parsed response, including when
        ``code`` is non-zero.
    """
    request_payload = {
        "params": {"configId": CONFIG_ID, "channelContentIds": channel_content_ids}
    }
    try:
        response = requests.post(DECODE_RESULT_URL, json=request_payload, timeout=60)
        if response.status_code == 200:
            parsed = response.json()
            return parsed.get("code") == 0, parsed.get("msg") or "", parsed
        return False, f"http_status_{response.status_code}", {}
    except Exception as exc:
        return False, str(exc), {}
- def _apply_result_row_to_db(dt: str, item: Dict[str, Any]) -> None:
- vid = str(item.get("channelContentId") or "").strip()
- if not vid:
- return
- api_status_raw = item.get("status") or ""
- err_msg = (item.get("err_msg") or item.get("errorMessage") or "") or ""
- data_content = item.get("dataContent")
- if data_content is not None and not isinstance(data_content, str):
- data_content = json.dumps(data_content, ensure_ascii=False)
- html = item.get("html")
- base_status = _map_api_status_to_int(str(api_status_raw), vid)
- sql = """
- UPDATE aigc_topic_decode_task_result
- SET status = %s,
- err_msg = %s,
- data_content = %s,
- html = %s
- WHERE dt = %s AND vid = %s
- """
- mysql.execute(
- sql,
- (
- base_status,
- err_msg[:512] if err_msg else "",
- data_content if data_content is not None else "",
- html if html is not None else None,
- dt,
- vid,
- ),
- )
- def _poll_decode_results_for_today(dt: str, vids: List[str]) -> None:
- if not vids:
- return
- total = len(vids)
- logger.info("开始查询解码结果 dt={} 总vid数={}", dt, total)
- overall_success = 0
- overall_returned = 0
- for i in range(0, total, RESULT_POLL_CHUNK):
- chunk = vids[i : i + RESULT_POLL_CHUNK]
- logger.info(
- "查询解码结果 dt={} 分片序号={} 分片大小={} 总数={}",
- dt,
- i // RESULT_POLL_CHUNK,
- len(chunk),
- total,
- )
- ok, msg, body = _submit_decode_result_chunk(chunk)
- if not ok:
- logger.error(
- "查询解码结果接口失败 dt={} msg={} body={}",
- dt,
- msg,
- body,
- )
- continue
- data_list = body.get("data")
- if not isinstance(data_list, list):
- logger.warning("查询解码结果返回中缺少data列表 body={}", body)
- continue
- chunk_success = 0
- returned_ids = {str(x.get("channelContentId") or "") for x in data_list}
- missing = set(chunk) - returned_ids
- if missing:
- logger.warning(
- "查询解码结果返回缺少{}个vid,示例={}",
- len(missing),
- list(missing)[:5],
- )
- for item in data_list:
- if not isinstance(item, dict):
- continue
- vid = str(item.get("channelContentId") or "").strip()
- api_status = str(item.get("status") or "")
- mapped_status = _map_api_status_to_int(api_status, vid)
- if mapped_status == 2:
- chunk_success += 1
- err_msg = (item.get("err_msg") or item.get("errorMessage") or "") or ""
- logger.info(
- "解码结果明细 dt={} vid={} 接口状态={} 映射状态={} 错误信息={}",
- dt,
- vid,
- api_status,
- mapped_status,
- err_msg[:512] if err_msg else "",
- )
- _apply_result_row_to_db(dt, item)
- overall_success += chunk_success
- overall_returned += len(data_list)
- logger.info(
- "解码结果分片处理完成 dt={} 查询数={} 返回数={} 成功数={}",
- dt,
- len(chunk),
- len(data_list),
- chunk_success,
- )
- logger.info(
- "解码结果查询完成 dt={} 查询总数={} 返回总数={} 成功总数={}",
- dt,
- total,
- overall_returned,
- overall_success,
- )
- def _build_posts_payload(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- posts: List[Dict[str, Any]] = []
- for item in records:
- extend_raw = item.get("extend")
- extend_obj: Dict[str, Any]
- if isinstance(extend_raw, dict):
- extend_obj = extend_raw
- else:
- extend_obj = _safe_json_loads(str(extend_raw)) if extend_raw is not None else {}
- cover_url = extend_obj.get("cover_url") or ""
- images = [cover_url] if cover_url else []
- posts.append(
- {
- "channelContentId": item.get("vid") or "",
- "title": item.get("title") or "",
- "video": item.get("url") or "",
- "images": images,
- "contentModal": 4,
- "channel": 10,
- }
- )
- return posts
def _submit_decode(posts: List[Dict[str, Any]]) -> Tuple[bool, str, Dict[str, Any]]:
    """Submit new decode tasks for *posts*.

    POSTs ``{"params": {"configId": CONFIG_ID, "posts": posts}}`` to
    ``DECODE_URL`` with a 60 second timeout.

    Returns:
        ``(ok, msg, body)``. ``ok`` is True only for HTTP 200 with a JSON body
        whose ``code`` is 0. On a non-200 status ``msg`` is
        ``"http_status_<code>"``. On any exception ``msg`` is the exception
        text. ``body`` is ``{}`` in those two cases and otherwise the parsed
        response.
    """
    request_payload = {"params": {"configId": CONFIG_ID, "posts": posts}}
    try:
        response = requests.post(DECODE_URL, json=request_payload, timeout=60)
        if response.status_code == 200:
            parsed = response.json()
            return parsed.get("code") == 0, parsed.get("msg") or "", parsed
        return False, f"http_status_{response.status_code}", {}
    except Exception as exc:
        return False, str(exc), {}
def _load_existing_vids(dt: str) -> set[str]:
    """Return every non-empty vid already recorded for *dt*, whatever its status."""
    sql = """
    SELECT DISTINCT vid
    FROM aigc_topic_decode_task_result
    WHERE dt = %s AND vid IS NOT NULL AND vid != ''
    """
    existing: set[str] = set()
    for record in mysql.fetchall(sql, (dt,)):
        vid = record.get("vid")
        if vid:
            existing.add(str(vid))
    return existing
def _pick_candidate_records(dt: str, batch_size: int = BATCH_SIZE) -> List[Dict[str, Any]]:
    """Select up to *batch_size* new records to submit for decoding on *dt*.

    Pages through ``fetch_priority_posts`` ``ODPS_PAGE_SIZE`` rows at a time.
    A record is kept only when all of these hold:
      * it has a non-empty vid;
      * ``_is_allowed_level`` accepts its level;
      * no DB row exists for that vid on *dt*;
      * the vid was not already picked in this call.
    Scanning stops once enough records are chosen or a page comes back empty.
    """
    already_recorded = _load_existing_vids(dt)
    picked: List[Dict[str, Any]] = []
    picked_vids: set[str] = set()
    offset = 0
    while len(picked) < batch_size:
        page = fetch_priority_posts(limit=ODPS_PAGE_SIZE, offset=offset, dt=dt)
        if not page:
            break
        for record in page:
            vid = str(record.get("vid") or "")
            eligible = (
                vid
                and _is_allowed_level(record.get("level"))
                and vid not in already_recorded
                and vid not in picked_vids
            )
            if not eligible:
                continue
            picked.append(record)
            picked_vids.add(vid)
            if len(picked) >= batch_size:
                break
        # Advance one full page even when the inner loop stopped early;
        # the logged offset therefore reflects pages scanned.
        offset += ODPS_PAGE_SIZE
    logger.info(
        "候选数据筛选完成 dt={} 已选数量={} 扫描offset={}",
        dt,
        len(picked),
        offset,
    )
    if picked:
        summary = [
            {"vid": str(record.get("vid") or ""), "title": record.get("title") or ""}
            for record in picked
        ]
        logger.info("已选候选数据 dt={} items={}", dt, summary)
    return picked
- def _row_status_after_decode_submit(
- vid: str, row_in_resp: Optional[Dict[str, Any]], full_body: Dict[str, Any]
- ) -> Tuple[int, str, str, Optional[str]]:
- """Returns (status, err_msg, data_content, html) for INSERT."""
- if not row_in_resp:
- payload = json.dumps({"decode_submit_response": full_body}, ensure_ascii=False)
- return 1, "", payload, None
- api_status_raw = row_in_resp.get("status") or ""
- err_msg = (row_in_resp.get("err_msg") or row_in_resp.get("errorMessage") or "") or ""
- mapped = _map_api_status_to_int(str(api_status_raw), vid)
- payload = json.dumps(
- {"decode_submit_item": row_in_resp, "decode_submit_response": full_body},
- ensure_ascii=False,
- )
- if mapped == 3:
- return 3, err_msg[:512], payload, None
- if mapped == 2:
- # New submit API only returns status/errorMessage.
- # Keep SUCCESS as terminal success; detailed result is queried via decode/result.
- return 2, "", payload, None
- if mapped == 0:
- return 0, err_msg[:512], payload, None
- return 1, err_msg[:512], payload, None
def _insert_task_result_row(
    source: Dict[str, Any],
    status: int,
    err_msg: str,
    data_content: str,
    html: Optional[str],
) -> None:
    """Insert one ``aigc_topic_decode_task_result`` row for a candidate record.

    Args:
        source: The candidate record. Reads vid, title, url, type, channel,
            cate1, cate2, dt and extend.
        status: DB status code (0 pending, 1 running, 2 success, 3 failed).
        err_msg: Error text; a falsy value is stored as "".
        data_content: Serialized payload stored in ``data_content``.
        html: Value for the ``html`` column.

    ``task_id`` is inserted as NULL. ``dt`` falls back to today's date when the
    record has none. ``cover`` is ``extend.cover_url`` when that is a string.
    """
    extend_raw = source.get("extend")
    # ``extend`` can arrive already decoded as a dict (``_build_posts_payload``
    # handles the same case); JSON-parsing a dict fails and silently drops the cover.
    extend = extend_raw if isinstance(extend_raw, dict) else _safe_json_loads(extend_raw)
    cover_url = extend.get("cover_url", "")
    cover_text = cover_url if isinstance(cover_url, str) else ""
    video_url = source.get("url") or ""
    sql = """
    INSERT INTO aigc_topic_decode_task_result
    (task_id, status, err_msg, vid, title, cover, video_url, images, type, channel, cate1, cate2, dt, data_content, html)
    VALUES
    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    params = (
        None,
        status,
        err_msg or "",
        str(source.get("vid") or ""),
        source.get("title") or "",
        cover_text,
        video_url,
        # NOTE(review): the images column stores the video url, not the cover;
        # confirm this is intended.
        video_url,
        source.get("type") or "",
        source.get("channel") or "",
        source.get("cate1") or "",
        source.get("cate2") or "",
        # NOTE(review): polling/dedup query by the job's dt; if the record's
        # own dt can differ from it, the row would never be polled. Verify
        # against fetch_priority_posts.
        source.get("dt") or _today_dt(),
        data_content,
        html,
    )
    mysql.execute(sql, params)
- def _insert_rows_after_decode_submit(records: List[Dict[str, Any]], body: Dict[str, Any]) -> None:
- data_list = body.get("data") if isinstance(body.get("data"), list) else []
- by_vid = {str(x.get("channelContentId") or ""): x for x in data_list if isinstance(x, dict)}
- for item in records:
- vid = str(item.get("vid") or "")
- row = by_vid.get(vid)
- status, err_msg, data_content, html = _row_status_after_decode_submit(vid, row, body)
- _insert_task_result_row(item, status, err_msg, data_content, html)
def _poll_pending_results(dt: str) -> int:
    """Refresh *dt*'s non-terminal rows from the result API and return how many remain."""
    before_count = _count_today_non_terminal(dt)
    if before_count > 0:
        logger.info(
            "当天存在待执行/执行中任务,先拉取解码结果 dt={} count={}",
            dt,
            before_count,
        )
    pending_vids = _fetch_today_pending_vids(dt)
    if pending_vids:
        logger.info("查询当天待执行/执行中记录 dt={} count={}", dt, len(pending_vids))
        _poll_decode_results_for_today(dt, pending_vids)
    elif before_count > 0:
        # Non-terminal rows exist but none has a queryable vid.
        logger.warning(
            "存在非终态记录但未获取到可查询vid dt={} count={}",
            dt,
            before_count,
        )
    after_count = _count_today_non_terminal(dt)
    if pending_vids or before_count > 0:
        logger.info(
            "解码结果查询阶段结束 dt={} 查询前非终态={} 查询后非终态={}",
            dt,
            before_count,
            after_count,
        )
    return after_count


def _store_submit_outcome(
    records: List[Dict[str, Any]], ok: bool, err_msg: str, body: Dict[str, Any]
) -> None:
    """Insert a DB row for every submitted record according to the submit result.

    On failure every record is stored as failed (3). On success with a
    non-empty ``data`` list, rows are matched per vid. Otherwise every record
    is stored as running (1).
    """
    if not ok:
        fail_body = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
        for item in records:
            _insert_task_result_row(
                item,
                status=3,
                err_msg=err_msg or "解码提交失败",
                data_content=fail_body,
                html=None,
            )
        return
    response_items = body.get("data")
    if isinstance(response_items, list) and response_items:
        _insert_rows_after_decode_submit(records, body)
        return
    payload = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
    for item in records:
        _insert_task_result_row(
            item,
            status=1,
            err_msg="",
            data_content=payload,
            html=None,
        )


def run_decode_dispatch_job() -> None:
    """Run one scheduler cycle: poll pending results, then top up the window.

    Steps:
      1. Refresh today's pending/running rows from the result API.
      2. Stop if the submit switch is closed.
      3. Otherwise submit enough new candidates to bring today's non-terminal
         count back up to ``BATCH_SIZE``, and record one row per submitted
         record.

    Any exception is logged and swallowed; it is never re-raised.
    """
    logger.info("解码调度任务开始执行")
    try:
        dt = _today_dt()
        in_flight = _poll_pending_results(dt)
        if not _is_decode_submit_open():
            logger.info("解构开关关闭(is_open!=1),跳过本轮新批次发起 dt={}", dt)
            logger.info("解码调度任务结束(开关关闭:不发起新任务)")
            return
        need = max(0, BATCH_SIZE - in_flight)
        if need == 0:
            logger.info(
                "解构中已满{}条,本轮无需补充 dt={} non_terminal={}",
                BATCH_SIZE,
                dt,
                in_flight,
            )
            logger.info("解码调度任务结束(窗口已满)")
            return
        logger.info(
            "动态窗口补充 dt={} 当前解构中={} 目标={} 本次补充={}",
            dt,
            in_flight,
            BATCH_SIZE,
            need,
        )
        records = _pick_candidate_records(dt=dt, batch_size=need)
        if not records:
            logger.info("无可发起的新批次候选数据 dt={} need={}", dt, need)
            logger.info("解码调度任务结束(无新增任务)")
            return
        logger.info("解码提交接口执行开始 records={}", records)
        posts = _build_posts_payload(records)
        logger.info("解码提交接口执行开始 posts={}", posts)
        ok, err_msg, body = _submit_decode(posts)
        logger.info(
            "解码提交接口执行完成 success={} records={} msg={} body={}",
            ok,
            len(records),
            err_msg,
            body,
        )
        _store_submit_outcome(records, ok, err_msg, body)
        logger.info("解码调度任务结束,本轮新发起数量={}", len(records))
    except Exception as exc:
        logger.exception("解码调度任务异常退出: {}", exc)
|