"""
@author: luojunhui
"""
import asyncio
import datetime
import logging
import time

from applications.db import AsyncMySQLClient
from applications.spider import search_videos_from_web
from tasks.utils.kimi_task import get_kimi_result
from tasks.utils.etl_task import async_download_videos

logger = logging.getLogger(__name__)

# Table that is queried for videos here and handed to the spider / ETL helpers.
CRAWLER_VIDEO_TABLE = "long_articles_crawler_videos"
# Table name handed to get_kimi_result as its article text source.
ARTICLE_TEXT_TABLE = "long_articles_text"
# Threshold shared by the candidate query and both stage checks: a stage
# succeeds when it reports at least this many videos.
MIN_VIDEO_COUNT = 3


class ContentRematch:
    """
    Re-run video matching for content ids that have too few videos.

    For every candidate content id the pipeline is:
    kimi result lookup -> web video search -> video download (ETL).
    """

    def __init__(self, db_client):
        """
        :param db_client: async DB client; must provide ``async_select(sql)``
            and is passed through to the spider / kimi / ETL helpers.
        """
        self.db_client = db_client

    @staticmethod
    def _trace_id(content_id):
        """Build the trace id used for every rematch step of a content id."""
        return "rematch_{}".format(content_id)

    async def get_content_list(self):
        """
        Fetch the content ids that still need to be matched.

        Selects at most 10 content ids that have fewer than MIN_VIDEO_COUNT
        rows with ``download_status = 2``.

        :return: the result of ``async_select``; each row is read by callers
            as ``(content_id, count)``.
        """
        # NOTE(review): the query has no ORDER BY and this file never marks a
        # content id as processed; unless the downstream helpers change the
        # rows, the same ids may be selected on every run -- confirm.
        select_sql = f"""
            SELECT content_id, count(1)
            FROM {CRAWLER_VIDEO_TABLE}
            WHERE download_status = 2
            GROUP BY content_id
            HAVING count(1) < {MIN_VIDEO_COUNT}
            LIMIT 10;
        """
        return await self.db_client.async_select(select_sql)

    async def get_illegal_out_ids(self, content_id: str) -> list:
        """
        Fetch the videos of a content id that are flagged ``is_illegal = 1``.

        :param content_id: content id to look up.
        :return: list of ``"<platform>_<out_video_id>"`` strings, empty when
            the query returns nothing.
        """
        # NOTE(review): content_id is interpolated directly into the SQL text.
        # In this file it always originates from get_content_list (i.e. the
        # database); switch to bound parameters if async_select supports them.
        select_sql = f"""
            SELECT platform, out_video_id
            FROM {CRAWLER_VIDEO_TABLE}
            WHERE content_id = '{content_id}' and is_illegal = 1;
        """
        response = await self.db_client.async_select(select_sql)
        return ["{}_{}".format(line[0], line[1]) for line in response or []]

    async def spider_task(self, params, kimi_result):
        """
        Search the web for videos matching the content.

        :param params: dict containing ``content_id``.
        :param kimi_result: dict containing ``ori_title``, ``kimi_summary``
            and ``kimi_keys``.
        :return: True when the search reports at least MIN_VIDEO_COUNT
            videos; False otherwise, including when the search raises.
        """
        content_id = params['content_id']
        gh_id = "test_gh_id"
        trace_id = self._trace_id(content_id)
        try:
            search_videos_count = await search_videos_from_web(
                info={
                    "ori_title": kimi_result['ori_title'],
                    "kimi_summary": kimi_result['kimi_summary'],
                    "kimi_keys": kimi_result['kimi_keys'],
                    "trace_id": trace_id,
                    "gh_id": gh_id,
                    "content_id": content_id,
                    "crawler_video_table": CRAWLER_VIDEO_TABLE,
                },
                gh_id_map={},
                db_client=self.db_client
            )
            return search_videos_count >= MIN_VIDEO_COUNT
        except Exception:
            # Best-effort stage: a failure must not abort the whole batch,
            # but it is logged so it can be diagnosed instead of vanishing.
            logger.exception(
                "spider task failed, content_id=%s, trace_id=%s",
                content_id,
                trace_id,
            )
            return False

    async def etl_task(self, content_id, trace_id):
        """
        Download the crawled videos of a content id.

        :param content_id: content id whose videos are downloaded.
        :param trace_id: trace id forwarded to the download helper.
        :return: True when at least MIN_VIDEO_COUNT videos were downloaded.
        """
        # Videos already flagged as illegal are handed to the downloader.
        illegal_videos = await self.get_illegal_out_ids(content_id)
        downloaded_count = await async_download_videos(
            trace_id=trace_id,
            content_id=content_id,
            article_crawler_video_table=CRAWLER_VIDEO_TABLE,
            db_client=self.db_client,
            illegal_videos=illegal_videos
        )
        return downloaded_count >= MIN_VIDEO_COUNT

    async def process_each_content(self, content):
        """
        Run the full rematch pipeline for one content row.

        :param content: row from get_content_list; ``content[0]`` is the
            content id.
        :return: True when every stage succeeded, False as soon as one stage
            yields nothing or fails.
        """
        content_id = content[0]
        kimi_result = await get_kimi_result(
            content_id=content_id,
            db_client=self.db_client,
            article_text_table=ARTICLE_TEXT_TABLE,
        )
        if not kimi_result:
            return False

        spider_flag = await self.spider_task(
            params={
                "content_id": content_id,
            },
            kimi_result=kimi_result
        )
        if not spider_flag:
            return False

        etl_flag = await self.etl_task(
            content_id=content_id,
            trace_id=self._trace_id(content_id)
        )
        return bool(etl_flag)

    async def deal(self):
        """
        Fetch the pending content list and process the rows one at a time.
        """
        content_list = await self.get_content_list()
        for content in content_list:
            print(content)
            await self.process_each_content(content)


async def main_job():
    """
    Open a DB client and run one rematch pass.

    :return: None
    """
    async with AsyncMySQLClient() as long_articles_pool:
        _task = ContentRematch(long_articles_pool)
        await _task.deal()


if __name__ == '__main__':
    # Run one pass, then wait 60 seconds before the next one, forever.
    while True:
        asyncio.run(main_job())
        now_str = str(datetime.datetime.now())
        print("{} 请求执行完成, 等待60s".format(now_str))
        time.sleep(60)