  1. """
  2. @author: luojunhui
  3. """
  4. import asyncio
  5. import json
  6. import time
  7. from applications.config import Config
  8. from applications.log import logging
  9. from applications.spider import search_videos_from_web
class spiderTask(object):
    """
    Scheduled task: find videos for articles whose Kimi processing has finished.
    """

    def __init__(self, mysql_client):
        """
        :param mysql_client: async MySQL client used for all queries
        """
        self.mysql_client = mysql_client
        self.config = Config()
        self.article_match_video_table = self.config.article_match_video_table
        self.article_text_table = self.config.article_text_table
        self.article_crawler_video_table = self.config.article_crawler_video_table
        self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
        self.gh_id_map = json.loads(self.config.get_config_value("accountMap"))

    async def get_task(self):
        """
        Fetch tasks: content_ids in article_match_video_table whose Kimi
        processing has already finished.
        :return: list of task dicts
        """
        select_sql = f"""
            SELECT
                amvt.trace_id,
                amvt.content_id,
                amvt.gh_id,
                amvt.process_times
            FROM {self.article_match_video_table} amvt
            JOIN (
                SELECT content_id
                FROM {self.article_text_table}
                WHERE kimi_status != 0
            ) att ON amvt.content_id = att.content_id
            WHERE content_status = 0 AND process_times <= 3
            GROUP BY content_id
            LIMIT {self.spider_coroutines};
        """
        content_id_tuple = await self.mysql_client.async_select(select_sql)
        if content_id_tuple:
            task_obj_list = [
                {
                    "trace_id": item[0],
                    "content_id": item[1],
                    "gh_id": item[2],
                    "process_times": item[3]
                } for item in content_id_tuple
            ]
            logging(
                code="9001",
                info="got {} tasks this time".format(len(task_obj_list)),
                data=task_obj_list
            )
            return task_obj_list
        else:
            return []

    async def get_history_videos(self, content_id):
        """
        Check whether enough downloaded videos already exist for this content_id.
        :param content_id:
        :return: True if at least 3 videos have download_status = 2
        """
        select_sql = f"""
            SELECT count(1)
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}' AND download_status = 2;
        """
        content_videos = await self.mysql_client.async_select(select_sql)
        videos_count = content_videos[0][0]
        return videos_count >= 3
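
    # Note: interpolating content_id into the SQL string risks injection if the
    # value ever comes from untrusted input. Assuming async_select accepts bound
    # parameters the way async_insert does (an assumption, not verified against
    # the client), the query above could be parameterized instead:
    #
    #     select_sql = f"""
    #         SELECT count(1)
    #         FROM {self.article_crawler_video_table}
    #         WHERE content_id = %s AND download_status = 2;
    #     """
    #     content_videos = await self.mysql_client.async_select(select_sql, params=(content_id,))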

    async def judge_content_processing(self, content_id):
        """
        Check whether this content_id is free to process, i.e. no existing
        record for it is already being worked on.
        :param content_id:
        :return: True if processing may start
        """
        select_sql = f"""
            SELECT trace_id, content_status
            FROM {self.article_match_video_table}
            WHERE content_id = '{content_id}'
            ORDER BY id DESC;
        """
        result = await self.mysql_client.async_select(select_sql)
        if result:
            # Any record with a non-zero status means the content is in flight.
            for _trace_id, content_status in result:
                if content_status != 0:
                    return False
        return True

    async def get_kimi_result(self, content_id):
        """
        Fetch the Kimi processing result for a content_id.
        :return: dict with the Kimi fields, or None if no row exists
        """
        select_sql = f"""
            SELECT article_title, kimi_title, kimi_summary, kimi_keys, kimi_status
            FROM {self.article_text_table}
            WHERE content_id = '{content_id}';
        """
        response = await self.mysql_client.async_select(select_sql)
        if response:
            article_detail = response[0]
            if article_detail[4] == 1:
                result = {
                    "ori_title": article_detail[0],
                    "kimi_title": article_detail[1],
                    "kimi_summary": article_detail[2],
                    "kimi_keys": json.loads(article_detail[3]),
                    "kimi_status": article_detail[4]
                }
            else:
                # Keep the key consistent with the success branch; the original
                # "kimiStatus" spelling would raise a KeyError in start_process.
                result = {
                    "kimi_status": article_detail[4]
                }
            return result
        else:
            return None

    async def start_process(self, params):
        """
        Start processing: mark the status, then search videos from the web.
        :param params:
        :return:
        """
        defeat_status = 99
        finish_kimi_status = 1
        finish_spider_status = 2
        kimi_result = await self.get_kimi_result(content_id=params['content_id'])
        if not kimi_result:
            # No Kimi row exists for this content_id yet; the original code
            # would crash on the lookup below, so bail out instead.
            return
        kimi_status = kimi_result['kimi_status']
        match kimi_status:
            case 1:
                # Kimi succeeded: mark the Kimi stage finished, then crawl.
                update_process_times_sql = f"""
                    UPDATE {self.article_match_video_table}
                    SET process_times = %s, content_status = %s, content_status_update_time = %s
                    WHERE trace_id = %s;
                """
                await self.mysql_client.async_insert(
                    sql=update_process_times_sql,
                    params=(
                        params['process_times'] + 1,
                        finish_kimi_status,
                        int(time.time()),
                        params['trace_id']
                    )
                )
                try:
                    search_videos_count = await search_videos_from_web(
                        info={
                            "ori_title": kimi_result['ori_title'],
                            "kimi_summary": kimi_result['kimi_summary'],
                            "kimi_keys": kimi_result['kimi_keys'],
                            "trace_id": params['trace_id'],
                            "gh_id": params['gh_id'],
                            "content_id": params['content_id'],
                            "crawler_video_table": self.article_crawler_video_table
                        },
                        gh_id_map=self.gh_id_map,
                        db_client=self.mysql_client
                    )
                    if search_videos_count > 3:
                        # Enough videos found: mark the spider stage finished.
                        update_process_times_sql = f"""
                            UPDATE {self.article_match_video_table}
                            SET process_times = %s, content_status = %s, content_status_update_time = %s
                            WHERE trace_id = %s;
                        """
                        await self.mysql_client.async_insert(
                            sql=update_process_times_sql,
                            params=(
                                params['process_times'] + 1,
                                finish_spider_status,
                                int(time.time()),
                                params['trace_id']
                            )
                        )
                    else:
                        # Too few videos: only bump process_times so the task
                        # can be retried later.
                        roll_back_status = f"""
                            UPDATE {self.article_match_video_table}
                            SET process_times = %s, content_status_update_time = %s
                            WHERE trace_id = %s;
                        """
                        await self.mysql_client.async_insert(
                            sql=roll_back_status,
                            params=(
                                params['process_times'] + 1,
                                int(time.time()),
                                params['trace_id']
                            )
                        )
                except Exception as e:
                    # Spider failed: roll back status so the task can be retried.
                    roll_back_status = f"""
                        UPDATE {self.article_match_video_table}
                        SET process_times = %s, content_status_update_time = %s
                        WHERE trace_id = %s;
                    """
                    await self.mysql_client.async_insert(
                        sql=roll_back_status,
                        params=(
                            params['process_times'] + 1,
                            int(time.time()),
                            params['trace_id']
                        )
                    )
                    print("spider processing failed: {}".format(e))
            case 2:
                # kimi_status == 2: mark the task as failed (defeat_status).
                update_process_times_sql = f"""
                    UPDATE {self.article_match_video_table}
                    SET process_times = %s, content_status = %s, content_status_update_time = %s
                    WHERE trace_id = %s;
                """
                await self.mysql_client.async_insert(
                    sql=update_process_times_sql,
                    params=(
                        params['process_times'] + 1,
                        defeat_status,
                        int(time.time()),
                        params['trace_id'],
                    )
                )
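
    # A possible refactor (not in the original code): the three near-identical
    # UPDATE statements in start_process could share one helper like this one.
    async def _update_content_status(self, trace_id, process_times, content_status=None):
        """
        Sketch: bump process_times (and optionally set content_status) for a trace_id.
        """
        if content_status is None:
            sql = f"""
                UPDATE {self.article_match_video_table}
                SET process_times = %s, content_status_update_time = %s
                WHERE trace_id = %s;
            """
            update_params = (process_times, int(time.time()), trace_id)
        else:
            sql = f"""
                UPDATE {self.article_match_video_table}
                SET process_times = %s, content_status = %s, content_status_update_time = %s
                WHERE trace_id = %s;
            """
            update_params = (process_times, content_status, int(time.time()), trace_id)
        await self.mysql_client.async_insert(sql=sql, params=update_params)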

    async def process_task(self, params):
        """
        Process a single task asynchronously.
        :param params:
        :return:
        """
        content_id = params['content_id']
        trace_id = params['trace_id']
        has_history_videos = await self.get_history_videos(content_id=content_id)
        if has_history_videos:
            # Results already exist; the video_ids under this record can be reused.
            logging(
                code="9001",
                info="history videos already exist",
                trace_id=trace_id
            )
        else:
            flag = await self.judge_content_processing(content_id)
            if flag:
                logging(
                    code="9004",
                    info="no article with this ID is being processed, start processing",
                    trace_id=trace_id
                )
                await self.start_process(params=params)
            else:
                logging(
                    code="9003",
                    info="this article ID is already being processed -- article ID {}".format(content_id),
                    trace_id=trace_id
                )

    async def deal(self):
        """
        Entry point: fetch tasks and process them concurrently.
        :return:
        """
        task_list = await self.get_task()
        logging(
            code="5005",
            info="Spider Task got {} tasks this time".format(len(task_list)),
            function="Spider Task"
        )
        if task_list:
            tasks = [self.process_task(params) for params in task_list]
            await asyncio.gather(*tasks)
        else:
            print("no new spider requests")
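

# A minimal usage sketch (not part of the original file): wiring the task to an
# async MySQL client and running one round of deal(). "AsyncMySQLClient" and its
# import path are hypothetical stand-ins for whatever client this project uses.
#
# if __name__ == "__main__":
#     from applications.db import AsyncMySQLClient  # hypothetical import path
#
#     async def main():
#         mysql_client = AsyncMySQLClient()
#         await spiderTask(mysql_client).deal()
#
#     asyncio.run(main())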