create_decode_tasks.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. import asyncio
  2. import time
  3. from typing import Dict
  4. import json
  5. import aiohttp
  6. from tqdm import tqdm
  7. from app.core.database import DatabaseManager
  8. from app.core.observability import LogService
  9. from app.infra.shared.async_tasks import run_tasks_with_asyncio_task_group
  10. from ._const import DecodeTaskConst
  11. from ._mapper import AdPlatformArticlesDecodeTaskMapper, InnerArticlesDecodeTaskMapper
  12. from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
  13. class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
  14. def __init__(self, pool: DatabaseManager, log_service: LogService):
  15. self.pool = pool
  16. self.log_service = log_service
  17. self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
  18. self.tool = AdPlatformArticlesDecodeUtils()
  19. async def create_single_decode_task(self, article: Dict):
  20. # Acquire Lock
  21. article_id = article["id"]
  22. acquire_lock = await self.mapper.update_article_decode_status(
  23. article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  24. )
  25. if not acquire_lock:
  26. await self.log_service.log(
  27. contents={
  28. "article_id": article_id,
  29. "task": self.LogTaskKey.CREATE_SINGLE,
  30. "status": "skip",
  31. "message": "acquire lock failed",
  32. }
  33. )
  34. return
  35. # 与解构系统交互,创建解构任务
  36. response = await self.tool.create_decode_task(article)
  37. response_code = response.get("code")
  38. if response_code != self.RequestDecode.SUCCESS:
  39. # 解构任务创建失败
  40. await self.mapper.update_article_decode_status(
  41. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
  42. )
  43. await self.log_service.log(
  44. contents={
  45. "article_id": article_id,
  46. "task": self.LogTaskKey.CREATE_SINGLE,
  47. "status": "fail",
  48. "data": response,
  49. }
  50. )
  51. return
  52. task_id = response.get("data", {}).get("task_id") or response.get(
  53. "data", {}
  54. ).get("taskId")
  55. if not task_id:
  56. # 解构任务创建失败
  57. await self.mapper.update_article_decode_status(
  58. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
  59. )
  60. await self.log_service.log(
  61. contents={
  62. "article_id": article_id,
  63. "task": self.LogTaskKey.CREATE_SINGLE,
  64. "status": "fail",
  65. "data": response,
  66. }
  67. )
  68. return
  69. # 创建 decode 任务成功
  70. await self.log_service.log(
  71. contents={
  72. "article_id": article_id,
  73. "task": self.LogTaskKey.CREATE_SINGLE,
  74. "status": "success",
  75. "data": response,
  76. }
  77. )
  78. wx_sn = article["wx_sn"]
  79. remark = f"task_id: {task_id}-创建解构任务"
  80. record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
  81. if not record_row:
  82. # 记录解构任务失败
  83. await self.mapper.update_article_decode_status(
  84. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
  85. )
  86. await self.log_service.log(
  87. contents={
  88. "article_id": article_id,
  89. "task": self.LogTaskKey.RECORD_QUEUE,
  90. "status": "fail",
  91. "message": "创建 decode 记录失败",
  92. "data": response,
  93. }
  94. )
  95. return
  96. # 记录创建成功
  97. await self.mapper.update_article_decode_status(
  98. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  99. )
  100. async def create_tasks(self):
  101. article_list = await self.mapper.fetch_decode_articles()
  102. if not article_list:
  103. await self.log_service.log(
  104. contents={
  105. "task": self.LogTaskKey.CREATE_BATCH,
  106. "message": "No more articles to decode",
  107. }
  108. )
  109. return
  110. for article in tqdm(
  111. article_list, desc=self.AdPlatformDecodeBatch.TQDM_DESCRIPTION
  112. ):
  113. await self.create_single_decode_task(article)
  114. async def deal(self):
  115. await self.create_tasks()
class CreateInnerArticlesDecodeTask(DecodeTaskConst):
    """Creates decode (deconstruction) tasks for in-house ("inner") articles.

    Unlike the ad-platform variant, this flow keeps a per-(source_id,
    task_type) create-state row holding a lock, a retry counter and the
    last error message, and fans the work out concurrently through an
    asyncio task group.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
        self.tool = InnerArticlesDecodeUtils()

    async def _log_create_event(self, **contents):
        # Every entry is tagged with the scheduler task name so one run's
        # events can be grouped together in the log store.
        await self.log_service.log(
            contents={"task": self.InnerDecodeCreate.SCHEDULER_TASK_NAME, **contents}
        )

    @staticmethod
    def _trim_error_message(message: str, limit: int | None = None):
        """Clamp *message* to *limit* characters; None/"" becomes ""."""
        if limit is None:
            # Default cap; presumably matches the DB column size — TODO confirm.
            limit = DecodeTaskConst.InnerDecodeCreate.ERROR_MESSAGE_MAX_CHARS
        if not message:
            return ""
        return message[:limit]

    async def _mark_retry_or_failed(
        self,
        source_id: str,
        task_type: int,
        error_message: str,
        retryable: bool,
        state: Dict | None,
    ):
        """Record a failed creation attempt as either "retry" or "failed".

        Retries only when *retryable* is set AND the retry budget is not
        exhausted; otherwise the state row is marked terminally failed.
        """
        now_ts = int(time.time())
        retry_count = (state or {}).get("retry_count", 0)
        should_retry = (
            retryable and retry_count < self.InnerDecodeCreate.MAX_RETRY_TIMES
        )
        error_message = self._trim_error_message(error_message)
        if should_retry:
            await self.mapper.mark_create_retry(
                source_id=source_id,
                task_type=task_type,
                now_ts=now_ts,
                error_message=error_message,
            )
            await self._log_create_event(
                source_id=source_id,
                task_type=task_type,
                status="retry",
                retry_count=retry_count + 1,
                message=error_message,
            )
            return
        await self.mapper.mark_create_failed(
            source_id=source_id,
            task_type=task_type,
            now_ts=now_ts,
            error_message=error_message,
        )
        # NOTE(review): the "failed" log also reports retry_count + 1 even
        # though no further attempt happens — confirm this is intended.
        await self._log_create_event(
            source_id=source_id,
            task_type=task_type,
            status="failed",
            retry_count=retry_count + 1,
            message=error_message,
        )

    async def _build_payload(self, article: Dict, task_type: int):
        """Build the remote create-request payload for *task_type*.

        Raises ValueError when required source data is missing/empty and
        NotImplementedError for the not-yet-supported MINI_TITLE_CARD type.
        """
        source_id = article["source_id"]
        match task_type:
            case self.TaskType.PUBLISH_TITLE_COVER:
                # Title + cover image only.
                return {
                    "source_id": source_id,
                    "title": article["title"],
                    "cover_img": article["cover_img_url"],
                    "channel_content_id": article.get("wx_sn", source_id),
                    "content_type": self.ContentType.TITLE_COVER,
                }
            case self.TaskType.SOURCE_IMAGES_TEXT:
                # Full article body + images, pulled from the crawler store.
                crawl_source_info = await self.mapper.fetch_article_crawler_source_info(
                    source_id
                )
                if not crawl_source_info:
                    raise ValueError("未找到文章抓取源信息")
                crawl_info = crawl_source_info[0]
                channel_content_id = crawl_info["channel_content_id"]
                raw_body_text = crawl_info["body_text"]
                body_text, images = self.tool.extract_body_text_and_images(
                    raw_body_text
                )
                # Reject articles with neither text nor images.
                if not body_text and not images:
                    raise ValueError("文章正文和图片均为空,无法创建解构任务")
                # NOTE(review): the payload carries raw_body_text, not the
                # extracted body_text — confirm the raw form is intended.
                return {
                    "source_id": source_id,
                    "images": images,
                    "body_text": raw_body_text,
                    "channel_content_id": channel_content_id or source_id,
                    "content_type": self.ContentType.LONG_ARTICLE,
                }
            case self.TaskType.MINI_TITLE_CARD:
                raise NotImplementedError("MINI_TITLE_CARD 数据未完善")
            case _:
                raise ValueError(f"unsupported task type: {task_type}")

    async def create_single_decode_task(self, task: Dict):
        """Create one decode task for ``task = {"article": ..., "task_type": ...}``.

        Sequence: dedupe check -> init state row -> acquire create lock ->
        build payload -> remote create -> record task -> mark success.
        All failures route through _mark_retry_or_failed.
        """
        article = task["article"]
        task_type = task["task_type"]
        source_id = article["source_id"]
        # Dedupe: if a decode task already exists for this (source, type),
        # mark success with a sentinel remote id and skip.
        exist_task = await self.mapper.fetch_exist_source_id(source_id, task_type)
        if exist_task:
            await self.mapper.mark_create_success(
                source_id=source_id,
                task_type=task_type,
                remote_task_id=self.InnerDecodeCreate.DUPLICATE_SKIP_REMOTE_TASK_ID,
                now_ts=int(time.time()),
                remark="任务已存在,跳过重复创建",
            )
            await self._log_create_event(
                source_id=source_id,
                task_type=task_type,
                status="skip",
                message="decode task already exists",
            )
            return
        now_ts = int(time.time())
        # Locks older than the timeout are considered stale and reclaimable.
        lock_expire_before = now_ts - self.INNER_DECODE_LOCK_TIMEOUT_SECONDS
        await self.mapper.init_create_state(source_id, task_type, now_ts)
        acquire_lock = await self.mapper.acquire_create_lock(
            source_id=source_id,
            task_type=task_type,
            now_ts=now_ts,
            max_retry_times=self.InnerDecodeCreate.MAX_RETRY_TIMES,
            lock_expire_before=lock_expire_before,
        )
        if not acquire_lock:
            await self._log_create_event(
                source_id=source_id,
                task_type=task_type,
                status="skip",
                message="acquire create lock failed",
            )
            return
        # Snapshot the state (retry_count etc.) after winning the lock.
        state = await self.mapper.fetch_create_state(source_id, task_type)
        try:
            payload = await self._build_payload(article, task_type)
            response = await self.tool.create_decode_task_with_retry(
                payload=payload,
                retry_times=self.InnerDecodeCreate.MAX_RETRY_TIMES,
            )
            response_code = response.get("code")
            if response_code != self.RequestDecode.SUCCESS:
                # Remote rejected the request; not retried here since the
                # tool already retried internally.
                await self._mark_retry_or_failed(
                    source_id=source_id,
                    task_type=task_type,
                    error_message=f"解构任务创建失败: {json.dumps(response, ensure_ascii=False)}",
                    retryable=False,
                    state=state,
                )
                return
            # The service answers with either snake_case or camelCase keys.
            task_id = response.get("data", {}).get("task_id") or response.get(
                "data", {}
            ).get("taskId")
            if not task_id:
                await self._mark_retry_or_failed(
                    source_id=source_id,
                    task_type=task_type,
                    error_message=f"解构任务返回缺少 task_id: {json.dumps(response, ensure_ascii=False)}",
                    retryable=False,
                    state=state,
                )
                return
            remark = f"task_id: {task_id}-创建解构任务"
            record_row = await self.mapper.record_decode_task_if_absent(
                task_id=task_id,
                content_id=source_id,
                task_type=task_type,
                payload=json.dumps(payload, ensure_ascii=False),
                remark=remark,
            )
            # INSERT IGNORE semantics: both "inserted" and "already there"
            # count as success; anything else is a write failure.
            if record_row not in (
                self.InnerDecodeCreate.INSERT_IGNORE_AFFECTED_NOOP,
                self.InnerDecodeCreate.INSERT_IGNORE_AFFECTED_INSERTED,
            ):
                await self._mark_retry_or_failed(
                    source_id=source_id,
                    task_type=task_type,
                    error_message="创建 decode 记录失败",
                    retryable=True,
                    state=state,
                )
                return
            await self.mapper.mark_create_success(
                source_id=source_id,
                task_type=task_type,
                remote_task_id=task_id,
                now_ts=int(time.time()),
                remark=remark,
            )
            await self._log_create_event(
                source_id=source_id,
                task_type=task_type,
                status="success",
                retry_count=(state or {}).get("retry_count", 0),
                data=response,
            )
        # Transient network/timeout problems are retryable.
        # (asyncio.TimeoutError aliases TimeoutError on 3.11+; harmless.)
        except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError) as exc:
            await self._mark_retry_or_failed(
                source_id=source_id,
                task_type=task_type,
                error_message=f"解构服务调用异常: {exc}",
                retryable=True,
                state=state,
            )
        # Data problems raised by _build_payload are permanent.
        except (ValueError, NotImplementedError) as exc:
            await self._mark_retry_or_failed(
                source_id=source_id,
                task_type=task_type,
                error_message=str(exc),
                retryable=False,
                state=state,
            )
        # Anything unexpected: assume transient and allow a retry.
        except Exception as exc:
            await self._mark_retry_or_failed(
                source_id=source_id,
                task_type=task_type,
                error_message=f"创建解构任务异常: {exc}",
                retryable=True,
                state=state,
            )

    async def create_tasks(
        self, date_string: str | None = None, max_concurrency: int | None = None
    ):
        """Fan out decode-task creation for all articles of *date_string*.

        Each article produces one task per entry in ``decode_types``; the
        batch runs concurrently via the shared asyncio task-group runner.
        """
        article_list = await self.mapper.fetch_inner_articles(date_string)
        if not article_list:
            await self._log_create_event(
                status="empty",
                message="No more articles to decode",
            )
            return
        decode_types = [
            self.TaskType.SOURCE_IMAGES_TEXT,
            self.TaskType.PUBLISH_TITLE_COVER
        ]
        # Cartesian product: one work item per (article, task_type) pair.
        task_list = [
            {"article": article, "task_type": task_type}
            for article in article_list
            for task_type in decode_types
        ]
        result = await run_tasks_with_asyncio_task_group(
            task_list=task_list,
            handler=self.create_single_decode_task,
            description=self.InnerDecodeCreate.ASYNC_BATCH_DESCRIPTION,
            unit=self.InnerDecodeCreate.ASYNC_BATCH_UNIT,
            max_concurrency=max_concurrency
            or self.InnerDecodeCreate.DEFAULT_MAX_CONCURRENCY,
            fail_fast=self.InnerDecodeCreate.ASYNC_BATCH_FAIL_FAST,
        )
        # Errors here are exceptions that escaped create_single_decode_task
        # (it normally swallows everything), so surface them explicitly.
        if result["errors"]:
            await self._log_create_event(
                status="partial_error",
                message="some inner decode tasks raised uncaught errors",
                data={
                    "total_task": result["total_task"],
                    "processed_task": result["processed_task"],
                    "error_count": len(result["errors"]),
                },
            )

    async def deal(
        self, date_string: str | None = None, max_concurrency: int | None = None
    ):
        """Scheduler entry point."""
        await self.create_tasks(
            date_string=date_string,
            max_concurrency=max_concurrency,
        )
  377. __all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]