decode_dispatch_job.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. import json
  2. from datetime import datetime
  3. from typing import Any, Dict, List, Optional, Tuple
  4. from zoneinfo import ZoneInfo
  5. import requests
  6. from scheduler.odps_fetch import fetch_priority_posts
  7. from utils.scheduler_logger import get_scheduler_logger
  8. from utils.sync_mysql_help import mysql
# Module-wide logger obtained from the project's scheduler logging helper.
logger = get_scheduler_logger()
# Sent as "configId" inside "params" of both decode API requests below.
CONFIG_ID = "57"
# Endpoint that receives new posts to be decoded.
DECODE_URL = "https://aigc-api.aiddit.com/aigc/api/task/decode"
# Endpoint queried with channelContentIds to read decode results.
DECODE_RESULT_URL = "https://aigc-api.aiddit.com/aigc/api/task/decode/result"
# Default number of new records picked and submitted per job run.
BATCH_SIZE = 10
# Page size (limit and offset step) used when paging through fetch_priority_posts.
ODPS_PAGE_SIZE = 200
# Maximum number of vids sent in a single decode-result query.
RESULT_POLL_CHUNK = 50
  16. def _map_api_status_to_int(api_status: str, vid: str) -> int:
  17. """Map upstream status string to DB status: 0待执行 1执行中 2成功 3失败."""
  18. s = (api_status or "").strip().upper()
  19. if s == "SUCCESS":
  20. return 2
  21. if s in ("FAILED", "FAILURE", "ERROR", "FAIL"):
  22. return 3
  23. if s in ("RUNNING", "PROCESSING", "DOING"):
  24. return 1
  25. if s in ("PENDING", "WAITING", "INIT", "QUEUED"):
  26. return 0
  27. if not s:
  28. return 0
  29. logger.warning("未知解码状态,按执行中处理 status={} vid={}", api_status, vid)
  30. return 1
  31. def _safe_json_loads(text: Optional[str]) -> Dict[str, Any]:
  32. if not text:
  33. return {}
  34. try:
  35. data = json.loads(text)
  36. return data if isinstance(data, dict) else {}
  37. except Exception:
  38. return {}
  39. def _today_dt() -> str:
  40. return datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
  41. def _fetch_today_pending_vids(dt: str) -> List[str]:
  42. sql = """
  43. SELECT DISTINCT vid
  44. FROM aigc_topic_decode_task_result
  45. WHERE dt = %s AND status IN (0, 1) AND vid IS NOT NULL AND vid != ''
  46. ORDER BY vid
  47. """
  48. rows = mysql.fetchall(sql, (dt,))
  49. return [str(row["vid"]) for row in rows if row.get("vid")]
  50. def _count_today_non_terminal(dt: str) -> int:
  51. sql = """
  52. SELECT COUNT(1) AS total
  53. FROM aigc_topic_decode_task_result
  54. WHERE dt = %s AND status IN (0, 1)
  55. """
  56. result = mysql.fetchone(sql, (dt,))
  57. return int((result or {}).get("total", 0))
  58. def _submit_decode_result_chunk(
  59. channel_content_ids: List[str],
  60. ) -> Tuple[bool, str, Dict[str, Any]]:
  61. payload = {"params": {"configId": CONFIG_ID, "channelContentIds": channel_content_ids}}
  62. try:
  63. resp = requests.post(DECODE_RESULT_URL, json=payload, timeout=60)
  64. if resp.status_code != 200:
  65. return False, f"http_status_{resp.status_code}", {}
  66. body = resp.json()
  67. ok = body.get("code") == 0
  68. return ok, body.get("msg") or "", body
  69. except Exception as exc:
  70. return False, str(exc), {}
  71. def _apply_result_row_to_db(dt: str, item: Dict[str, Any]) -> None:
  72. vid = str(item.get("channelContentId") or "").strip()
  73. if not vid:
  74. return
  75. api_status_raw = item.get("status") or ""
  76. err_msg = (item.get("err_msg") or item.get("errorMessage") or "") or ""
  77. data_content = item.get("dataContent")
  78. if data_content is not None and not isinstance(data_content, str):
  79. data_content = json.dumps(data_content, ensure_ascii=False)
  80. html = item.get("html")
  81. base_status = _map_api_status_to_int(str(api_status_raw), vid)
  82. sql = """
  83. UPDATE aigc_topic_decode_task_result
  84. SET status = %s,
  85. err_msg = %s,
  86. data_content = %s,
  87. html = %s
  88. WHERE dt = %s AND vid = %s
  89. """
  90. mysql.execute(
  91. sql,
  92. (
  93. base_status,
  94. err_msg[:512] if err_msg else "",
  95. data_content if data_content is not None else "",
  96. html if html is not None else None,
  97. dt,
  98. vid,
  99. ),
  100. )
  101. def _poll_decode_results_for_today(dt: str, vids: List[str]) -> None:
  102. if not vids:
  103. return
  104. total = len(vids)
  105. logger.info("开始查询解码结果 dt={} 总vid数={}", dt, total)
  106. overall_success = 0
  107. overall_returned = 0
  108. for i in range(0, total, RESULT_POLL_CHUNK):
  109. chunk = vids[i : i + RESULT_POLL_CHUNK]
  110. logger.info(
  111. "查询解码结果 dt={} 分片序号={} 分片大小={} 总数={}",
  112. dt,
  113. i // RESULT_POLL_CHUNK,
  114. len(chunk),
  115. total,
  116. )
  117. ok, msg, body = _submit_decode_result_chunk(chunk)
  118. if not ok:
  119. logger.error(
  120. "查询解码结果接口失败 dt={} msg={} body={}",
  121. dt,
  122. msg,
  123. body,
  124. )
  125. continue
  126. data_list = body.get("data")
  127. if not isinstance(data_list, list):
  128. logger.warning("查询解码结果返回中缺少data列表 body={}", body)
  129. continue
  130. chunk_success = 0
  131. returned_ids = {str(x.get("channelContentId") or "") for x in data_list}
  132. missing = set(chunk) - returned_ids
  133. if missing:
  134. logger.warning(
  135. "查询解码结果返回缺少{}个vid,示例={}",
  136. len(missing),
  137. list(missing)[:5],
  138. )
  139. for item in data_list:
  140. if not isinstance(item, dict):
  141. continue
  142. vid = str(item.get("channelContentId") or "").strip()
  143. api_status = str(item.get("status") or "")
  144. mapped_status = _map_api_status_to_int(api_status, vid)
  145. if mapped_status == 2:
  146. chunk_success += 1
  147. err_msg = (item.get("err_msg") or item.get("errorMessage") or "") or ""
  148. logger.info(
  149. "解码结果明细 dt={} vid={} 接口状态={} 映射状态={} 错误信息={}",
  150. dt,
  151. vid,
  152. api_status,
  153. mapped_status,
  154. err_msg[:512] if err_msg else "",
  155. )
  156. _apply_result_row_to_db(dt, item)
  157. overall_success += chunk_success
  158. overall_returned += len(data_list)
  159. logger.info(
  160. "解码结果分片处理完成 dt={} 查询数={} 返回数={} 成功数={}",
  161. dt,
  162. len(chunk),
  163. len(data_list),
  164. chunk_success,
  165. )
  166. logger.info(
  167. "解码结果查询完成 dt={} 查询总数={} 返回总数={} 成功总数={}",
  168. dt,
  169. total,
  170. overall_returned,
  171. overall_success,
  172. )
  173. def _build_posts_payload(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
  174. posts: List[Dict[str, Any]] = []
  175. for item in records:
  176. posts.append(
  177. {
  178. "channelContentId": item.get("vid") or "",
  179. "title": item.get("title") or "",
  180. "video": item.get("url") or "",
  181. "contentModal": 4,
  182. "channel": 10,
  183. }
  184. )
  185. return posts
  186. def _submit_decode(posts: List[Dict[str, Any]]) -> Tuple[bool, str, Dict[str, Any]]:
  187. payload = {"params": {"configId": CONFIG_ID, "posts": posts}}
  188. try:
  189. resp = requests.post(DECODE_URL, json=payload, timeout=60)
  190. if resp.status_code != 200:
  191. return False, f"http_status_{resp.status_code}", {}
  192. body = resp.json()
  193. ok = body.get("code") == 0
  194. return ok, body.get("msg") or "", body
  195. except Exception as exc:
  196. return False, str(exc), {}
  197. def _load_existing_vids(dt: str) -> set[str]:
  198. sql = """
  199. SELECT DISTINCT vid
  200. FROM aigc_topic_decode_task_result
  201. WHERE dt = %s AND vid IS NOT NULL AND vid != ''
  202. """
  203. rows = mysql.fetchall(sql, (dt,))
  204. return {str(row["vid"]) for row in rows if row.get("vid")}
  205. def _pick_candidate_records(dt: str, batch_size: int = BATCH_SIZE) -> List[Dict[str, Any]]:
  206. existing_vids = _load_existing_vids(dt)
  207. selected: List[Dict[str, Any]] = []
  208. selected_vids: set[str] = set()
  209. offset = 0
  210. while len(selected) < batch_size:
  211. page = fetch_priority_posts(limit=ODPS_PAGE_SIZE, offset=offset, dt=dt)
  212. if not page:
  213. break
  214. for item in page:
  215. vid = str(item.get("vid") or "")
  216. if not vid or vid in existing_vids or vid in selected_vids:
  217. continue
  218. selected.append(item)
  219. selected_vids.add(vid)
  220. if len(selected) >= batch_size:
  221. break
  222. offset += ODPS_PAGE_SIZE
  223. logger.info(
  224. "候选数据筛选完成 dt={} 已选数量={} 扫描offset={}",
  225. dt,
  226. len(selected),
  227. offset,
  228. )
  229. if selected:
  230. vid_title_pairs = [
  231. {"vid": str(item.get("vid") or ""), "title": item.get("title") or ""}
  232. for item in selected
  233. ]
  234. logger.info("已选候选数据 dt={} items={}", dt, vid_title_pairs)
  235. return selected
  236. def _row_status_after_decode_submit(
  237. vid: str, row_in_resp: Optional[Dict[str, Any]], full_body: Dict[str, Any]
  238. ) -> Tuple[int, str, str, Optional[str]]:
  239. """Returns (status, err_msg, data_content, html) for INSERT."""
  240. if not row_in_resp:
  241. payload = json.dumps({"decode_submit_response": full_body}, ensure_ascii=False)
  242. return 1, "", payload, None
  243. api_status_raw = row_in_resp.get("status") or ""
  244. err_msg = (row_in_resp.get("err_msg") or row_in_resp.get("errorMessage") or "") or ""
  245. mapped = _map_api_status_to_int(str(api_status_raw), vid)
  246. payload = json.dumps(
  247. {"decode_submit_item": row_in_resp, "decode_submit_response": full_body},
  248. ensure_ascii=False,
  249. )
  250. if mapped == 3:
  251. return 3, err_msg[:512], payload, None
  252. if mapped == 2:
  253. # New submit API only returns status/errorMessage.
  254. # Keep SUCCESS as terminal success; detailed result is queried via decode/result.
  255. return 2, "", payload, None
  256. if mapped == 0:
  257. return 0, err_msg[:512], payload, None
  258. return 1, err_msg[:512], payload, None
  259. def _insert_task_result_row(
  260. source: Dict[str, Any],
  261. status: int,
  262. err_msg: str,
  263. data_content: str,
  264. html: Optional[str],
  265. ) -> None:
  266. extend = _safe_json_loads(source.get("extend"))
  267. cover_url = extend.get("cover_url", "")
  268. images_text = cover_url if isinstance(cover_url, str) else ""
  269. sql = """
  270. INSERT INTO aigc_topic_decode_task_result
  271. (task_id, status, err_msg, vid, title, cover, video_url, images, type, channel, cate1, cate2, dt, data_content, html)
  272. VALUES
  273. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  274. """
  275. params = (
  276. None,
  277. status,
  278. err_msg or "",
  279. str(source.get("vid") or ""),
  280. source.get("title") or "",
  281. images_text,
  282. source.get("url") or "",
  283. images_text,
  284. source.get("type") or "",
  285. source.get("channel") or "",
  286. source.get("cate1") or "",
  287. source.get("cate2") or "",
  288. source.get("dt") or _today_dt(),
  289. data_content,
  290. html,
  291. )
  292. mysql.execute(sql, params)
  293. def _insert_rows_after_decode_submit(records: List[Dict[str, Any]], body: Dict[str, Any]) -> None:
  294. data_list = body.get("data") if isinstance(body.get("data"), list) else []
  295. by_vid = {str(x.get("channelContentId") or ""): x for x in data_list if isinstance(x, dict)}
  296. for item in records:
  297. vid = str(item.get("vid") or "")
  298. row = by_vid.get(vid)
  299. status, err_msg, data_content, html = _row_status_after_decode_submit(vid, row, body)
  300. _insert_task_result_row(item, status, err_msg, data_content, html)
def run_decode_dispatch_job() -> None:
    """Run one scheduler cycle of the decode dispatch job.

    If any row for today still has status 0 or 1, this run only polls the
    decode results for those rows and returns without submitting anything
    new. Otherwise it picks up to ``BATCH_SIZE`` candidate records, submits
    them to the decode API, and inserts one result row per record that
    reflects the submit outcome.
    """
    logger.info("解码调度任务开始执行")
    dt = _today_dt()
    # Startup guard: if there are in-flight tasks today, poll only in this run.
    # New batch submit will wait for next scheduler cycle after all are terminal.
    initial_non_terminal = _count_today_non_terminal(dt)
    if initial_non_terminal > 0:
        logger.info(
            "启动时发现当天存在进行中任务,本轮仅查询不发起新批次 dt={} count={}",
            dt,
            initial_non_terminal,
        )
        pending_vids = _fetch_today_pending_vids(dt)
        if pending_vids:
            logger.info("查询当天待执行/执行中记录 dt={} count={}", dt, len(pending_vids))
            _poll_decode_results_for_today(dt, pending_vids)
        else:
            # The count query does not filter on vid while the vid query
            # does, so non-terminal rows with an empty vid end up here.
            logger.warning(
                "存在非终态记录但未获取到可查询vid dt={} count={}",
                dt,
                initial_non_terminal,
            )
        # Re-count after polling; the result is only logged, since this run
        # returns without submitting in either case.
        remaining_non_terminal = _count_today_non_terminal(dt)
        if remaining_non_terminal > 0:
            logger.info(
                "查询后仍有待执行/执行中任务,跳过新批次发起 dt={} count={}",
                dt,
                remaining_non_terminal,
            )
        else:
            logger.info(
                "查询后当天进行中任务已清空,将在下一轮发起新批次 dt={}",
                dt,
            )
        logger.info("解码调度任务结束(启动保护:仅查询)")
        return
    # No in-flight rows today: pick a new batch of candidates to submit.
    records = _pick_candidate_records(dt=dt, batch_size=BATCH_SIZE)
    if not records:
        logger.info("无可发起的新批次候选数据 dt={}", dt)
        logger.info("解码调度任务结束(无新增任务)")
        return
    posts = _build_posts_payload(records)
    ok, err_msg, body = _submit_decode(posts)
    logger.info(
        "解码提交接口执行完成 success={} records={} msg={} body={}",
        ok,
        len(records),
        err_msg,
        body,
    )
    if not ok:
        # Submit failed: store every candidate as failed (status 3) and keep
        # the response body in data_content.
        fail_body = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
        for item in records:
            _insert_task_result_row(
                item,
                status=3,
                err_msg=err_msg or "解码提交失败",
                data_content=fail_body,
                html=None,
            )
    else:
        if isinstance(body.get("data"), list) and body["data"]:
            # Per-item statuses are available: derive each row from its entry.
            _insert_rows_after_decode_submit(records, body)
        else:
            # No per-item data in the response: store every record as
            # running (status 1) with the whole response in data_content.
            payload = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
            for item in records:
                _insert_task_result_row(
                    item,
                    status=1,
                    err_msg="",
                    data_content=payload,
                    html=None,
                )
    logger.info("解码调度任务结束,本轮新发起数量={}", len(records))