# create_decode_tasks.py
  1. import asyncio
  2. import time
  3. from typing import Dict
  4. import json
  5. import aiohttp
  6. from tqdm import tqdm
  7. from app.core.database import DatabaseManager
  8. from app.core.observability import LogService
  9. from app.infra.shared.async_tasks import run_tasks_with_asyncio_task_group
  10. from ._const import DecodeTaskConst
  11. from ._mapper import AdPlatformArticlesDecodeTaskMapper, InnerArticlesDecodeTaskMapper
  12. from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
  13. class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
  14. def __init__(self, pool: DatabaseManager, log_service: LogService):
  15. self.pool = pool
  16. self.log_service = log_service
  17. self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
  18. self.tool = AdPlatformArticlesDecodeUtils()
  19. async def create_single_decode_task(self, article: Dict):
  20. # Acquire Lock
  21. article_id = article["id"]
  22. acquire_lock = await self.mapper.update_article_decode_status(
  23. article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  24. )
  25. if not acquire_lock:
  26. await self.log_service.log(
  27. contents={
  28. "article_id": article_id,
  29. "task": "create_decode_task",
  30. "status": "skip",
  31. "message": "acquire lock failed",
  32. }
  33. )
  34. return
  35. # 与解构系统交互,创建解构任务
  36. response = await self.tool.create_decode_task(article)
  37. response_code = response.get("code")
  38. if response_code != self.RequestDecode.SUCCESS:
  39. # 解构任务创建失败
  40. await self.mapper.update_article_decode_status(
  41. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
  42. )
  43. await self.log_service.log(
  44. contents={
  45. "article_id": article_id,
  46. "task": "create_decode_task",
  47. "status": "fail",
  48. "data": response,
  49. }
  50. )
  51. return
  52. task_id = response.get("data", {}).get("task_id") or response.get(
  53. "data", {}
  54. ).get("taskId")
  55. if not task_id:
  56. # 解构任务创建失败
  57. await self.mapper.update_article_decode_status(
  58. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
  59. )
  60. await self.log_service.log(
  61. contents={
  62. "article_id": article_id,
  63. "task": "create_decode_task",
  64. "status": "fail",
  65. "data": response,
  66. }
  67. )
  68. return
  69. # 创建 decode 任务成功
  70. await self.log_service.log(
  71. contents={
  72. "article_id": article_id,
  73. "task": "create_decode_task",
  74. "status": "success",
  75. "data": response,
  76. }
  77. )
  78. wx_sn = article["wx_sn"]
  79. remark = f"task_id: {task_id}-创建解构任务"
  80. record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
  81. if not record_row:
  82. # 记录解构任务失败
  83. await self.mapper.update_article_decode_status(
  84. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
  85. )
  86. await self.log_service.log(
  87. contents={
  88. "article_id": article_id,
  89. "task": "record_decode_task",
  90. "status": "fail",
  91. "message": "创建 decode 记录失败",
  92. "data": response,
  93. }
  94. )
  95. return
  96. # 记录创建成功
  97. await self.mapper.update_article_decode_status(
  98. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  99. )
  100. async def create_tasks(self):
  101. article_list = await self.mapper.fetch_decode_articles()
  102. if not article_list:
  103. await self.log_service.log(
  104. contents={
  105. "task": "create_tasks",
  106. "message": "No more articles to decode",
  107. }
  108. )
  109. return
  110. for article in tqdm(article_list, desc="Creating decode tasks"):
  111. await self.create_single_decode_task(article)
  112. async def deal(self):
  113. await self.create_tasks()
  114. class CreateInnerArticlesDecodeTask(DecodeTaskConst):
  115. CREATE_TASK_NAME = "create_inner_articles_decode_task"
  116. MAX_CREATE_RETRY_TIMES = 3
  117. LOCK_TIMEOUT_SECONDS = 30 * 60
  118. CREATE_MAX_CONCURRENCY = 5
  119. def __init__(self, pool: DatabaseManager, log_service: LogService):
  120. self.pool = pool
  121. self.log_service = log_service
  122. self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
  123. self.tool = InnerArticlesDecodeUtils()
  124. async def _log_create_event(self, **contents):
  125. await self.log_service.log(
  126. contents={"task": self.CREATE_TASK_NAME, **contents}
  127. )
  128. @staticmethod
  129. def _trim_error_message(message: str, limit: int = 500):
  130. if not message:
  131. return ""
  132. return message[:limit]
  133. async def _mark_retry_or_failed(
  134. self,
  135. source_id: str,
  136. task_type: int,
  137. error_message: str,
  138. retryable: bool,
  139. state: Dict | None,
  140. ):
  141. now_ts = int(time.time())
  142. retry_count = (state or {}).get("retry_count", 0)
  143. should_retry = retryable and retry_count < self.MAX_CREATE_RETRY_TIMES
  144. error_message = self._trim_error_message(error_message)
  145. if should_retry:
  146. await self.mapper.mark_create_retry(
  147. source_id=source_id,
  148. task_type=task_type,
  149. now_ts=now_ts,
  150. error_message=error_message,
  151. )
  152. await self._log_create_event(
  153. source_id=source_id,
  154. task_type=task_type,
  155. status="retry",
  156. retry_count=retry_count + 1,
  157. message=error_message,
  158. )
  159. return
  160. await self.mapper.mark_create_failed(
  161. source_id=source_id,
  162. task_type=task_type,
  163. now_ts=now_ts,
  164. error_message=error_message,
  165. )
  166. await self._log_create_event(
  167. source_id=source_id,
  168. task_type=task_type,
  169. status="failed",
  170. retry_count=retry_count + 1,
  171. message=error_message,
  172. )
  173. async def _build_payload(self, article: Dict, task_type: int):
  174. source_id = article["source_id"]
  175. match task_type:
  176. case self.TaskType.PUBLISH_TITLE_COVER:
  177. return {
  178. "source_id": source_id,
  179. "title": article["title"],
  180. "cover_img": article["cover_img_url"],
  181. "channel_content_id": article.get("wx_sn", source_id),
  182. "content_type": self.ContentType.TITLE_COVER
  183. }
  184. case self.TaskType.SOURCE_IMAGES_TEXT:
  185. crawl_source_info = await self.mapper.fetch_article_crawler_source_info(source_id)
  186. if not crawl_source_info:
  187. raise ValueError("未找到文章抓取源信息")
  188. crawl_info = crawl_source_info[0]
  189. channel_content_id = crawl_info["channel_content_id"]
  190. raw_body_text = crawl_info["body_text"]
  191. body_text, images = self.tool.extract_body_text_and_images(raw_body_text)
  192. if not body_text and not images:
  193. raise ValueError("文章正文和图片均为空,无法创建解构任务")
  194. return {
  195. "source_id": source_id,
  196. "images": images,
  197. "body_text": body_text,
  198. "channel_content_id": channel_content_id or source_id,
  199. "content_type": self.ContentType.LONG_ARTICLE
  200. }
  201. case self.TaskType.MINI_TITLE_CARD:
  202. raise NotImplementedError("MINI_TITLE_CARD 数据未完善")
  203. case _:
  204. raise ValueError(f"unsupported task type: {task_type}")
  205. async def create_single_decode_task(self, task: Dict):
  206. article = task["article"]
  207. task_type = task["task_type"]
  208. source_id = article["source_id"]
  209. now_ts = int(time.time())
  210. lock_expire_before = now_ts - self.LOCK_TIMEOUT_SECONDS
  211. await self.mapper.init_create_state(source_id, task_type, now_ts)
  212. acquire_lock = await self.mapper.acquire_create_lock(
  213. source_id=source_id,
  214. task_type=task_type,
  215. now_ts=now_ts,
  216. max_retry_times=self.MAX_CREATE_RETRY_TIMES,
  217. lock_expire_before=lock_expire_before,
  218. )
  219. if not acquire_lock:
  220. await self._log_create_event(
  221. source_id=source_id,
  222. task_type=task_type,
  223. status="skip",
  224. message="acquire create lock failed",
  225. )
  226. return
  227. state = await self.mapper.fetch_create_state(source_id, task_type)
  228. try:
  229. # exist_task = await self.mapper.fetch_exist_source_id(source_id, task_type)
  230. # if exist_task:
  231. # await self.mapper.mark_create_success(
  232. # source_id=source_id,
  233. # task_type=task_type,
  234. # remote_task_id="existing_task",
  235. # now_ts=int(time.time()),
  236. # remark="任务已存在,跳过重复创建",
  237. # )
  238. # await self._log_create_event(
  239. # source_id=source_id,
  240. # task_type=task_type,
  241. # status="skip",
  242. # message="decode task already exists",
  243. # )
  244. # return
  245. payload = await self._build_payload(article, task_type)
  246. response = await self.tool.create_decode_task_with_retry(
  247. payload=payload,
  248. retry_times=self.MAX_CREATE_RETRY_TIMES,
  249. )
  250. response_code = response.get("code")
  251. if response_code != self.RequestDecode.SUCCESS:
  252. await self._mark_retry_or_failed(
  253. source_id=source_id,
  254. task_type=task_type,
  255. error_message=f"解构任务创建失败: {json.dumps(response, ensure_ascii=False)}",
  256. retryable=False,
  257. state=state,
  258. )
  259. return
  260. task_id = response.get("data", {}).get("task_id") or response.get(
  261. "data", {}
  262. ).get("taskId")
  263. if not task_id:
  264. await self._mark_retry_or_failed(
  265. source_id=source_id,
  266. task_type=task_type,
  267. error_message=f"解构任务返回缺少 task_id: {json.dumps(response, ensure_ascii=False)}",
  268. retryable=False,
  269. state=state,
  270. )
  271. return
  272. remark = f"task_id: {task_id}-创建解构任务"
  273. record_row = await self.mapper.record_decode_task_if_absent(
  274. task_id=task_id,
  275. content_id=source_id,
  276. task_type=task_type,
  277. payload=json.dumps(payload, ensure_ascii=False),
  278. remark=remark,
  279. )
  280. if record_row not in (0, 1):
  281. await self._mark_retry_or_failed(
  282. source_id=source_id,
  283. task_type=task_type,
  284. error_message="创建 decode 记录失败",
  285. retryable=True,
  286. state=state,
  287. )
  288. return
  289. await self.mapper.mark_create_success(
  290. source_id=source_id,
  291. task_type=task_type,
  292. remote_task_id=task_id,
  293. now_ts=int(time.time()),
  294. remark=remark,
  295. )
  296. await self._log_create_event(
  297. source_id=source_id,
  298. task_type=task_type,
  299. status="success",
  300. retry_count=(state or {}).get("retry_count", 0),
  301. data=response,
  302. )
  303. except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError) as exc:
  304. await self._mark_retry_or_failed(
  305. source_id=source_id,
  306. task_type=task_type,
  307. error_message=f"解构服务调用异常: {exc}",
  308. retryable=True,
  309. state=state,
  310. )
  311. except (ValueError, NotImplementedError) as exc:
  312. await self._mark_retry_or_failed(
  313. source_id=source_id,
  314. task_type=task_type,
  315. error_message=str(exc),
  316. retryable=False,
  317. state=state,
  318. )
  319. except Exception as exc:
  320. await self._mark_retry_or_failed(
  321. source_id=source_id,
  322. task_type=task_type,
  323. error_message=f"创建解构任务异常: {exc}",
  324. retryable=True,
  325. state=state,
  326. )
  327. async def create_tasks(self, date_string: str = None, max_concurrency: int = None):
  328. article_list = await self.mapper.fetch_inner_articles(date_string or "20260401")
  329. if not article_list:
  330. await self._log_create_event(
  331. status="empty",
  332. message="No more articles to decode",
  333. )
  334. return
  335. decode_types = [
  336. self.TaskType.SOURCE_IMAGES_TEXT,
  337. # self.TaskType.PUBLISH_TITLE_COVER
  338. ]
  339. task_list = [
  340. {"article": article, "task_type": task_type}
  341. for article in article_list
  342. for task_type in decode_types
  343. ]
  344. result = await run_tasks_with_asyncio_task_group(
  345. task_list=task_list,
  346. handler=self.create_single_decode_task,
  347. description="Creating inner decode tasks",
  348. unit="task",
  349. max_concurrency=max_concurrency or self.CREATE_MAX_CONCURRENCY,
  350. fail_fast=False,
  351. )
  352. if result["errors"]:
  353. await self._log_create_event(
  354. status="partial_error",
  355. message="some inner decode tasks raised uncaught errors",
  356. data={
  357. "total_task": result["total_task"],
  358. "processed_task": result["processed_task"],
  359. "error_count": len(result["errors"]),
  360. },
  361. )
  362. async def deal(self, date_string: str = None, max_concurrency: int = None):
  363. await self.create_tasks(
  364. date_string=date_string,
  365. max_concurrency=max_concurrency,
  366. )
  367. __all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]