""" @author: luojunhui """ import asyncio import json from applications.config import Config from applications.functions.log import logging from applications.spider import searchVideos class spiderTask(object): """ 定时执行任务 """ C = Config() def __init__(self, mysql_client): """ :param mysql_client: """ self.mysql_client = mysql_client self.article_video = self.C.articleVideos self.article_text = self.C.articleText self.article_video_crawler = self.C.articleCrawlerVideos self.spider_coroutines = self.C.getConfigValue("spiderCoroutines") self.gh_id_map = json.loads(self.C.getConfigValue("accountMap")) async def getTask(self): """ 获取任务 :return: """ select_sql = f""" SELECT trace_id, content_id, gh_id, process_times FROM {self.article_video} WHERE content_status = 0 and process_times <= 3 GROUP BY content_id LIMIT {self.spider_coroutines}; """ content_id_tuple = await self.mysql_client.asyncSelect(select_sql) if content_id_tuple: content_id_list = [i for i in list(content_id_tuple)] task_obj_list = [ { "trace_id": item[0], "content_id": item[1], "gh_id": item[2], "process_times": item[3] } for item in content_id_list ] logging( code="9001", info="本次任务获取到 {} 条视频".format(len(task_obj_list)), data=task_obj_list ) return task_obj_list else: return [] async def getHistoryVideos(self, content_id): """ check whether the contents videos exists :param content_id: :return: """ select_sql = f""" SELECT count(1) FROM {self.article_video_crawler} where content_id = '{content_id}' and download_status = 2; """ content_videos = await self.mysql_client.asyncSelect(select_sql) videos_count = content_videos[0][0] if videos_count >= 3: return True else: return False async def judgeContentProcessing(self, content_id): """ 判断该content_id是否在处理中 :param content_id: :return: """ select_sql = f""" SELECT trace_id, content_status FROM {self.article_video} WHERE content_id = '{content_id}' ORDER BY id DESC; """ result = await self.mysql_client.asyncSelect(select_sql) if result: for item in result: trace_id, content_status = item if content_status == 1: return False return True else: return True async def getKimiResult(self, content_id): """ 通过 content_id 获取kimi info :return: """ select_sql = f""" select article_title, kimi_title, kimi_summary, kimi_keys, kimi_status from {self.article_text} where content_id = '{content_id}'; """ response = await self.mysql_client.asyncSelect(select_sql) if response: article_detail = response[0] if article_detail[4] == 1: result = { "oriTitle": article_detail[0], "kimiTitle": article_detail[1], "kimiSummary": article_detail[2], "kimiKeys": json.loads(article_detail[3]), "kimiStatus": article_detail[4] } else: result = { "kimiStatus": article_detail[4] } return result else: return async def startProcess(self, params): """ 开始处理 :param params: :return: """ # 更新文章contentId为1, 说明该文章正在处理中 kimi_result = await self.getKimiResult(content_id=params['content_id']) kimi_status = kimi_result['kimiStatus'] match kimi_status: case 1: update_process_times_sql = f""" UPDATE {self.article_video} SET process_times = %s, content_status = %s WHERE trace_id = %s; """ await self.mysql_client.asyncInsert( sql=update_process_times_sql, params=( params['process_times'] + 1, 1, params['trace_id'] ) ) try: await searchVideos( info={ "oriTitle": kimi_result['oriTitle'], "kimiSummary": kimi_result['kimiSummary'], "kimiKeys": kimi_result['kimiKeys'], "traceId": params['trace_id'], "ghId": params['gh_id'], "contentId": params['content_id'], "spider": self.article_video_crawler }, ghIdMap=self.gh_id_map, dbClient=self.mysql_client ) except Exception as e: roll_back_status = f""" UPDATE {self.article_video} SET content_status = %s WHERE trace_id = %s; """ await self.mysql_client.asyncInsert( sql=roll_back_status, params=( 0, params['trace_id'] ) ) print("处理失败,回退状态为 0") case 2: update_process_times_sql = f""" UPDATE {self.article_video} SET process_times = %s, content_status = %s WHERE trace_id = %s; """ await self.mysql_client.asyncInsert( sql=update_process_times_sql, params=( params['process_times'] + 1, 3, params['trace_id'] ) ) case 0: print("kimi not ready") async def processTask(self, params): """ 异步执行 :param params: :return: """ content_id = params['content_id'] trace_id = params['trace_id'] video_id_list = await self.getHistoryVideos(content_id=content_id) if video_id_list: # 说明已经存在了结果, 将该条记录下的video_id拿出来 logging( code="9001", info="存在历史文章", trace_id=trace_id ) else: flag = await self.judgeContentProcessing(content_id) if flag: logging( code="9004", info="无正在处理的文章ID, 开始处理", trace_id=trace_id ) await self.startProcess(params=params) else: logging( code="9003", info="该文章ID正在请求--文章ID {}".format(content_id), trace_id=trace_id ) async def deal(self): """ 处理 :return: """ task_list = await self.getTask() if task_list: tasks = [self.processTask(params) for params in task_list] await asyncio.gather(*tasks) else: logging( code="9008", info="爬虫池没有要处理的请求" )