decode_dispatch_job.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. import json
  2. from datetime import datetime
  3. from typing import Any, Dict, List, Optional, Tuple
  4. from zoneinfo import ZoneInfo
  5. import requests
  6. from scheduler.odps_fetch import fetch_priority_posts
  7. from utils.scheduler_logger import get_scheduler_logger
  8. from utils.sync_mysql_help import mysql
# Module-level logger obtained from the scheduler logging helper.
logger = get_scheduler_logger()
# configId sent with every decode submit / decode result request.
CONFIG_ID = "57"
# Endpoint used to submit new decode tasks.
DECODE_URL = "https://aigc-api.aiddit.com/aigc/api/task/decode"
# Endpoint used to query the results of submitted decode tasks.
DECODE_RESULT_URL = "https://aigc-api.aiddit.com/aigc/api/task/decode/result"
# Dynamic window: try to keep the number of today's rows with status IN (0, 1)
# at this value (each top-up only fetches enough records to fill the gap).
BATCH_SIZE = 40
# Page size used when paging through candidates via fetch_priority_posts.
ODPS_PAGE_SIZE = 200
# Maximum number of vids sent in a single result-query request.
RESULT_POLL_CHUNK = 50
  17. def _map_api_status_to_int(api_status: str, vid: str) -> int:
  18. """Map upstream status string to DB status: 0待执行 1执行中 2成功 3失败."""
  19. s = (api_status or "").strip().upper()
  20. if s == "SUCCESS":
  21. return 2
  22. if s in ("FAILED", "FAILURE", "ERROR", "FAIL"):
  23. return 3
  24. if s in ("RUNNING", "PROCESSING", "DOING"):
  25. return 1
  26. if s in ("PENDING", "WAITING", "INIT", "QUEUED"):
  27. return 0
  28. if not s:
  29. return 0
  30. logger.warning("未知解码状态,按执行中处理 status={} vid={}", api_status, vid)
  31. return 1
  32. def _safe_json_loads(text: Optional[str]) -> Dict[str, Any]:
  33. if not text:
  34. return {}
  35. try:
  36. data = json.loads(text)
  37. return data if isinstance(data, dict) else {}
  38. except Exception:
  39. return {}
  40. def _today_dt() -> str:
  41. return datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
  42. def _is_allowed_level(level_value: Any) -> bool:
  43. try:
  44. return int(level_value) in (0, 1, 2, 3)
  45. except (TypeError, ValueError):
  46. return False
  47. def _is_decode_submit_open() -> bool:
  48. """
  49. Gate for submitting NEW decode tasks.
  50. Only controls whether to submit; polling/querying existing tasks is unaffected.
  51. """
  52. sql = """
  53. SELECT is_open
  54. FROM aigc_topic_decode_task_oprate
  55. ORDER BY id DESC
  56. LIMIT 1
  57. """
  58. try:
  59. row = mysql.fetchone(sql)
  60. if not row:
  61. # Fail-open if table is empty to avoid blocking by default.
  62. return True
  63. return int(row.get("is_open") or 0) == 1
  64. except Exception as exc:
  65. # Conservative: if we cannot confirm switch is open, skip submit this cycle.
  66. logger.exception("查询解构开关失败,本轮不发起新解构任务: {}", exc)
  67. return False
  68. def _fetch_today_pending_vids(dt: str) -> List[str]:
  69. sql = """
  70. SELECT DISTINCT vid
  71. FROM aigc_topic_decode_task_result
  72. WHERE dt = %s AND status IN (0, 1) AND vid IS NOT NULL AND vid != ''
  73. ORDER BY vid
  74. """
  75. rows = mysql.fetchall(sql, (dt,))
  76. return [str(row["vid"]) for row in rows if row.get("vid")]
  77. def _fetch_history_pending_vids_by_dt(today_dt: str) -> Dict[str, List[str]]:
  78. sql = """
  79. SELECT DISTINCT dt, vid
  80. FROM aigc_topic_decode_task_result
  81. WHERE status IN (0, 1)
  82. AND dt < %s
  83. AND vid IS NOT NULL
  84. AND vid != ''
  85. ORDER BY dt, vid
  86. """
  87. rows = mysql.fetchall(sql, (today_dt,))
  88. grouped: Dict[str, List[str]] = {}
  89. for row in rows:
  90. dt = str(row.get("dt") or "").strip()
  91. vid = str(row.get("vid") or "").strip()
  92. if not dt or not vid:
  93. continue
  94. grouped.setdefault(dt, []).append(vid)
  95. return grouped
  96. def _count_today_non_terminal(dt: str) -> int:
  97. sql = """
  98. SELECT COUNT(1) AS total
  99. FROM aigc_topic_decode_task_result
  100. WHERE dt = %s AND status IN (0, 1)
  101. """
  102. result = mysql.fetchone(sql, (dt,))
  103. return int((result or {}).get("total", 0))
  104. def _count_today_total(dt: str) -> int:
  105. sql = """
  106. SELECT COUNT(1) AS total
  107. FROM aigc_topic_decode_task_result
  108. WHERE dt = %s
  109. """
  110. result = mysql.fetchone(sql, (dt,))
  111. return int((result or {}).get("total", 0))
  112. def _fetch_decode_daily_limit() -> int:
  113. sql = """
  114. SELECT `max` AS daily_limit
  115. FROM aigc_topic_decode_task_oprate
  116. ORDER BY id DESC
  117. LIMIT 1
  118. """
  119. row = mysql.fetchone(sql)
  120. return int((row or {}).get("daily_limit") or 0)
  121. def _submit_decode_result_chunk(
  122. channel_content_ids: List[str],
  123. ) -> Tuple[bool, str, Dict[str, Any]]:
  124. payload = {"params": {"configId": CONFIG_ID, "channelContentIds": channel_content_ids}}
  125. try:
  126. resp = requests.post(DECODE_RESULT_URL, json=payload, timeout=60)
  127. if resp.status_code != 200:
  128. return False, f"http_status_{resp.status_code}", {}
  129. body = resp.json()
  130. ok = body.get("code") == 0
  131. return ok, body.get("msg") or "", body
  132. except Exception as exc:
  133. return False, str(exc), {}
  134. def _apply_result_row_to_db(dt: str, item: Dict[str, Any]) -> None:
  135. vid = str(item.get("channelContentId") or "").strip()
  136. if not vid:
  137. return
  138. api_status_raw = item.get("status") or ""
  139. err_msg = (item.get("err_msg") or item.get("errorMessage") or "") or ""
  140. data_content = item.get("dataContent")
  141. if data_content is not None and not isinstance(data_content, str):
  142. data_content = json.dumps(data_content, ensure_ascii=False)
  143. html = item.get("html")
  144. base_status = _map_api_status_to_int(str(api_status_raw), vid)
  145. sql = """
  146. UPDATE aigc_topic_decode_task_result
  147. SET status = %s,
  148. err_msg = %s,
  149. data_content = %s,
  150. html = %s
  151. WHERE dt = %s AND vid = %s
  152. """
  153. mysql.execute(
  154. sql,
  155. (
  156. base_status,
  157. err_msg[:512] if err_msg else "",
  158. data_content if data_content is not None else "",
  159. html if html is not None else None,
  160. dt,
  161. vid,
  162. ),
  163. )
  164. def _poll_decode_results_for_today(dt: str, vids: List[str]) -> None:
  165. if not vids:
  166. return
  167. total = len(vids)
  168. logger.info("开始查询解码结果 dt={} 总vid数={}", dt, total)
  169. overall_success = 0
  170. overall_returned = 0
  171. for i in range(0, total, RESULT_POLL_CHUNK):
  172. chunk = vids[i : i + RESULT_POLL_CHUNK]
  173. logger.info(
  174. "查询解码结果 dt={} 分片序号={} 分片大小={} 总数={}",
  175. dt,
  176. i // RESULT_POLL_CHUNK,
  177. len(chunk),
  178. total,
  179. )
  180. ok, msg, body = _submit_decode_result_chunk(chunk)
  181. if not ok:
  182. logger.error(
  183. "查询解码结果接口失败 dt={} msg={} body={}",
  184. dt,
  185. msg,
  186. body,
  187. )
  188. continue
  189. data_list = body.get("data")
  190. if not isinstance(data_list, list):
  191. logger.warning("查询解码结果返回中缺少data列表 body={}", body)
  192. continue
  193. chunk_success = 0
  194. returned_ids = {str(x.get("channelContentId") or "") for x in data_list}
  195. missing = set(chunk) - returned_ids
  196. if missing:
  197. logger.warning(
  198. "查询解码结果返回缺少{}个vid,示例={}",
  199. len(missing),
  200. list(missing)[:5],
  201. )
  202. for item in data_list:
  203. if not isinstance(item, dict):
  204. continue
  205. vid = str(item.get("channelContentId") or "").strip()
  206. api_status = str(item.get("status") or "")
  207. mapped_status = _map_api_status_to_int(api_status, vid)
  208. if mapped_status == 2:
  209. chunk_success += 1
  210. err_msg = (item.get("err_msg") or item.get("errorMessage") or "") or ""
  211. logger.info(
  212. "解码结果明细 dt={} vid={} 接口状态={} 映射状态={} 错误信息={}",
  213. dt,
  214. vid,
  215. api_status,
  216. mapped_status,
  217. err_msg[:512] if err_msg else "",
  218. )
  219. _apply_result_row_to_db(dt, item)
  220. overall_success += chunk_success
  221. overall_returned += len(data_list)
  222. logger.info(
  223. "解码结果分片处理完成 dt={} 查询数={} 返回数={} 成功数={}",
  224. dt,
  225. len(chunk),
  226. len(data_list),
  227. chunk_success,
  228. )
  229. logger.info(
  230. "解码结果查询完成 dt={} 查询总数={} 返回总数={} 成功总数={}",
  231. dt,
  232. total,
  233. overall_returned,
  234. overall_success,
  235. )
  236. def _poll_decode_results_for_history(today_dt: str) -> None:
  237. vids_by_dt = _fetch_history_pending_vids_by_dt(today_dt)
  238. if not vids_by_dt:
  239. return
  240. total = sum(len(v) for v in vids_by_dt.values())
  241. logger.info(
  242. "开始查询历史未完成解码结果 today_dt={} 涉及dt数={} 总vid数={}",
  243. today_dt,
  244. len(vids_by_dt),
  245. total,
  246. )
  247. for dt, vids in vids_by_dt.items():
  248. _poll_decode_results_for_today(dt, vids)
  249. logger.info(
  250. "历史未完成解码结果查询结束 today_dt={} 涉及dt数={} 总vid数={}",
  251. today_dt,
  252. len(vids_by_dt),
  253. total,
  254. )
  255. def _build_posts_payload(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
  256. posts: List[Dict[str, Any]] = []
  257. for item in records:
  258. extend_raw = item.get("extend")
  259. extend_obj: Dict[str, Any]
  260. if isinstance(extend_raw, dict):
  261. extend_obj = extend_raw
  262. else:
  263. extend_obj = _safe_json_loads(str(extend_raw)) if extend_raw is not None else {}
  264. # cover_url = extend_obj.get("cover_url") or ""
  265. # images = [cover_url] if cover_url else []
  266. posts.append(
  267. {
  268. "channelContentId": item.get("vid") or "",
  269. "title": item.get("title") or "",
  270. "video": item.get("url") or "",
  271. "images": [],
  272. "mergeLeve1": item.get("cate1") or "",
  273. "mergeLeve2": item.get("cate2") or "",
  274. "contentModal": 4,
  275. "channel": 10,
  276. }
  277. )
  278. return posts
  279. def _submit_decode(posts: List[Dict[str, Any]]) -> Tuple[bool, str, Dict[str, Any]]:
  280. payload = {"params": {"configId": CONFIG_ID, "posts": posts}}
  281. try:
  282. resp = requests.post(DECODE_URL, json=payload, timeout=60)
  283. if resp.status_code != 200:
  284. return False, f"http_status_{resp.status_code}", {}
  285. body = resp.json()
  286. ok = body.get("code") == 0
  287. return ok, body.get("msg") or "", body
  288. except Exception as exc:
  289. return False, str(exc), {}
  290. def _load_existing_vids(dt: str) -> set[str]:
  291. sql = """
  292. SELECT DISTINCT vid
  293. FROM aigc_topic_decode_task_result
  294. WHERE dt = %s AND vid IS NOT NULL AND vid != ''
  295. """
  296. rows = mysql.fetchall(sql, (dt,))
  297. return {str(row["vid"]) for row in rows if row.get("vid")}
  298. def _pick_candidate_records(dt: str, batch_size: int = BATCH_SIZE) -> List[Dict[str, Any]]:
  299. existing_vids = _load_existing_vids(dt)
  300. selected: List[Dict[str, Any]] = []
  301. selected_vids: set[str] = set()
  302. offset = 0
  303. while len(selected) < batch_size:
  304. page = fetch_priority_posts(limit=ODPS_PAGE_SIZE, offset=offset, dt=dt)
  305. if not page:
  306. break
  307. for item in page:
  308. vid = str(item.get("vid") or "")
  309. if (
  310. not vid
  311. or not _is_allowed_level(item.get("level"))
  312. or vid in existing_vids
  313. or vid in selected_vids
  314. ):
  315. continue
  316. selected.append(item)
  317. selected_vids.add(vid)
  318. if len(selected) >= batch_size:
  319. break
  320. offset += ODPS_PAGE_SIZE
  321. logger.info(
  322. "候选数据筛选完成 dt={} 已选数量={} 扫描offset={}",
  323. dt,
  324. len(selected),
  325. offset,
  326. )
  327. if selected:
  328. vid_title_pairs = [
  329. {"vid": str(item.get("vid") or ""), "title": item.get("title") or ""}
  330. for item in selected
  331. ]
  332. logger.info("已选候选数据 dt={} items={}", dt, vid_title_pairs)
  333. return selected
  334. def _row_status_after_decode_submit(
  335. vid: str, row_in_resp: Optional[Dict[str, Any]], full_body: Dict[str, Any]
  336. ) -> Tuple[int, str, str, Optional[str]]:
  337. """Returns (status, err_msg, data_content, html) for INSERT."""
  338. if not row_in_resp:
  339. payload = json.dumps({"decode_submit_response": full_body}, ensure_ascii=False)
  340. return 1, "", payload, None
  341. api_status_raw = row_in_resp.get("status") or ""
  342. err_msg = (row_in_resp.get("err_msg") or row_in_resp.get("errorMessage") or "") or ""
  343. mapped = _map_api_status_to_int(str(api_status_raw), vid)
  344. payload = json.dumps(
  345. {"decode_submit_item": row_in_resp, "decode_submit_response": full_body},
  346. ensure_ascii=False,
  347. )
  348. if mapped == 3:
  349. return 3, err_msg[:512], payload, None
  350. if mapped == 2:
  351. # New submit API only returns status/errorMessage.
  352. # Keep SUCCESS as terminal success; detailed result is queried via decode/result.
  353. return 2, "", payload, None
  354. if mapped == 0:
  355. return 0, err_msg[:512], payload, None
  356. return 1, err_msg[:512], payload, None
  357. def _insert_task_result_row(
  358. source: Dict[str, Any],
  359. status: int,
  360. err_msg: str,
  361. data_content: str,
  362. html: Optional[str],
  363. ) -> None:
  364. extend = _safe_json_loads(source.get("extend"))
  365. cover_url = extend.get("cover_url", "")
  366. cover_text = cover_url if isinstance(cover_url, str) else ""
  367. images_text = source.get("url") or ""
  368. sql = """
  369. INSERT INTO aigc_topic_decode_task_result
  370. (task_id, status, err_msg, vid, title, cover, video_url, images, type, channel, cate1, cate2, dt, data_content, html)
  371. VALUES
  372. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  373. """
  374. params = (
  375. None,
  376. status,
  377. err_msg or "",
  378. str(source.get("vid") or ""),
  379. source.get("title") or "",
  380. cover_text,
  381. source.get("url") or "",
  382. images_text,
  383. source.get("type") or "",
  384. source.get("channel") or "",
  385. source.get("cate1") or "",
  386. source.get("cate2") or "",
  387. source.get("dt") or _today_dt(),
  388. data_content,
  389. html,
  390. )
  391. mysql.execute(sql, params)
  392. def _insert_rows_after_decode_submit(records: List[Dict[str, Any]], body: Dict[str, Any]) -> None:
  393. data_list = body.get("data") if isinstance(body.get("data"), list) else []
  394. by_vid = {str(x.get("channelContentId") or ""): x for x in data_list if isinstance(x, dict)}
  395. for item in records:
  396. vid = str(item.get("vid") or "")
  397. row = by_vid.get(vid)
  398. status, err_msg, data_content, html = _row_status_after_decode_submit(vid, row, body)
  399. _insert_task_result_row(item, status, err_msg, data_content, html)
  400. def run_decode_dispatch_job() -> None:
  401. logger.info("解码调度任务开始执行")
  402. try:
  403. dt = _today_dt()
  404. _poll_decode_results_for_history(dt)
  405. before_poll_non_terminal = _count_today_non_terminal(dt)
  406. if before_poll_non_terminal > 0:
  407. logger.info(
  408. "当天存在待执行/执行中任务,先拉取解码结果 dt={} count={}",
  409. dt,
  410. before_poll_non_terminal,
  411. )
  412. pending_vids = _fetch_today_pending_vids(dt)
  413. if pending_vids:
  414. logger.info("查询当天待执行/执行中记录 dt={} count={}", dt, len(pending_vids))
  415. _poll_decode_results_for_today(dt, pending_vids)
  416. elif before_poll_non_terminal > 0:
  417. logger.warning(
  418. "存在非终态记录但未获取到可查询vid dt={} count={}",
  419. dt,
  420. before_poll_non_terminal,
  421. )
  422. after_poll_non_terminal = _count_today_non_terminal(dt)
  423. if pending_vids or before_poll_non_terminal > 0:
  424. logger.info(
  425. "解码结果查询阶段结束 dt={} 查询前非终态={} 查询后非终态={}",
  426. dt,
  427. before_poll_non_terminal,
  428. after_poll_non_terminal,
  429. )
  430. if not _is_decode_submit_open():
  431. logger.info("解构开关关闭(is_open!=1),跳过本轮新批次发起 dt={}", dt)
  432. logger.info("解码调度任务结束(开关关闭:不发起新任务)")
  433. return
  434. window_need = max(0, BATCH_SIZE - after_poll_non_terminal)
  435. if window_need == 0:
  436. logger.info(
  437. "解构中已满{}条,本轮无需补充 dt={} non_terminal={}",
  438. BATCH_SIZE,
  439. dt,
  440. after_poll_non_terminal,
  441. )
  442. logger.info("解码调度任务结束(窗口已满)")
  443. return
  444. daily_limit = _fetch_decode_daily_limit()
  445. today_total = _count_today_total(dt)
  446. daily_remaining = max(0, daily_limit - today_total)
  447. if daily_remaining == 0:
  448. logger.info(
  449. "当日解构已达到上限,不再发起新任务 dt={} daily_limit={} today_total={}",
  450. dt,
  451. daily_limit,
  452. today_total,
  453. )
  454. logger.info("解码调度任务结束(达到当日上限)")
  455. return
  456. need = min(window_need, daily_remaining)
  457. logger.info(
  458. "动态窗口补充 dt={} 当前解构中={} 目标={} 窗口缺口={} 日上限={} 当日已发起={} 当日剩余额度={} 本次补充={}",
  459. dt,
  460. after_poll_non_terminal,
  461. BATCH_SIZE,
  462. window_need,
  463. daily_limit,
  464. today_total,
  465. daily_remaining,
  466. need,
  467. )
  468. records = _pick_candidate_records(dt=dt, batch_size=need)
  469. if not records:
  470. logger.info("无可发起的新批次候选数据 dt={} need={}", dt, need)
  471. logger.info("解码调度任务结束(无新增任务)")
  472. return
  473. posts = _build_posts_payload(records)
  474. logger.info("解码提交接口执行开始 posts={}", posts)
  475. ok, err_msg, body = _submit_decode(posts)
  476. logger.info(
  477. "解码提交接口执行完成 success={} records={} msg={} body={}",
  478. ok,
  479. len(records),
  480. err_msg,
  481. body,
  482. )
  483. if not ok:
  484. fail_body = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
  485. for item in records:
  486. _insert_task_result_row(
  487. item,
  488. status=3,
  489. err_msg=err_msg or "解码提交失败",
  490. data_content=fail_body,
  491. html=None,
  492. )
  493. else:
  494. if isinstance(body.get("data"), list) and body["data"]:
  495. _insert_rows_after_decode_submit(records, body)
  496. else:
  497. payload = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
  498. for item in records:
  499. _insert_task_result_row(
  500. item,
  501. status=1,
  502. err_msg="",
  503. data_content=payload,
  504. html=None,
  505. )
  506. logger.info("解码调度任务结束,本轮新发起数量={}", len(records))
  507. except Exception as exc:
  508. logger.exception("解码调度任务异常退出: {}", exc)
  509. return