# spider_task.py (8.2 KB)
"""
@author: luojunhui
"""
  4. import asyncio
  5. import json
  6. from applications.config import Config
  7. from applications.log import logging
  8. from applications.spider import searchVideos
  class spiderTask(object):
      """
      Scheduled spider task.

      Reads pending rows from the article-video table, decides for each
      content id whether a crawl is still needed, and hands ready items to
      ``searchVideos``.
      """

      # Configuration object shared by all instances; created once when the
      # class body is executed.
      C = Config()

      def __init__(self, mysql_client):
          """
          :param mysql_client: async DB client. This class calls its
              ``asyncSelect(sql)`` and ``asyncInsert(sql=..., params=...)``.
          """
          self.mysql_client = mysql_client
          # Table names and settings are read from the shared config.
          self.article_video = self.C.articleVideos
          self.article_text = self.C.articleText
          self.article_video_crawler = self.C.articleCrawlerVideos
          # Used as the LIMIT of the task query in getTask().
          self.spider_coroutines = self.C.getConfigValue("spiderCoroutines")
          # The "accountMap" config value is a JSON string; decode it once.
          self.gh_id_map = json.loads(self.C.getConfigValue("accountMap"))

      async def getTask(self) -> list:
          """
          Fetch the next batch of pending tasks.

          Selects rows with ``content_status = 0`` and ``process_times <= 3``,
          grouped by content_id and capped at ``spider_coroutines`` rows.

          :return: list of dicts with the keys ``trace_id``, ``content_id``,
              ``gh_id`` and ``process_times``; an empty list when the query
              returns nothing.
          """
          # NOTE(review): selecting non-aggregated columns together with
          # GROUP BY content_id presumably relies on the server not enforcing
          # ONLY_FULL_GROUP_BY -- confirm against the deployment.
          select_sql = f"""
              SELECT trace_id, content_id, gh_id, process_times
              FROM {self.article_video}
              WHERE content_status = 0 and process_times <= 3
              GROUP BY content_id
              LIMIT {self.spider_coroutines};
          """
          rows = await self.mysql_client.asyncSelect(select_sql)
          if not rows:
              return []
          task_obj_list = [
              {
                  "trace_id": trace_id,
                  "content_id": content_id,
                  "gh_id": gh_id,
                  "process_times": process_times,
              }
              for trace_id, content_id, gh_id, process_times in rows
          ]
          logging(
              code="9001",
              info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
              data=task_obj_list
          )
          return task_obj_list

      async def getHistoryVideos(self, content_id) -> bool:
          """
          Check whether enough videos already exist for a content id.

          :param content_id: content id to look up in the crawler table
          :return: ``True`` when at least 3 rows with ``download_status = 2``
              exist for this content id, otherwise ``False``.
          """
          # NOTE(review): content_id is interpolated straight into the SQL
          # text; switch to bound parameters if asyncSelect supports them.
          select_sql = f"""
              SELECT count(1)
              FROM {self.article_video_crawler}
              where content_id = '{content_id}' and download_status = 2;
          """
          content_videos = await self.mysql_client.asyncSelect(select_sql)
          return content_videos[0][0] >= 3

      async def judgeContentProcessing(self, content_id) -> bool:
          """
          Decide whether a content id is free to be processed.

          :param content_id: content id to look up in the article-video table
          :return: ``False`` when any row for this content id has
              ``content_status == 1`` (being processed); ``True`` otherwise,
              including when there are no rows at all.
          """
          select_sql = f"""
              SELECT trace_id, content_status
              FROM {self.article_video}
              WHERE content_id = '{content_id}'
              ORDER BY id DESC;
          """
          rows = await self.mysql_client.asyncSelect(select_sql)
          if not rows:
              return True
          return all(content_status != 1 for _trace_id, content_status in rows)

      async def getKimiResult(self, content_id):
          """
          Load the kimi info stored for a content id.

          :param content_id: content id to look up in the article-text table
          :return: ``None`` when no row exists; a dict holding only
              ``kimiStatus`` when kimi_status is not 1; otherwise a dict with
              ``oriTitle``, ``kimiTitle``, ``kimiSummary``, ``kimiKeys``
              (JSON-decoded) and ``kimiStatus``.
          """
          select_sql = f"""
              select article_title, kimi_title, kimi_summary, kimi_keys, kimi_status
              from {self.article_text}
              where content_id = '{content_id}';
          """
          response = await self.mysql_client.asyncSelect(select_sql)
          if not response:
              return None
          article_detail = response[0]
          kimi_status = article_detail[4]
          if kimi_status != 1:
              return {"kimiStatus": kimi_status}
          return {
              "oriTitle": article_detail[0],
              "kimiTitle": article_detail[1],
              "kimiSummary": article_detail[2],
              "kimiKeys": json.loads(article_detail[3]),
              "kimiStatus": kimi_status,
          }

      async def _markAttempt(self, params, content_status) -> None:
          """Increase process_times by one and set content_status for the task's trace_id."""
          update_process_times_sql = f"""
              UPDATE {self.article_video}
              SET process_times = %s, content_status = %s
              WHERE trace_id = %s;
          """
          await self.mysql_client.asyncInsert(
              sql=update_process_times_sql,
              params=(
                  params['process_times'] + 1,
                  content_status,
                  params['trace_id']
              )
          )

      async def startProcess(self, params) -> None:
          """
          Start processing one task according to its kimi status.

          * no kimi row: report it and return (previously this raised
            ``TypeError`` on ``None`` and aborted the whole batch).
          * kimi status 1: mark the row as processing (content_status 1), run
            ``searchVideos``; on any exception reset content_status to 0.
          * kimi status 2: set content_status to 3 without searching
            (presumably a terminal failure state -- confirm).
          * kimi status 0: report that kimi is not ready.
          * any other status: nothing happens.

          :param params: task dict with ``trace_id``, ``content_id``,
              ``gh_id`` and ``process_times``
          :return: None
          """
          kimi_result = await self.getKimiResult(content_id=params['content_id'])
          if kimi_result is None:
              # getKimiResult found no row for this content id.
              print("kimi result not found")
              return

          kimi_status = kimi_result['kimiStatus']
          if kimi_status == 0:
              print("kimi not ready")
              return
          if kimi_status == 2:
              await self._markAttempt(params, content_status=3)
              return
          if kimi_status != 1:
              return

          # Set content_status to 1: the article is now being processed.
          await self._markAttempt(params, content_status=1)
          try:
              await searchVideos(
                  info={
                      "oriTitle": kimi_result['oriTitle'],
                      "kimiSummary": kimi_result['kimiSummary'],
                      "kimiKeys": kimi_result['kimiKeys'],
                      "traceId": params['trace_id'],
                      "ghId": params['gh_id'],
                      "contentId": params['content_id'],
                      "spider": self.article_video_crawler
                  },
                  ghIdMap=self.gh_id_map,
                  dbClient=self.mysql_client
              )
          except Exception as e:
              # Best effort: put the row back to status 0 so getTask() can
              # select it again (process_times stays incremented).
              roll_back_status = f"""
                  UPDATE {self.article_video}
                  SET content_status = %s
                  WHERE trace_id = %s;
              """
              await self.mysql_client.asyncInsert(
                  sql=roll_back_status,
                  params=(
                      0,
                      params['trace_id']
                  )
              )
              print("处理失败,回退状态为 0")
              # Surface the cause instead of silently discarding it.
              print("trace_id: {}, error: {!r}".format(params['trace_id'], e))

      async def processTask(self, params) -> None:
          """
          Handle a single task.

          Skips the task when enough videos already exist for its content id
          or when the content id is currently being processed; otherwise
          starts processing.

          :param params: task dict as produced by getTask()
          :return: None
          """
          content_id = params['content_id']
          trace_id = params['trace_id']

          if await self.getHistoryVideos(content_id=content_id):
              # Results already exist for this content id; nothing to crawl.
              logging(
                  code="9001",
                  info="存在历史文章",
                  trace_id=trace_id
              )
              return

          if not await self.judgeContentProcessing(content_id):
              logging(
                  code="9003",
                  info="该文章ID正在请求--文章ID {}".format(content_id),
                  trace_id=trace_id
              )
              return

          logging(
              code="9004",
              info="无正在处理的文章ID, 开始处理",
              trace_id=trace_id
          )
          await self.startProcess(params=params)

      async def deal(self) -> None:
          """
          Entry point: fetch one batch of tasks and process them concurrently.

          :return: None
          """
          task_list = await self.getTask()
          logging(
              code="5005",
              info="Spider Task Got {} this time".format(len(task_list)),
              function="Spider Task"
          )
          if not task_list:
              print("没有新的爬虫请求")
              return
          await asyncio.gather(*(self.processTask(params) for params in task_list))