""" @author: luojunhui """ import json import time import asyncio import traceback from applications.feishu import bot from applications.config import Config from applications.const import HistoryContentIdTaskConst from applications.log import logging from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail from applications.functions.common import shuffle_list from applications.functions.aigc import record_trace_id class historyContentIdTask(object): """ 处理已经匹配过小程序的文章 """ def __init__(self, mysql_client, publish_flag): """ :param mysql_client: """ self.mysql_client = mysql_client self.config = Config() self.const = HistoryContentIdTaskConst() self.article_match_video_table = self.config.article_match_video_table self.article_text_table = self.config.article_text_table self.article_crawler_video_table = self.config.article_crawler_video_table self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2")) self.history_coroutines = self.config.get_config_value("historyArticleCoroutines") self.account_negative_category = json.loads(self.config.get_config_value("account_negative_category")) self.publish_flag = publish_flag async def get_tasks(self): """ 获取任务 :return: """ select_sql1 = f""" SELECT ART.trace_id, ART.content_id, ART.flow_pool_level, ART.gh_id, ART.process_times, ART.publish_flag FROM {self.article_match_video_table} ART JOIN ( select content_id, count(1) as cnt from {self.article_crawler_video_table} where download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS} group by content_id ) VID on ART.content_id = VID.content_id and VID.cnt >= {self.const.MIN_VIDEO_NUM} WHERE ART.content_status = {self.const.TASK_INIT_STATUS} and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES} AND ART.publish_flag = {self.publish_flag} -- ORDER BY ART.flow_pool_level, ART.request_timestamp LIMIT {self.history_coroutines}; """ tasks = await self.mysql_client.async_select(sql=select_sql1) task_obj_list = [ { "trace_id": item[0], "content_id": item[1], "flow_pool_level": item[2], "gh_id": item[3], "process_times": item[4], "publish_flag": item[5] } for item in tasks ] logging( code="9001", info="本次任务获取到 {} 条视频".format(len(task_obj_list)), data=task_obj_list ) return task_obj_list async def get_video_list(self, content_id): """ content_id :return: """ sql = f""" SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id FROM {self.article_crawler_video_table} WHERE content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS} ORDER BY score DESC; """ res_tuple = await self.mysql_client.async_select(sql) if len(res_tuple) >= 3: return [ { "platform": i[0], "play_count": i[1], "like_count": i[2], "video_oss_path": i[3], "cover_oss_path": i[4], "uid": i[5] } for i in res_tuple ] else: return [] async def get_kimi_title(self, content_id): """ 获取 kimiTitle :param content_id: :return: """ select_sql = f""" select kimi_title from {self.article_text_table} where content_id = '{content_id}'; """ res_tuple = await self.mysql_client.async_select(select_sql) if res_tuple: return res_tuple[0][0] else: return False async def update_content_status(self, new_content_status, trace_id, ori_content_status): """ :param new_content_status: :param trace_id: :param ori_content_status: :return: """ update_sql = f""" UPDATE {self.article_match_video_table} SET content_status = %s, content_status_update_time = %s WHERE trace_id = %s and content_status = %s; """ row_counts = await self.mysql_client.async_insert( sql=update_sql, params=( new_content_status, int(time.time()), trace_id, ori_content_status ) ) return row_counts async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times): """ 发布至 pq :param process_times: :param trace_id: :param download_videos: 已下载的视频---> list [{}, {}, {}.... ] :param gh_id: 公众号 id ---> str :param kimi_title: kimi 标题 ---> str :param flow_pool_level: 流量池层级 ---> str :return: """ match flow_pool_level: case "autoArticlePoolLevel4": # 冷启层, 全量做 video_list = shuffle_list(download_videos)[:3] case "autoArticlePoolLevel3": # 次条,只针对具体账号做 if self.gh_id_dict.get(gh_id): video_list = shuffle_list(download_videos)[:3] else: video_list = download_videos[:3] case "autoArticlePoolLevel2": video_list = [] case "autoArticlePoolLevel1": # 头条,先不做 video_list = download_videos[:3] case _: print("未传流量池信息") video_list = download_videos[:3] L = [] for video_obj in video_list: params = { "videoPath": video_obj['video_oss_path'], "uid": video_obj['uid'], "title": kimi_title } publish_response = await publish_to_pq(params) video_id = publish_response['data']['id'] response = await get_pq_video_detail(video_id) # time.sleep(2) obj = { "uid": video_obj['uid'], "source": video_obj['platform'], "kimiTitle": kimi_title, "videoId": response['data'][0]['id'], "videoCover": response['data'][0]['shareImgPath'], "videoPath": response['data'][0]['videoPath'], "videoOss": video_obj['video_oss_path'] } logging( code="history1006", info="视频已经发布到 pq", trace_id=trace_id, data=obj ) L.append(obj) update_sql = f""" UPDATE {self.article_match_video_table} SET content_status = %s, response = %s, process_times = %s WHERE trace_id = %s and content_status = %s; """ await self.mysql_client.async_insert( sql=update_sql, params=( self.const.TASK_PUBLISHED_STATUS, json.dumps(L, ensure_ascii=False), process_times + 1, trace_id, self.const.TASK_PROCESSING_STATUS ) ) logging( code="history1007", info="已经更文章状态为已发布", trace_id=trace_id, data=L ) await record_trace_id(trace_id=trace_id, status=self.const.RECORD_SUCCESS_TRACE_ID_CODE) async def roll_back_content_status_when_fails(self, process_times, trace_id): """ 处理失败,回滚至初始状态,处理次数加 1 :param process_times: :param trace_id: :return: """ update_article_sql = f""" UPDATE {self.article_match_video_table} SET content_status = %s, content_status_update_time = %s, process_times = %s WHERE trace_id = %s and content_status = %s; """ await self.mysql_client.async_insert( sql=update_article_sql, params=( self.const.TASK_INIT_STATUS, int(time.time()), process_times + 1, trace_id, self.const.TASK_PROCESSING_STATUS ) ) async def check_title_whether_exit(self, content_id): """ 校验文章是标题是否晋升 or 退场 :return: """ sql = f""" SELECT lat.article_title, cstp.status FROM long_articles_text lat JOIN cold_start_title_pool cstp ON lat.article_title = cstp.title WHERE lat.content_id = '{content_id}'; """ result = await self.mysql_client.async_select(sql) if result: status = result[0][1] if status in {self.const.UP_LEVEL_STATUS, self.const.TITLE_EXIT_STATUS}: return True else: return False else: return False async def check_title_category(self, content_id, gh_id, trace_id) -> bool: """ 判断该文章的品类是否属于该账号的品类 :param trace_id: :param content_id: :param gh_id: :return: """ bad_category_list = self.account_negative_category.get(gh_id, []) logging( code="history1101", info="该账号的 negative 类型列表", trace_id=trace_id, data=bad_category_list ) if bad_category_list: sql = f""" SELECT category FROM article_category WHERE produce_content_id = '{content_id}'; """ result = await self.mysql_client.async_select(sql) if result: category = result[0][0] logging( code="history1102", info="文章的品类-{}".format(category), trace_id=trace_id ) if category in bad_category_list: return True return False async def process_task(self, params): """ 异步执行 :param params: :return: """ content_id = params['content_id'] trace_id = params['trace_id'] flow_pool_level = params['flow_pool_level'] gh_id = params['gh_id'] process_times = params['process_times'] publish_flag = params['publish_flag'] if flow_pool_level == "autoArticlePoolLevel4": # 校验文章是否属于该账号的negative 类型 negative_category_status = await self.check_title_category(content_id=content_id, gh_id=gh_id, trace_id=trace_id) if negative_category_status: # 修改状态为品类不匹配状态 logging( code="history1002", info="文章属于该账号的negative 类型", trace_id=trace_id ) affected_rows = await self.update_content_status( trace_id=trace_id, new_content_status=self.const.MISMATCH_STATUS, ori_content_status=self.const.TASK_INIT_STATUS ) logging( code="history1003", info="已经修改该文章状态为 品类不匹配状态", trace_id=trace_id ) if affected_rows == 0: print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出") return await record_trace_id(trace_id=trace_id, status=self.const.RECORD_FAIL_TRACE_ID_CODE) return # 校验文章是否晋升 or 退场 exit_status = await self.check_title_whether_exit(content_id) if exit_status: # 修改状态为退出状态 logging( code="history1004", info="文章已经晋升 or 退场", trace_id=trace_id ) affected_rows = await self.update_content_status( trace_id=trace_id, new_content_status=self.const.EXIT_STATUS, ori_content_status=self.const.TASK_INIT_STATUS ) logging( code="history1005", info="已经修改该文章状态为 退出状态", trace_id=trace_id ) if affected_rows == 0: print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出") return await record_trace_id(trace_id=trace_id, status=self.const.RECORD_FAIL_TRACE_ID_CODE) return download_videos = await self.get_video_list(content_id=content_id) # time.sleep(3) if download_videos: # 修改状态为执行状态,获取该任务的锁 affected_rows = await self.update_content_status( trace_id=trace_id, new_content_status=self.const.TASK_PROCESSING_STATUS, ori_content_status=self.const.TASK_INIT_STATUS ) if affected_rows == 0: print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出") return if publish_flag == self.const.DO_NOT_NEED_PUBLISH: logging( code="3013", info="长文系统托管, 不进行发布", trace_id=trace_id ) # 把状态改为3 await self.update_content_status( trace_id=trace_id, new_content_status=self.const.TASK_ETL_COMPLETE_STATUS, ori_content_status=self.const.TASK_PROCESSING_STATUS ) return try: kimi_title = await self.get_kimi_title(content_id) await self.publish_videos_to_pq( flow_pool_level=flow_pool_level, kimi_title=kimi_title, gh_id=gh_id, trace_id=trace_id, download_videos=download_videos, process_times=process_times ) except Exception as e: logging( code="history1008", info="history task 在发布的时候出现异常, error = {}".format(e), trace_id=trace_id, data={ "error": str(e), "traceback": traceback.format_exc() } ) bot( title="history task failed", detail={ "trace_id": trace_id, "error": str(e) }, mention=False ) await self.roll_back_content_status_when_fails( trace_id=trace_id, process_times=process_times ) else: return async def deal(self): """ 处理 :return: """ task_list = await self.get_tasks() logging( code="history1001", info="History content_task Task Got {} this time".format(len(task_list)), function="History Contents Task {}".format(self.publish_flag) ) if task_list: a = time.time() tasks = [self.process_task(params) for params in task_list] await asyncio.gather(*tasks) b = time.time() print("{} s 内处理了{}个任务".format(b - a, len(task_list))) else: print("暂时未获得历史已存在文章")