""" @author: luojunhui """ import asyncio import json import time from applications.config import Config from applications.log import logging from applications.spider import search_videos_from_web class spiderTask(object): """ 定时执行任务 """ def __init__(self, mysql_client): """ :param mysql_client: """ self.mysql_client = mysql_client self.config = Config() self.article_match_video_table = self.config.article_match_video_table self.article_text_table = self.config.article_text_table self.article_crawler_video_table = self.config.article_crawler_video_table self.spider_coroutines = self.config.get_config_value("spiderCoroutines") self.gh_id_map = json.loads(self.config.get_config_value("accountMap")) async def get_task(self): """ 获取任务, 查询出 article_match_video_table 中 已经 kimi 执行完成的 content_id :return: """ select_sql = f""" SELECT amvt.trace_id, amvt.content_id, amvt.gh_id, amvt.process_times FROM {self.article_match_video_table} amvt JOIN ( select content_id from {self.article_text_table} where kimi_status != 0 ) att on amvt.content_id = att.content_id WHERE content_status = 0 and process_times <= 3 GROUP BY content_id LIMIT {self.spider_coroutines}; """ content_id_tuple = await self.mysql_client.async_select(select_sql) if content_id_tuple: content_id_list = [i for i in list(content_id_tuple)] task_obj_list = [ { "trace_id": item[0], "content_id": item[1], "gh_id": item[2], "process_times": item[3] } for item in content_id_list ] logging( code="9001", info="本次任务获取到 {} 条视频".format(len(task_obj_list)), data=task_obj_list ) return task_obj_list else: return [] async def get_history_videos(self, content_id): """ check whether the contents videos exists :param content_id: :return: """ select_sql = f""" SELECT count(1) FROM {self.article_crawler_video_table} where content_id = '{content_id}' and download_status = 2; """ content_videos = await self.mysql_client.async_select(select_sql) videos_count = content_videos[0][0] if videos_count >= 3: return True else: return False async def judge_content_processing(self, content_id): """ 判断该 content_id 是否在处理中 :param content_id: :return: """ select_sql = f""" SELECT trace_id, content_status FROM {self.article_match_video_table} WHERE content_id = '{content_id}' ORDER BY id DESC; """ result = await self.mysql_client.async_select(select_sql) if result: for item in result: trace_id, content_status = item if content_status != 0: return False return True else: return True async def get_kimi_result(self, content_id): """ 通过 content_id 获取kimi info :return: """ select_sql = f""" select article_title, kimi_title, kimi_summary, kimi_keys, kimi_status from {self.article_text_table} where content_id = '{content_id}'; """ response = await self.mysql_client.async_select(select_sql) if response: article_detail = response[0] if article_detail[4] == 1: result = { "ori_title": article_detail[0], "kimi_title": article_detail[1], "kimi_summary": article_detail[2], "kimi_keys": json.loads(article_detail[3]), "kimi_status": article_detail[4] } else: result = { "kimiStatus": article_detail[4] } return result else: return async def start_process(self, params): """ 开始处理 :param params: :return: """ defeat_status = 99 finish_kimi_status = 1 finish_spider_status = 2 kimi_result = await self.get_kimi_result(content_id=params['content_id']) kimi_status = kimi_result['kimi_status'] match kimi_status: case 1: update_process_times_sql = f""" UPDATE {self.article_match_video_table} SET process_times = %s, content_status = %s, content_status_update_time = %s WHERE trace_id = %s; """ await self.mysql_client.async_insert( sql=update_process_times_sql, params=( params['process_times'] + 1, finish_kimi_status, int(time.time()), params['trace_id'] ) ) try: search_videos_count = await search_videos_from_web( info={ "ori_title": kimi_result['ori_title'], "kimi_summary": kimi_result['kimi_summary'], "kimi_keys": kimi_result['kimi_keys'], "trace_id": params['trace_id'], "gh_id": params['gh_id'], "content_id": params['content_id'], "crawler_video_table": self.article_crawler_video_table }, gh_id_map=self.gh_id_map, db_client=self.mysql_client ) if search_videos_count > 3: update_process_times_sql = f""" UPDATE {self.article_match_video_table} SET process_times = %s, content_status = %s, content_status_update_time = %s WHERE trace_id = %s; """ await self.mysql_client.async_insert( sql=update_process_times_sql, params=( params['process_times'] + 1, finish_spider_status, int(time.time()), params['trace_id'] ) ) else: roll_back_status = f""" UPDATE {self.article_match_video_table} SET process_times = %s, content_status_update_time = %s WHERE trace_id = %s; """ await self.mysql_client.async_insert( sql=roll_back_status, params=( params['process_times'] + 1, int(time.time()), params['trace_id'] ) ) except Exception as e: roll_back_status = f""" UPDATE {self.article_match_video_table} SET process_times = %s, content_status_update_time = %s WHERE trace_id = %s; """ await self.mysql_client.async_insert( sql=roll_back_status, params=( params['process_times'] + 1, int(time.time()), params['trace_id'] ) ) print("爬虫处理失败: {}".format(e)) case 2: update_process_times_sql = f""" UPDATE {self.article_match_video_table} SET process_times = %s, content_status = %s, content_status_update_time = %s WHERE trace_id = %s; """ await self.mysql_client.async_insert( sql=update_process_times_sql, params=( params['process_times'] + 1, defeat_status, int(time.time()), params['trace_id'], ) ) async def process_task(self, params): """ 异步执行 :param params: :return: """ content_id = params['content_id'] trace_id = params['trace_id'] video_id_list = await self.get_history_videos(content_id=content_id) if video_id_list: # 说明已经存在了结果, 将该条记录下的video_id拿出来 logging( code="9001", info="存在历史文章", trace_id=trace_id ) else: flag = await self.judge_content_processing(content_id) if flag: logging( code="9004", info="无正在处理的文章ID, 开始处理", trace_id=trace_id ) await self.start_process(params=params) else: logging( code="9003", info="该文章ID正在请求--文章ID {}".format(content_id), trace_id=trace_id ) async def deal(self): """ 处理 :return: """ task_list = await self.get_task() logging( code="5005", info="Spider Task Got {} this time".format(len(task_list)), function="Spider Task" ) if task_list: tasks = [self.process_task(params) for params in task_list] await asyncio.gather(*tasks) else: print("没有新的爬虫请求")