""" @author: luojunhui """ import json import time import asyncio from applications.config import Config from applications.log import logging from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail from applications.functions.common import shuffle_list from applications.match_algorithm.rank import get_content_oss_fission_dict class historyContentIdTask(object): """ 处理已经匹配过小程序的文章 """ TASK_PROCESSING_STATUS = 101 TASK_INIT_STATUS = 0 TASK_PUBLISHED_STATUS = 4 def __init__(self, mysql_client): """ :param mysql_client: """ self.mysql_client = mysql_client self.config = Config() self.article_match_video_table = self.config.article_match_video_table self.article_text_table = self.config.article_text_table self.article_crawler_video_table = self.config.article_crawler_video_table self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2")) self.history_coroutines = self.config.get_config_value("historyArticleCoroutines") async def get_tasks(self): """ 获取任务 :return: """ select_sql1 = f""" SELECT ART.trace_id, ART.content_id, ART.flow_pool_level, ART.gh_id, ART.process_times FROM {self.article_match_video_table} ART JOIN ( select content_id, count(1) as cnt from {self.article_crawler_video_table} where download_status = 2 group by content_id ) VID on ART.content_id = VID.content_id and VID.cnt >= 3 WHERE ART.content_status = 0 and ART.process_times <= 3 ORDER BY request_timestamp LIMIT {self.history_coroutines}; """ tasks = await self.mysql_client.async_select(sql=select_sql1) task_obj_list = [ { "trace_id": item[0], "content_id": item[1], "flow_pool_level": item[2], "gh_id": item[3], "process_times": item[4] } for item in tasks ] logging( code="9001", info="本次任务获取到 {} 条视频".format(len(task_obj_list)), data=task_obj_list ) return task_obj_list async def get_video_list(self, content_id) -> list[dict]: """ content_id :return: """ sql = f""" SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id FROM {self.article_crawler_video_table} WHERE content_id = '{content_id}' and download_status = 2 ORDER BY score DESC; """ res_tuple = await self.mysql_client.async_select(sql) fission_dict = await get_content_oss_fission_dict( db_client=self.mysql_client, config=self.config, content_id=content_id ) if len(res_tuple) >= 3: return [ { "platform": i[0], "play_count": i[1], "like_count": i[2], "video_oss_path": i[3], "cover_oss_path": i[4], "uid": i[5], "fission_0_rate": fission_dict.get(i[3], {}).get("fission_0_rate", 0), "fission_0_on_read": fission_dict.get(i[3], {}).get("fission_0_on_read", 0) } for i in res_tuple ] else: return [] async def get_kimi_title(self, content_id): """ 获取 kimiTitle :param content_id: :return: """ select_sql = f""" select kimi_title from {self.article_text_table} where content_id = '{content_id}'; """ res_tuple = await self.mysql_client.async_select(select_sql) if res_tuple: return res_tuple[0][0] else: return False async def update_content_status(self, new_content_status, trace_id, ori_content_status): """ :param new_content_status: :param trace_id: :param ori_content_status: :return: """ update_sql = f""" UPDATE {self.article_match_video_table} SET content_status = %s, content_status_update_time = %s WHERE trace_id = %s and content_status = %s; """ row_counts = await self.mysql_client.async_insert( sql=update_sql, params=( new_content_status, int(time.time()), trace_id, ori_content_status ) ) return row_counts async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, 
                                   process_times):
        """
        Publish videos to pq.
        :param process_times: number of times this task has been attempted
        :param trace_id:
        :param download_videos: downloaded videos ---> list[dict]
        :param gh_id: official account id ---> str
        :param kimi_title: kimi title ---> str
        :param flow_pool_level: flow pool level ---> str
        :return:
        """
        match flow_pool_level:
            case "autoArticlePoolLevel4":
                # cold-start pool: shuffle across all candidates
                video_list = shuffle_list(download_videos)[:3]
            case "autoArticlePoolLevel3":
                # second-slot pool: shuffle only for whitelisted accounts
                if self.gh_id_dict.get(gh_id):
                    video_list = shuffle_list(download_videos)[:3]
                else:
                    video_list = download_videos[:3]
            case "autoArticlePoolLevel2":
                # this pool publishes nothing
                video_list = []
            case "autoArticlePoolLevel1":
                # headline pool: use the re-ranked order by fission rate
                sorted_videos = sorted(download_videos, key=lambda x: x['fission_0_rate'], reverse=True)
                video_list = sorted_videos[:3]
            case _:
                print("no flow pool level provided")
                video_list = download_videos[:3]
        published_videos = []
        for video_obj in video_list:
            params = {
                "videoPath": video_obj['video_oss_path'],
                "uid": video_obj['uid'],
                "title": kimi_title
            }
            publish_response = await publish_to_pq(params)
            video_id = publish_response['data']['id']
            response = await get_pq_video_detail(video_id)
            obj = {
                "uid": video_obj['uid'],
                "source": video_obj['platform'],
                "kimiTitle": kimi_title,
                "videoId": response['data'][0]['id'],
                "videoCover": response['data'][0]['shareImgPath'],
                "videoPath": response['data'][0]['videoPath'],
                "videoOss": video_obj['video_oss_path']
            }
            published_videos.append(obj)
        update_sql = f"""
            UPDATE {self.article_match_video_table}
            SET content_status = %s, response = %s, process_times = %s
            WHERE trace_id = %s AND content_status = %s;
        """
        await self.mysql_client.async_insert(
            sql=update_sql,
            params=(
                self.TASK_PUBLISHED_STATUS,
                json.dumps(published_videos, ensure_ascii=False),
                process_times + 1,
                trace_id,
                self.TASK_PROCESSING_STATUS
            )
        )
        logging(
            code="9002",
            info="updated from history articles",
            trace_id=trace_id
        )

    async def roll_back_content_status_when_fails(self, process_times, trace_id):
        """
        On failure, roll the task back to init status and add 1 to process_times.
        :param process_times:
        :param trace_id:
        :return:
        """
        update_article_sql = f"""
            UPDATE {self.article_match_video_table}
            SET content_status = %s, content_status_update_time = %s, process_times = %s
            WHERE trace_id = %s AND content_status = %s;
        """
        await self.mysql_client.async_insert(
            sql=update_article_sql,
            params=(
                self.TASK_INIT_STATUS,
                int(time.time()),
                process_times + 1,
                trace_id,
                self.TASK_PROCESSING_STATUS
            )
        )

    async def process_task(self, params):
        """
        Process a single task asynchronously.
        :param params:
        :return:
        """
        content_id = params['content_id']
        trace_id = params['trace_id']
        flow_pool_level = params['flow_pool_level']
        gh_id = params['gh_id']
        process_times = params['process_times']
        download_videos = await self.get_video_list(content_id=content_id)
        if download_videos:
            # flip status to processing, thereby acquiring the task lock
            affected_rows = await self.update_content_status(
                trace_id=trace_id,
                new_content_status=self.TASK_PROCESSING_STATUS,
                ori_content_status=self.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                print("0 rows updated: another worker claimed this task first, exiting")
                return
            try:
                kimi_title = await self.get_kimi_title(content_id)
                await self.publish_videos_to_pq(
                    flow_pool_level=flow_pool_level,
                    kimi_title=kimi_title,
                    gh_id=gh_id,
                    trace_id=trace_id,
                    download_videos=download_videos,
                    process_times=process_times
                )
            except Exception as e:
                logging(
                    code="5003",
                    info="history task raised an exception while publishing, error = {}".format(e),
                    trace_id=trace_id
                )
                await self.roll_back_content_status_when_fails(
                    trace_id=trace_id,
                    process_times=process_times
                )
        else:
            return

    async def deal(self):
        """
        Entry point: fetch tasks and process them concurrently.
        :return:
        """
        task_list = await self.get_tasks()
        logging(
            code="5002",
            info="History content_task Task Got {} this time".format(len(task_list)),
            function="History Contents Task"
        )
Contents Task" ) if task_list: a = time.time() tasks = [self.process_task(params) for params in task_list] await asyncio.gather(*tasks) b = time.time() print("{} s 内处理了{}个任务".format(b - a, len(task_list))) else: print("暂时未获得历史已存在文章")