"""
@author: luojunhui
"""
import traceback
from typing import Dict, List

from aiomysql.cursors import DictCursor

from applications.log import logging
from applications.config import Config
from applications.const import rematch_task_const
from applications.etl_function import download_cover
from applications.etl_function import download_video
from applications.etl_function import generate_video_path
from applications.etl_function import upload_to_oss
from applications.spider import search_videos_from_web
from .utils import get_kimi_result


class ReMatchTask(object):
    """
    Re-match task: for every content_id waiting in ``article_re_match_record``,
    make sure enough legal videos are downloaded and uploaded to OSS,
    crawling new candidates first when there are too few.
    """

    def __init__(self, db_client):
        """
        :param db_client: async DB client; this class calls its
            ``async_select`` and ``async_insert`` coroutines.
        """
        self.db_client = db_client
        self.config = Config()
        self.article_match_video_table = self.config.article_match_video_table
        self.article_text_table = self.config.article_text_table
        self.article_crawler_video_table = self.config.article_crawler_video_table

    async def get_tasks(self) -> List[Dict]:
        """
        Fetch the distinct content_ids in ``article_re_match_record`` whose
        status equals ``REMATCH_INIT_STATUS``.

        :return: rows as dicts, each carrying a ``content_id`` key.
        """
        select_sql = f"""
            SELECT DISTINCT content_id
            FROM article_re_match_record
            WHERE status = {rematch_task_const.REMATCH_INIT_STATUS};
        """
        result = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
        logging(
            code="rematch_1001",
            function="get_tasks",
            info="获取content_id数量: {}".format(len(result))
        )
        return result

    async def _mark_download_failed(self, video_id) -> None:
        """Set ``download_status`` of one crawler-video row to the FAIL status."""
        update_sql = f"""
            UPDATE {self.article_crawler_video_table}
            SET download_status = %s
            WHERE id = %s;
        """
        await self.db_client.async_insert(
            sql=update_sql,
            params=(rematch_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, video['id'] if False else video_id)
        )

    async def _download_and_upload(self, video: Dict) -> bool:
        """
        Download one video (and its cover), upload both to OSS and record the
        outcome on the crawler-video row.

        :param video: row dict with ``id``, ``platform``, ``out_video_id``,
            ``video_url`` and ``cover_url``.
        :return: True when the video was uploaded and the row marked as
            success; False when the video download produced no file (the row
            is then marked as failed).
        """
        local_video_path, local_cover_path = generate_video_path(
            video['platform'], video['out_video_id']
        )
        file_path = await download_video(
            file_path=local_video_path,
            platform=video['platform'],
            video_url=video['video_url']
        )
        if not file_path:
            # Video download failed: nothing to upload, mark the row as failed.
            await self._mark_download_failed(video['id'])
            return False

        cover_path = await download_cover(
            file_path=local_cover_path,
            platform=video['platform'],
            cover_url=video['cover_url']
        )
        oss_video = await upload_to_oss(
            local_video_path=file_path,
            download_type="video"
        )
        # The cover is optional: a missing cover is stored as NULL.
        if cover_path:
            oss_cover = await upload_to_oss(
                local_video_path=cover_path,
                download_type="image"
            )
        else:
            oss_cover = None

        update_sql = f"""
            UPDATE {self.article_crawler_video_table}
            SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
            WHERE id = %s;
        """
        await self.db_client.async_insert(
            sql=update_sql,
            params=(
                oss_video,
                oss_cover,
                rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
                video['id']
            )
        )
        return True

    async def async_download_video_list(self, to_download_videos: List[Dict], illegal_platform_id_list: List[str]) -> int:
        """
        Download and upload the given videos one by one, skipping illegal ones.

        :param to_download_videos: crawler-video rows to process.
        :param illegal_platform_id_list: ``"{platform}_{out_video_id}"`` keys
            of videos that must not be downloaded.
        :return: number of videos successfully downloaded and uploaded.
        """
        # Build the lookup set once instead of scanning the list per video.
        illegal_keys = set(illegal_platform_id_list or ())
        success_count = 0
        for video in to_download_videos:
            out_key = "{}_{}".format(video['platform'], video['out_video_id'])
            if out_key in illegal_keys:
                continue

            try:
                downloaded = await self._download_and_upload(video)
            except Exception as e:
                # Best effort per video: record why it failed, mark the row
                # as failed and carry on with the next candidate.
                logging(
                    code="rematch_1011",
                    function="async_download_video_list",
                    data={
                        "error_stack": traceback.format_exc(),
                        "error": str(e),
                        "video_id": video['id']
                    }
                )
                await self._mark_download_failed(video['id'])
                continue

            if downloaded:
                success_count += 1
                # Stop early once enough videos have been downloaded.
                # NOTE(review): this uses a strict ">" while the "enough
                # videos" checks elsewhere in this class use ">=" - confirm
                # whether one extra download is intended.
                if success_count > rematch_task_const.MIN_MATCH_VIDEO_NUM:
                    return success_count

        return success_count

    async def download_upload_task(self, content_id: str, illegal_platform_id_list: List[str], legal_not_downloaded_videos=None) -> int:
        """
        Run the download/upload step for one content_id.

        :param content_id: content whose videos are processed.
        :param illegal_platform_id_list: ``"{platform}_{out_video_id}"`` keys
            to skip.
        :param legal_not_downloaded_videos: rows to download; when empty or
            None, the rows with the INIT download status are read from the
            crawler-video table instead.
        :return: number of videos successfully downloaded and uploaded.
        """
        if legal_not_downloaded_videos:
            return await self.async_download_video_list(
                to_download_videos=legal_not_downloaded_videos,
                illegal_platform_id_list=illegal_platform_id_list
            )

        # content_id is a string column value: quote it, as the other
        # queries in this class do.
        select_sql = f"""
            SELECT id, out_video_id, platform, video_title, video_url, cover_url
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}'
            AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_INIT_STATUS};
        """
        to_download_videos = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
        return await self.async_download_video_list(
            to_download_videos=to_download_videos,
            illegal_platform_id_list=illegal_platform_id_list
        )

    async def spider_task(self, content_id: str) -> int:
        """
        Crawl new candidate videos for one content_id.

        :return: the value returned by ``search_videos_from_web``
            (presumably the number of videos found - verify against it).
        """
        kimi_result = await get_kimi_result(
            content_id=content_id,
            article_text_table=self.article_text_table,
            db_client=self.db_client
        )
        # The search runs at content_id granularity, so content_id is also
        # passed as the trace_id.
        search_videos_count = await search_videos_from_web(
            info={
                "ori_title": kimi_result['ori_title'],
                "kimi_summary": kimi_result['kimi_summary'],
                "kimi_keys": kimi_result['kimi_keys'],
                "trace_id": content_id,
                "gh_id": rematch_task_const.TASK_DEFAULT_GH_ID,
                "content_id": content_id,
                "crawler_video_table": self.article_crawler_video_table
            },
            gh_id_map={},
            db_client=self.db_client
        )
        return search_videos_count

    async def update_each_content_id(self, content_id: str) -> None:
        """
        Bring one content_id up to the minimum number of legal videos.

        Depending on the current rows it either does nothing (enough legal
        videos already downloaded), downloads the pending legal videos, or
        re-runs the crawler and then downloads.
        """
        select_sql = f"""
            SELECT id, platform, out_video_id, video_oss_path, download_status, is_illegal, video_url, cover_url
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}';
        """
        record_list = await self.db_client.async_select(select_sql, cursor_type=DictCursor)

        # Keys ("{platform}_{out_video_id}") of the videos flagged as illegal.
        illegal_platform_id_list = [
            "{}_{}".format(record['platform'], record['out_video_id'])
            for record in record_list
            if record["is_illegal"] == rematch_task_const.VIDEO_UNSAFE
        ]
        illegal_keys = set(illegal_platform_id_list)

        # Split the legal rows by download status.
        legal_downloaded_videos = []
        legal_not_downloaded_videos = []
        for record in record_list:
            if record['is_illegal'] == rematch_task_const.VIDEO_UNSAFE:
                continue
            # Also drop rows that share a key with an illegal video even if
            # they are not flagged themselves.
            out_key = "{}_{}".format(record['platform'], record['out_video_id'])
            if out_key in illegal_keys:
                continue
            status = record['download_status']
            if status == rematch_task_const.VIDEO_DOWNLOAD_INIT_STATUS:
                legal_not_downloaded_videos.append(record)
            elif status == rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS:
                legal_downloaded_videos.append(record)

        downloaded_num = len(legal_downloaded_videos)
        pending_num = len(legal_not_downloaded_videos)

        if downloaded_num >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
            # Enough legal videos are already downloaded: nothing to do.
            logging(
                code="rematch_1002",
                info="存在{}条以上待合规已下载内容,无需处理该content_id".format(rematch_task_const.MIN_MATCH_VIDEO_NUM),
                data={
                    "content_id": content_id
                }
            )
            return

        if pending_num + downloaded_num >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
            # Enough legal candidates exist: download the pending ones.
            logging(
                code="rematch_1003",
                info="存在{}条以上待合规内容".format(rematch_task_const.MIN_MATCH_VIDEO_NUM),
                data={
                    "content_id": content_id,
                    "待下载数量": pending_num,
                    "已下载数量": downloaded_num
                }
            )
            await self.download_upload_task(
                content_id=content_id,
                illegal_platform_id_list=illegal_platform_id_list,
                legal_not_downloaded_videos=legal_not_downloaded_videos
            )
            return

        # Too few candidates: crawl again, then download whatever is pending.
        logging(
            code="rematch_1004",
            info="重新执行爬虫任务",
            data={
                "content_id": content_id,
            }
        )
        await self.spider_task(content_id=content_id)
        await self.download_upload_task(
            content_id=content_id,
            illegal_platform_id_list=illegal_platform_id_list
        )

    async def check_whether_get_enough_videos(self, content_id: str) -> bool:
        """
        Check whether the content_id has at least ``MIN_MATCH_VIDEO_NUM``
        rows that are both successfully downloaded and marked safe.
        """
        select_sql = f"""
            SELECT count(1)
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}'
            AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
            AND is_illegal = {rematch_task_const.VIDEO_SAFE};
        """
        count_tuple = await self.db_client.async_select(select_sql)
        count = count_tuple[0][0]
        return count >= rematch_task_const.MIN_MATCH_VIDEO_NUM

    async def update_content_id_status(self, content_id: str) -> int:
        """
        Set the re-match record(s) of the content_id to the success status.

        :return: the value returned by ``db_client.async_insert``
            (named ``affected_rows`` in this code).
        """
        # Bind content_id as a parameter instead of formatting it into the SQL.
        update_sql = """
            UPDATE article_re_match_record
            SET status = %s
            WHERE content_id = %s;
        """
        affected_rows = await self.db_client.async_insert(
            update_sql,
            params=(rematch_task_const.REMATCH_SUCCESS_STATUS, content_id)
        )
        return affected_rows

    async def deal(self):
        """
        Entry point: process every pending content_id, marking it as
        successful once it has enough videos. A failure on one content_id
        is logged and does not stop the others.
        """
        task_list = await self.get_tasks()
        for task in task_list:
            content_id = task['content_id']
            try:
                await self.update_each_content_id(content_id=content_id)
                if await self.check_whether_get_enough_videos(content_id=content_id):
                    # Enough videos: flag the re-match record as successful.
                    await self.update_content_id_status(content_id=content_id)
            except Exception as e:
                error_stack = traceback.format_exc()
                logging(
                    code="rematch_1010",
                    function="update_each_content_id",
                    data={
                        "error_stack": error_stack,
                        "error": str(e),
                        "content_id": content_id
                    }
                )