# create_decode_tasks.py (6.0 KB)
from typing import Dict, Optional

from tqdm import tqdm

from app.core.database import DatabaseManager
from app.core.observability import LogService

from ._const import DecodeTaskConst
from ._mapper import AdPlatformArticlesDecodeTaskMapper, InnerArticlesDecodeTaskMapper
from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
  8. class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
  9. def __init__(self, pool: DatabaseManager, log_service: LogService):
  10. self.pool = pool
  11. self.log_service = log_service
  12. self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
  13. self.tool = AdPlatformArticlesDecodeUtils()
  14. async def create_single_decode_task(self, article: Dict):
  15. # Acquire Lock
  16. article_id = article["id"]
  17. acquire_lock = await self.mapper.update_article_decode_status(
  18. article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  19. )
  20. if not acquire_lock:
  21. await self.log_service.log(
  22. contents={
  23. "article_id": article_id,
  24. "task": "create_decode_task",
  25. "status": "skip",
  26. "message": "acquire lock failed",
  27. }
  28. )
  29. return
  30. # 与解构系统交互,创建解构任务
  31. response = await self.tool.create_decode_task(article)
  32. response_code = response.get("code")
  33. if response_code != self.RequestDecode.SUCCESS:
  34. # 解构任务创建失败
  35. await self.mapper.update_article_decode_status(
  36. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
  37. )
  38. await self.log_service.log(
  39. contents={
  40. "article_id": article_id,
  41. "task": "create_decode_task",
  42. "status": "fail",
  43. "data": response,
  44. }
  45. )
  46. return
  47. task_id = response.get("data", {}).get("task_id") or response.get(
  48. "data", {}
  49. ).get("taskId")
  50. if not task_id:
  51. # 解构任务创建失败
  52. await self.mapper.update_article_decode_status(
  53. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
  54. )
  55. await self.log_service.log(
  56. contents={
  57. "article_id": article_id,
  58. "task": "create_decode_task",
  59. "status": "fail",
  60. "data": response,
  61. }
  62. )
  63. return
  64. # 创建 decode 任务成功
  65. await self.log_service.log(
  66. contents={
  67. "article_id": article_id,
  68. "task": "create_decode_task",
  69. "status": "success",
  70. "data": response,
  71. }
  72. )
  73. wx_sn = article["wx_sn"]
  74. remark = f"task_id: {task_id}-创建解构任务"
  75. record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
  76. if not record_row:
  77. # 记录解构任务失败
  78. await self.mapper.update_article_decode_status(
  79. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
  80. )
  81. await self.log_service.log(
  82. contents={
  83. "article_id": article_id,
  84. "task": "record_decode_task",
  85. "status": "fail",
  86. "message": "创建 decode 记录失败",
  87. "data": response,
  88. }
  89. )
  90. return
  91. # 记录创建成功
  92. await self.mapper.update_article_decode_status(
  93. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  94. )
  95. async def create_tasks(self):
  96. article_list = await self.mapper.fetch_decode_articles()
  97. if not article_list:
  98. await self.log_service.log(
  99. contents={
  100. "task": "create_tasks",
  101. "message": "No more articles to decode",
  102. }
  103. )
  104. return
  105. for article in tqdm(article_list, desc="Creating decode tasks"):
  106. await self.create_single_decode_task(article)
  107. async def deal(self):
  108. await self.create_tasks()
  109. class CreateInnerArticlesDecodeTask(DecodeTaskConst):
  110. def __init__(self, pool: DatabaseManager, log_service: LogService):
  111. self.pool = pool
  112. self.log_service = log_service
  113. self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
  114. self.tool = InnerArticlesDecodeUtils()
  115. async def create_single_decode_task(self, article: Dict):
  116. # Acquire Lock
  117. source_id = article["source_id"]
  118. article_produce_info = await self.mapper.fetch_inner_articles_produce_detail(
  119. source_id
  120. )
  121. # 与解构系统交互,创建解构任务
  122. response = await self.tool.create_decode_task(article, article_produce_info)
  123. response_code = response.get("code")
  124. if response_code != self.RequestDecode.SUCCESS:
  125. return
  126. task_id = response.get("data", {}).get("task_id") or response.get(
  127. "data", {}
  128. ).get("taskId")
  129. if not task_id:
  130. return
  131. wx_sn = article["wx_sn"]
  132. remark = f"task_id: {task_id}-创建解构任务"
  133. record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
  134. if not record_row:
  135. return
  136. async def create_tasks(self):
  137. article_list = await self.mapper.fetch_inner_articles()
  138. if not article_list:
  139. await self.log_service.log(
  140. contents={
  141. "task": "create_tasks",
  142. "message": "No more articles to decode",
  143. }
  144. )
  145. return
  146. for article in tqdm(article_list, desc="Creating decode tasks"):
  147. await self.create_single_decode_task(article)
  148. async def deal(self):
  149. await self.create_tasks()
  150. __all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]