""" @author: luojunhui """ import json import time import asyncio from applications.config import Config from applications.log import logging from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail from applications.functions.common import shuffle_list from applications.functions.kimi import KimiServer from applications.spider import search_videos_from_web from applications.etl_function import * class NewContentIdTask(object): """ 不存在历史已经发布的文章的匹配流程 """ TASK_INIT_STATUS = 0 TASK_KIMI_FINISHED_STATUS = 1 TASK_SPIDER_FINISHED_STATUS = 2 TASK_ETL_FINISHED_STATUS = 3 TASK_PUBLISHED_STATUS = 4 TASK_PROCESSING_STATUS = 101 TASK_FAIL_STATUS = 99 ARTICLE_TEXT_TABLE_ERROR = 98 TASK_MAX_PROCESS_TIMES = 3 def __init__(self, mysql_client): self.mysql_client = mysql_client self.config = Config() self.article_match_video_table = self.config.article_match_video_table self.article_text_table = self.config.article_text_table self.article_crawler_video_table = self.config.article_crawler_video_table self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2")) self.account_map = json.loads(self.config.get_config_value("accountMap")) self.spider_coroutines = self.config.get_config_value("spiderCoroutines") async def get_tasks(self): """ 获取 task :return: """ # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1 select_processing_sql = f""" SELECT trace_id, content_status_update_time, process_times FROM {self.article_match_video_table} WHERE content_status = {self.TASK_PROCESSING_STATUS} and process_times <= {self.TASK_MAX_PROCESS_TIMES}; """ processing_articles = await self.mysql_client.async_select(select_processing_sql) if processing_articles: processing_list = [ { "trace_id": item[0], "content_status_update_time": item[1], "process_times": item[2] } for item in processing_articles ] for obj in processing_list: if int(time.time()) - obj['content_status_update_time'] >= 3600: # 认为该任务失败 await self.roll_back_content_status_when_fails( process_times=obj['process_times'] + 1, trace_id=obj['trace_id'] ) # 将 process_times > 3 且状态不为 4 的任务的状态修改为失败, update_status_sql = f""" UPDATE {self.article_match_video_table} SET content_status = %s WHERE process_times > %s and content_status != %s; """ await self.mysql_client.async_insert( update_status_sql, params=( self.TASK_FAIL_STATUS, self.TASK_MAX_PROCESS_TIMES, self.TASK_PUBLISHED_STATUS ) ) # 获取 process_times <= 3 且 content_status = 0 的任务 select_sql = f""" SELECT trace_id, content_id, flow_pool_level, gh_id, process_times FROM {self.article_match_video_table} WHERE content_status = {self.TASK_INIT_STATUS} and process_times <= {self.TASK_MAX_PROCESS_TIMES} LIMIT {self.spider_coroutines}; """ tasks = await self.mysql_client.async_select(select_sql) if tasks: return [ { "trace_id": i[0], "content_id": i[1], "flow_pool_level": i[2], "gh_id": i[3], "process_times": i[4] } for i in tasks ] else: return [] async def get_video_list(self, content_id): """ 判断该文章是否存在历史匹配视频 :param content_id :return: """ sql = f""" SELECT id FROM {self.article_crawler_video_table} WHERE content_id = '{content_id}' and download_status = 2; """ res_tuple = await self.mysql_client.async_select(sql) if len(res_tuple) >= 3: return True else: return False async def update_content_status(self, new_content_status, trace_id, ori_content_status): """ :param new_content_status: :param trace_id: :param ori_content_status: :return: """ update_sql = f""" UPDATE {self.article_match_video_table} SET content_status = %s, content_status_update_time = %s 
            WHERE trace_id = %s AND content_status = %s;
        """
        row_counts = await self.mysql_client.async_insert(
            sql=update_sql,
            params=(
                new_content_status,
                int(time.time()),
                trace_id,
                ori_content_status
            )
        )
        return row_counts

    async def roll_back_content_status_when_fails(self, process_times, trace_id):
        """
        On failure, roll the task back to the init status and increment
        process_times by 1.
        :param process_times:
        :param trace_id:
        :return:
        """
        update_article_sql = f"""
            UPDATE {self.article_match_video_table}
            SET content_status = %s, content_status_update_time = %s, process_times = %s
            WHERE trace_id = %s AND content_status = %s;
        """
        await self.mysql_client.async_insert(
            sql=update_article_sql,
            params=(
                self.TASK_INIT_STATUS,
                int(time.time()),
                process_times + 1,
                trace_id,
                self.TASK_PROCESSING_STATUS
            )
        )

    async def judge_whether_same_content_id_is_processing(self, content_id):
        """
        The same content_id only needs to be processed once: return True if any
        row for it is already processing, past the init stage, or published.
        :param content_id:
        :return: bool
        """
        select_sql = f"""
            SELECT DISTINCT content_status
            FROM {self.article_match_video_table}
            WHERE content_id = '{content_id}';
        """
        result = await self.mysql_client.async_select(select_sql)
        if result:
            for item in result:
                content_status = item[0]
                if content_status in {
                    self.TASK_KIMI_FINISHED_STATUS,
                    self.TASK_SPIDER_FINISHED_STATUS,
                    self.TASK_ETL_FINISHED_STATUS,
                    self.TASK_PROCESSING_STATUS,
                    self.TASK_PUBLISHED_STATUS
                }:
                    return True
            return False
        else:
            return False

    async def get_downloaded_videos(self, content_id):
        """
        Fetch the videos that have already been downloaded.
        :return:
        """
        sql = f"""
            SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}' AND download_status = 2
            ORDER BY score DESC;
        """
        res_tuple = await self.mysql_client.async_select(sql)
        return [
            {
                "platform": i[0],
                "play_count": i[1],
                "like_count": i[2],
                "video_oss_path": i[3],
                "cover_oss_path": i[4],
                "uid": i[5]
            }
            for i in res_tuple
        ]

    async def get_kimi_status(self, content_id):
        """
        Fetch kimi_status by content_id.
        :return:
        """
        select_sql = f"""
            SELECT kimi_status
            FROM {self.article_text_table}
            WHERE content_id = '{content_id}';
        """
        response = await self.mysql_client.async_select(select_sql)
        if response:
            return response[0][0]
        else:
            return self.ARTICLE_TEXT_TABLE_ERROR

    async def kimi_task(self, params):
        """
        Run the Kimi task.
        :return:
        """
        KIMI_SUCCESS_STATUS = 1
        KIMI_FAIL_STATUS = 2
        content_id = params['content_id']
        trace_id = params['trace_id']
        process_times = params['process_times']
        kimi_status_code = await self.get_kimi_status(content_id=content_id)
        if kimi_status_code == KIMI_SUCCESS_STATUS:
            affected_rows = await self.update_content_status(
                new_content_status=self.TASK_KIMI_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=self.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="Multiple workers competed for this task's status lock; this one lost, returning"
                )
                return
            get_kimi_sql = f"""
                SELECT article_title, kimi_title, kimi_summary, kimi_keys
                FROM {self.article_text_table}
                WHERE content_id = '{content_id}';
            """
            kimi_info = await self.mysql_client.async_select(get_kimi_sql)
            return {
                "kimi_title": kimi_info[0][1],
                "ori_title": kimi_info[0][0],
                "kimi_summary": kimi_info[0][2],
                "kimi_keys": json.loads(kimi_info[0][3])
            }
        elif kimi_status_code == self.ARTICLE_TEXT_TABLE_ERROR:
            logging(
                code="4000",
                info="content_id not found in the long_articles_text table"
            )
        else:
            # Start processing: move content_status from 0 to 101.
            affected_rows = await self.update_content_status(
                new_content_status=self.TASK_PROCESSING_STATUS,
                trace_id=trace_id,
                ori_content_status=self.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="Multiple workers competed for this task's status lock; this one lost, returning"
                )
                return
            K = KimiServer()
            try:
                select_sql = f"""
                    SELECT article_title, article_text
                    FROM {self.article_text_table}
                    WHERE content_id = '{content_id}';
                """
                res = await self.mysql_client.async_select(select_sql)
                article_obj = {
                    "article_title": res[0][0],
                    "article_text": res[0][1],
                    "content_id": content_id
                }
                kimi_info = await K.search_kimi_schedule(params=article_obj)
                kimi_title = kimi_info['k_title']
                content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
                content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
                update_kimi_sql = f"""
                    UPDATE {self.article_text_table}
                    SET kimi_title = %s, kimi_summary = %s, kimi_keys = %s, kimi_status = %s
                    WHERE content_id = %s;
                """
                await self.mysql_client.async_insert(
                    sql=update_kimi_sql,
                    params=(kimi_title, content_title, content_keys, KIMI_SUCCESS_STATUS, content_id)
                )
                await self.update_content_status(
                    new_content_status=self.TASK_KIMI_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=self.TASK_PROCESSING_STATUS
                )
                return {
                    "kimi_title": kimi_title,
                    "ori_title": article_obj['article_title'],
                    "kimi_summary": content_title,
                    "kimi_keys": kimi_info['content_keys']
                }
            except Exception as e:
                # The Kimi task failed: record the failure status ...
                update_kimi_sql = f"""
                    UPDATE {self.article_text_table}
                    SET kimi_status = %s
                    WHERE content_id = %s;
                """
                await self.mysql_client.async_insert(
                    sql=update_kimi_sql,
                    params=(
                        KIMI_FAIL_STATUS,
                        content_id
                    )
                )
                # ... and roll content_status back from 101 to 0.
                await self.roll_back_content_status_when_fails(
                    process_times=process_times,
                    trace_id=trace_id
                )
                return {}

    async def spider_task(self, params, kimi_result):
        """
        Crawler task.
        :return:
        """
        SPIDER_INIT_STATUS = 1
        trace_id = params['trace_id']
        content_id = params['content_id']
        process_times = params['process_times']
        gh_id = params['gh_id']
        select_sql = f"""
            SELECT count(id)
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}';
        """
        count_tuple = await self.mysql_client.async_select(select_sql)
        counts = count_tuple[0][0]
        if counts >= 3:
            await self.update_content_status(
                new_content_status=self.TASK_SPIDER_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=SPIDER_INIT_STATUS
            )
            return True
        # Start processing: move content_status from 1 to 101.
        affected_rows = await self.update_content_status(
            new_content_status=self.TASK_PROCESSING_STATUS,
            ori_content_status=SPIDER_INIT_STATUS,
            trace_id=trace_id
        )
        if affected_rows == 0:
            logging(
                code="6000",
                info="Multiple workers competed for this task's status lock; this one lost, returning"
            )
            return False
        try:
            search_videos_count = await search_videos_from_web(
                info={
                    "ori_title": kimi_result['ori_title'],
                    "kimi_summary": kimi_result['kimi_summary'],
                    "kimi_keys": kimi_result['kimi_keys'],
                    "trace_id": trace_id,
                    "gh_id": gh_id,
                    "content_id": content_id,
                    "crawler_video_table": self.article_crawler_video_table
                },
                gh_id_map=self.account_map,
                db_client=self.mysql_client
            )
            if search_videos_count >= 3:
                # The crawler succeeded: move content_status from 101 to 2.
                await self.update_content_status(
                    new_content_status=self.TASK_SPIDER_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=self.TASK_PROCESSING_STATUS
                )
                return True
            else:
                # roll_back_content_status_when_fails increments process_times itself.
                await self.roll_back_content_status_when_fails(
                    process_times=process_times,
                    trace_id=trace_id
                )
                return False
        except Exception as e:
            await self.roll_back_content_status_when_fails(
                process_times=process_times,
                trace_id=trace_id
            )
            print("Crawler task failed: {}".format(e))
            return False

    async def etl_task(self, params):
        """
        Download videos and upload them to OSS.
        :param params:
        :return:
        """
        VIDEO_DOWNLOAD_SUCCESS_STATUS = 2
        VIDEO_DOWNLOAD_FAIL_STATUS = 3
        ETL_TASK_INIT_STATUS = 2
        trace_id = params['trace_id']
        content_id = params['content_id']
        process_times = params['process_times']
        # Check whether three videos have already been downloaded.
        select_sql = f"""
            SELECT count(id)
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}' AND download_status = {VIDEO_DOWNLOAD_SUCCESS_STATUS};
        """
        video_count_tuple = await self.mysql_client.async_select(select_sql)
        video_count = video_count_tuple[0][0]
        if video_count >= 3:
            affect_rows = await self.update_content_status(
                ori_content_status=ETL_TASK_INIT_STATUS,
                trace_id=trace_id,
                new_content_status=self.TASK_ETL_FINISHED_STATUS
            )
            if affect_rows == 0:
                logging(
                    code="6000",
                    info="Multiple workers competed for this task's status lock; this one lost, returning"
                )
                return False
            return True
        else:
            # Start processing: mark the article as in progress.
            affected_rows = await self.update_content_status(
                ori_content_status=ETL_TASK_INIT_STATUS,
                trace_id=trace_id,
                new_content_status=self.TASK_PROCESSING_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="Multiple workers competed for this task's status lock; this one lost, returning"
                )
                return False
            select_sql = f"""
                SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
                FROM {self.article_crawler_video_table}
                WHERE content_id = '{content_id}' AND download_status != {VIDEO_DOWNLOAD_SUCCESS_STATUS}
                ORDER BY score DESC;
            """
            videos_need_to_download_tuple = await self.mysql_client.async_select(select_sql)
            downloaded_count = 0
            for line in videos_need_to_download_tuple:
                # Use a separate name so the task's params dict is not shadowed.
                video_params = {
                    "id": line[0],
                    "video_id": line[1],
                    "platform": line[2],
                    "video_title": line[3],
                    "video_url": line[4],
                    "cover_url": line[5],
                    "user_id": line[6],
                    "trace_id": line[7]
                }
                try:
                    local_video_path, local_cover_path = generate_video_path(
                        video_params['platform'], video_params['video_id']
                    )
                    # Download the video ...
                    file_path = await download_video(
                        file_path=local_video_path,
                        platform=video_params['platform'],
                        video_url=video_params['video_url']
                    )
                    # ... and its cover.
                    cover_path = await download_cover(
                        file_path=local_cover_path,
                        platform=video_params['platform'],
                        cover_url=video_params['cover_url']
                    )
                    oss_video = await upload_to_oss(
                        local_video_path=file_path,
                        download_type="video"
                    )
                    if cover_path:
                        oss_cover = await upload_to_oss(
                            local_video_path=cover_path,
                            download_type="image"
                        )
                    else:
                        oss_cover = None
                    update_sql = f"""
                        UPDATE {self.article_crawler_video_table}
                        SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
                        WHERE id = %s;
                    """
                    await self.mysql_client.async_insert(
                        sql=update_sql,
                        params=(
                            oss_video,
                            oss_cover,
                            VIDEO_DOWNLOAD_SUCCESS_STATUS,
                            video_params['id']
                        )
                    )
                    downloaded_count += 1
                    if downloaded_count >= 3:
                        await self.update_content_status(
                            ori_content_status=self.TASK_PROCESSING_STATUS,
                            trace_id=trace_id,
                            new_content_status=self.TASK_ETL_FINISHED_STATUS
                        )
                        return True
                except Exception as e:
                    update_sql = f"""
                        UPDATE {self.article_crawler_video_table}
                        SET download_status = %s
                        WHERE id = %s;
                    """
                    await self.mysql_client.async_insert(
                        sql=update_sql,
                        params=(VIDEO_DOWNLOAD_FAIL_STATUS, video_params['id'])
                    )
            if downloaded_count >= 3:
                await self.update_content_status(
                    ori_content_status=self.TASK_PROCESSING_STATUS,
                    trace_id=trace_id,
                    new_content_status=self.TASK_ETL_FINISHED_STATUS
                )
                return True
            else:
                # roll_back_content_status_when_fails increments process_times itself.
                await self.roll_back_content_status_when_fails(
                    process_times=process_times,
                    trace_id=trace_id
                )
                return False

    async def publish_task(self, params, kimi_title):
        """
        Publish task.
        :param kimi_title:
        :param params:
        :return:
        """
        PUBLISH_DEFAULT_STATUS = 3
        gh_id = params['gh_id']
        flow_pool_level = params['flow_pool_level']
        content_id = params['content_id']
        trace_id = params['trace_id']
        process_times = params['process_times']
        # Start processing: mark the task as in progress.
        affected_rows = await self.update_content_status(
            ori_content_status=PUBLISH_DEFAULT_STATUS,
            trace_id=trace_id,
            new_content_status=self.TASK_PROCESSING_STATUS
        )
        if affected_rows == 0:
            logging(
                code="6000",
                info="Multiple workers competed for this task's status lock; this one lost, returning"
            )
            return False
        try:
            download_videos = await self.get_downloaded_videos(content_id)
            match flow_pool_level:
                case "autoArticlePoolLevel4":
                    # Cold-start pool: shuffle across all downloaded videos.
                    video_list = shuffle_list(download_videos)[:3]
                case "autoArticlePoolLevel3":
                    if self.gh_id_dict.get(gh_id):
                        video_list = shuffle_list(download_videos)[:3]
                    else:
                        video_list = download_videos[:3]
                case "autoArticlePoolLevel2":
                    # Secondary slot: only for specific accounts, skipped for now.
                    video_list = []
                case "autoArticlePoolLevel1":
                    # Headline slot: not handled yet.
                    video_list = download_videos[:3]
                case _:
                    video_list = download_videos[:3]
            published_videos = []
            for video_obj in video_list:
                # Use a separate name so the task's params dict is not shadowed.
                publish_params = {
                    "videoPath": video_obj['video_oss_path'],
                    "uid": video_obj['uid'],
                    "title": kimi_title
                }
                publish_response = await publish_to_pq(publish_params)
                video_id = publish_response['data']['id']
                response = await get_pq_video_detail(video_id)
                obj = {
                    "uid": video_obj['uid'],
                    "source": video_obj['platform'],
                    "kimiTitle": kimi_title,
                    "videoId": response['data'][0]['id'],
                    "videoCover": response['data'][0]['shareImgPath'],
                    "videoPath": response['data'][0]['videoPath'],
                    "videoOss": video_obj['video_oss_path']
                }
                published_videos.append(obj)
            update_sql = f"""
                UPDATE {self.article_match_video_table}
                SET content_status = %s, response = %s, process_times = %s
                WHERE trace_id = %s AND content_status = %s;
            """
            # Move from the processing status to the published status.
            await self.mysql_client.async_insert(
                sql=update_sql,
                params=(
                    self.TASK_PUBLISHED_STATUS,
                    json.dumps(published_videos, ensure_ascii=False),
                    process_times + 1,
                    trace_id,
                    self.TASK_PROCESSING_STATUS
                )
            )
        except Exception as e:
            # roll_back_content_status_when_fails increments process_times itself.
            await self.roll_back_content_status_when_fails(
                process_times=process_times,
                trace_id=trace_id
            )
            print(e)

    async def start_process(self, params):
        """
        Process a single article.
        :param params:
        :return:
        """
        # Step 1: run the Kimi task.
        # time.sleep(5)  # test the wait when multiple workers grab the same task
        kimi_result = await self.kimi_task(params)
        trace_id = params['trace_id']
        if kimi_result:
            # Once the Kimi task has finished, run the spider task.
            print("kimi success")
            logging(
                code=3001,
                info="kimi success",
                trace_id=trace_id
            )
            spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
            if spider_flag:
                # Once the crawler has finished, run the ETL task.
                print("spider success")
                logging(
                    code=3002,
                    info="spider_success",
                    trace_id=trace_id
                )
                etl_flag = await self.etl_task(params)
                if etl_flag:
                    # Once download and upload have finished, run the publish task.
                    print("etl success")
                    logging(
                        code="3003",
                        info="etl_success",
                        trace_id=trace_id
                    )
                    try:
                        await self.publish_task(params, kimi_result['kimi_title'])
                        logging(
                            code="3004",
                            info="publish_success",
                            trace_id=trace_id
                        )
                    except Exception as e:
                        logging(
                            code="6004",
                            info="publish failed--{}".format(e),
                            trace_id=trace_id
                        )
                else:
                    logging(
                        code="6003",
                        info="ETL task failed",
                        trace_id=trace_id
                    )
            else:
                logging(
                    code="6002",
                    info="spider task failed",
                    trace_id=trace_id
                )
        else:
            logging(
                code="6001",
                info="kimi task failed",
                trace_id=trace_id
            )

    async def process_task(self, params):
        """
        Process a task.
        :return:
        """
        content_id = params['content_id']
        download_videos = await self.get_video_list(content_id)
        if not download_videos:
            # Before processing, check whether the same content_id is already being handled.
            processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
            if processing_flag:
                logging(
                    code="9001",
                    info="content_id is already being processed, skipping--{}".format(content_id)
                )
            else:
                await self.start_process(params=params)
        else:
            print("Downloaded videos already exist")

    async def deal(self):
        """
        Entry point: fetch tasks, deduplicate by content_id, process concurrently.
        :return:
        """
        task_list = await self.get_tasks()
        print(task_list)
        # Deduplicate tasks by content_id.
        task_dict = {}
        for task in task_list:
            key = task['content_id']
            task_dict[key] = task
        process_list = list(task_dict.values())
        logging(
            code="5001",
            info="Match Task Got {} this time".format(len(process_list)),
            function="Publish Task"
        )
        if task_list:
            total_task = len(process_list)
            start_time = time.time()
            print("Start processing, {} tasks in total".format(total_task))
            tasks = [self.process_task(params) for params in process_list]
            await asyncio.gather(*tasks)
            print("Processing took {} s".format(time.time() - start_time))
        else:
            logging(
                code="9008",
                info="No requests to process"
            )
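

# A minimal usage sketch (not part of the original module): how this task might
# be driven from a periodic entry point. `AsyncMySQLClient`, its import path,
# `init_pool()`, and the 60 s polling interval are all hypothetical stand-ins
# for whatever async DB client and scheduler the project actually wires in.
#
# if __name__ == "__main__":
#     from applications.db import AsyncMySQLClient  # hypothetical import path
#
#     async def main():
#         mysql_client = AsyncMySQLClient()
#         await mysql_client.init_pool()  # assumed pool initializer
#         task = NewContentIdTask(mysql_client)
#         while True:
#             # Each round picks up at most `spiderCoroutines` init-status rows
#             # and runs them concurrently via asyncio.gather inside deal().
#             await task.deal()
#             await asyncio.sleep(60)  # assumed polling interval
#
#     asyncio.run(main())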