""" @author: luojunhui """ import json import time import asyncio import random from typing import List, Dict from applications.config import Config from applications.const import new_content_id_task_const as NewContentIdTaskConst from applications.log import logging from applications.functions.common import shuffle_list from applications.spider import search_videos_from_web from applications.feishu import bot from applications.functions.aigc import record_trace_id from .utils import * class NewContentIdTask(object): """ 不存在历史已经发布的文章的匹配流程 """ def __init__(self, long_articles_client, aigc_client): self.long_articles_client = long_articles_client self.aigc_client = aigc_client self.config = Config() self.article_match_video_table = self.config.article_match_video_table self.article_text_table = self.config.article_text_table self.article_crawler_video_table = self.config.article_crawler_video_table self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2")) self.account_map = json.loads(self.config.get_config_value("accountMap")) self.spider_coroutines = self.config.get_config_value("spiderCoroutines") async def get_tasks(self) -> List[Dict]: """ 获取 task :return: """ # 获取 process_times <= 3 且 content_status = 0 的任务 select_sql = f""" SELECT t1.trace_id, t1.content_id, t1.flow_pool_level, t1.gh_id, t1.process_times, t1.publish_flag FROM {self.article_match_video_table} t1 LEFT JOIN ( SELECT content_id, count(1) as cnt FROM {self.article_crawler_video_table} WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS} GROUP BY content_id ) t2 ON t1.content_id = t2.content_id WHERE t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS} AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES} AND COALESCE(t2.cnt, 0) < {NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM} ORDER BY flow_pool_level, request_timestamp LIMIT {self.spider_coroutines}; """ tasks = await self.long_articles_client.async_select(select_sql) if tasks: return [ { "trace_id": i[0], "content_id": i[1], "flow_pool_level": i[2], "gh_id": i[3], "process_times": i[4], "publish_flag": i[5] } for i in tasks ] else: return [] async def set_tasks_status_fail(self) -> None: """ 将 处理次数大约3次,且状态不为成功状态的(3, 4)的任务状态修改为失败 """ update_status_sql = f""" UPDATE {self.article_match_video_table} SET content_status = %s WHERE process_times > %s and content_status not in (%s, %s); """ await self.long_articles_client.async_insert( update_status_sql, params=( NewContentIdTaskConst.TASK_FAIL_STATUS, NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES, NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS, NewContentIdTaskConst.TASK_PUBLISHED_STATUS ) ) async def roll_back_unfinished_tasks(self, publish_flag: int) -> None: """ 将长时间处于中间状态的任务回滚 """ # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1 if publish_flag == NewContentIdTaskConst.NEED_PUBLISH: processing_status_tuple = ( NewContentIdTaskConst.TASK_PROCESSING_STATUS, NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS, NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS, NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS ) elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH: processing_status_tuple = ( NewContentIdTaskConst.TASK_PROCESSING_STATUS, NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS, NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS ) else: return select_processing_sql = f""" SELECT trace_id, content_status_update_time, process_times, content_status FROM {self.article_match_video_table} WHERE content_status in {processing_status_tuple} and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES} and publish_flag = {publish_flag}; """ processing_articles = await self.long_articles_client.async_select(select_processing_sql) if processing_articles: processing_list = [ { "trace_id": item[0], "content_status_update_time": item[1], "process_times": item[2], "content_status": item[3] } for item in processing_articles ] for obj in processing_list: if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT: # 认为该任务失败 await self.roll_back_content_status_when_fails( process_times=obj['process_times'] + 1, trace_id=obj['trace_id'], ori_content_status=obj['content_status'] ) async def update_content_status(self, new_content_status, trace_id, ori_content_status): """ :param new_content_status: :param trace_id: :param ori_content_status: :return: """ update_sql = f""" UPDATE {self.article_match_video_table} SET content_status = %s, content_status_update_time = %s WHERE trace_id = %s and content_status = %s; """ row_counts = await self.long_articles_client.async_insert( sql=update_sql, params=( new_content_status, int(time.time()), trace_id, ori_content_status ) ) return row_counts async def roll_back_content_status_when_fails(self, process_times, trace_id, ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS): """ 处理失败,回滚至初始状态,处理次数加 1 :param process_times: :param trace_id: :param ori_content_status: :return: """ update_article_sql = f""" UPDATE {self.article_match_video_table} SET content_status = %s, content_status_update_time = %s, process_times = %s WHERE trace_id = %s and content_status = %s; """ await self.long_articles_client.async_insert( sql=update_article_sql, params=( NewContentIdTaskConst.TASK_INIT_STATUS, int(time.time()), process_times + 1, trace_id, ori_content_status ) ) async def judge_whether_same_content_id_is_processing(self, content_id): """ 同一个 content_id 只需要处理一次 :param content_id: :return: success: 4 init: 0 fail: 99 """ select_sql = f""" SELECT distinct content_status FROM {self.article_match_video_table} WHERE content_id = '{content_id}'; """ result = await self.long_articles_client.async_select(select_sql) if result: for item in result: content_status = item[0] # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} : if content_status in { NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS, NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS, # NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS, NewContentIdTaskConst.TASK_PROCESSING_STATUS, # NewContentIdTaskConst.TASK_PUBLISHED_STATUS }: return True return False else: return False async def get_source_content_id(self, new_content_id): """ 通过content_id 查找源content_id,并且更新其内容 """ select_channel_id_sql = f""" SELECT channel_content_id FROM produce_plan_exe_record WHERE plan_exe_id = '{new_content_id}'; """ channel_content_id = await self.aigc_client.async_select(select_channel_id_sql) if channel_content_id: select_source_content_id_sql = f""" SELECT root_produce_content_id FROM article_pool_promotion_source WHERE channel_content_id = '{channel_content_id[0][0]}'; """ source_content_id = await self.long_articles_client.async_select(select_source_content_id_sql) if source_content_id: return source_content_id[0][0] else: return else: return async def get_illegal_out_ids(self, content_id: str) -> List[str]: """ 获取违规的外站视频id """ select_sql = f""" SELECT platform, out_video_id FROM {self.article_crawler_video_table} WHERE content_id = '{content_id}' and is_illegal = {NewContentIdTaskConst.VIDEO_UNSAFE}; """ response = await self.long_articles_client.async_select(select_sql) if response: result = ["{}_{}".format(line[0], line[1]) for line in response] return result else: return [] async def kimi_task(self, params): """ 执行 kimi 任务 :return: """ trace_id = params['trace_id'] # 处理content_id content_id = params['content_id'] process_times = params['process_times'] kimi_status_code = await get_kimi_status(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client) if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS: affected_rows = await self.update_content_status( new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS, trace_id=trace_id, ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS ) if affected_rows == 0: logging( code="6000", info="多个进程抢占同一个任务的执行状态锁,抢占失败,return" ) return kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client) return kimi_result elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR: logging( code="4000", info="long_articles_text表中未找到 content_id" ) else: # 校验是否存在root_content_id if params.get("root_content_id"): kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client) if kimi_result: affected_rows = await self.update_content_status( new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS, trace_id=trace_id, ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS ) if affected_rows == 0: logging( code="6000", info="多个进程抢占同一个任务的执行状态锁,抢占失败,return" ) return # 将root_content_id的kimi结果更新到content_id content_id = params['content_id'] update_count = await update_kimi_status( kimi_info=kimi_result, content_id=content_id, db_client=self.long_articles_client, article_text_table=self.article_text_table, success_status=NewContentIdTaskConst.KIMI_SUCCESS_STATUS, init_status=NewContentIdTaskConst.KIMI_INIT_STATUS ) if update_count == 0: logging( code="6000", info="多个进程抢占同一个任务的执行状态锁,抢占失败,return" ) return logging( code="8023", function="kimi_task", trace_id=trace_id, info="从root_content_id获取结果", data=params ) return kimi_result else: params.pop('root_content_id', None) return await self.kimi_task(params) # 开始处理,讲 content_status 从 0 改为 101 affected_rows = await self.update_content_status( new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS, trace_id=trace_id, ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS ) if affected_rows == 0: logging( code="6000", info="多个进程抢占同一个任务的执行状态锁,抢占失败,return" ) return try: kimi_result = await generate_kimi_result( content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client, safe_score=NewContentIdTaskConst.KIMI_SAFE_SCORE ) await self.update_content_status( new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS, trace_id=trace_id, ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS ) return kimi_result except Exception as e: # kimi 任务处理失败 update_kimi_sql = f""" UPDATE {self.article_text_table} SET kimi_status = %s WHERE content_id = %s """ await self.long_articles_client.async_insert( sql=update_kimi_sql, params=( NewContentIdTaskConst.KIMI_FAIL_STATUS, content_id ) ) # 将状态由 101 回退为 0 await self.roll_back_content_status_when_fails( process_times=process_times, trace_id=trace_id ) return {} async def spider_task(self, params, kimi_result): """ 爬虫任务 :return: """ trace_id = params['trace_id'] content_id = params['content_id'] process_times = params['process_times'] gh_id = params['gh_id'] download_video_exist_flag = await whether_downloaded_videos_exists( content_id=content_id, article_crawler_video_table=self.article_crawler_video_table, db_client=self.long_articles_client ) if download_video_exist_flag: await self.update_content_status( new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS, trace_id=trace_id, ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS ) return True # 判断是否存在root_content_id if params.get("root_content_id"): # 从爬虫表获取root_content_id的视频,并且写入爬虫表,将记录状态由1 --> 2 update_rows = await update_crawler_table_with_exist_content_id( content_id=content_id, trace_id=trace_id, article_crawler_video_table=self.article_crawler_video_table, db_client=self.long_articles_client, root_content_id=params['root_content_id'] ) if update_rows: affected_rows = await self.update_content_status( new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS, trace_id=trace_id, ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS ) if affected_rows == 0: return logging( code="8024", function="spider_task", trace_id=trace_id, info="从root_content_id获取结果", data=params ) return True else: params.pop("root_content_id", None) return await self.spider_task(params, kimi_result) # 开始处理,将状态由 1 改成 101 affected_rows = await self.update_content_status( new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS, ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS, trace_id=trace_id ) if affected_rows == 0: logging( code="6000", info="多个进程抢占同一个任务的执行状态锁,抢占失败,return" ) return False try: logging( code="spider_1001", info="开始执行搜索任务", trace_id=trace_id, data=kimi_result ) search_videos_count = await search_videos_from_web( info={ "ori_title": kimi_result['ori_title'], "kimi_summary": kimi_result['kimi_summary'], "kimi_keys": kimi_result['kimi_keys'], "trace_id": trace_id, "gh_id": gh_id, "content_id": content_id, "crawler_video_table": self.article_crawler_video_table }, gh_id_map=self.account_map, db_client=self.long_articles_client ) if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM: # 表示爬虫任务执行成功, 将状态从 101 改为 2 logging( code="spider_1002", info="搜索成功", trace_id=trace_id, data=kimi_result ) await self.update_content_status( new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS, trace_id=trace_id, ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS ) return True else: logging( code="spider_1003", info="搜索失败", trace_id=trace_id, data=kimi_result ) await self.roll_back_content_status_when_fails( process_times=process_times + 1, trace_id=trace_id ) return False except Exception as e: await self.roll_back_content_status_when_fails( process_times=process_times + 1, trace_id=trace_id ) print("爬虫处理失败: {}".format(e)) return False async def etl_task(self, params): """ download && upload videos :param params: :return: """ trace_id = params['trace_id'] content_id = params['content_id'] process_times = params['process_times'] # 判断视频是否已下载完成 video_exist_flag = await whether_downloaded_videos_exists( content_id=content_id, article_crawler_video_table=self.article_crawler_video_table, db_client=self.long_articles_client ) if video_exist_flag: affect_rows = await self.update_content_status( ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS, trace_id=trace_id, new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS ) if affect_rows == 0: logging( code="6000", info="多个进程抢占同一个任务的执行状态锁,抢占失败,return" ) return False return True else: # 开始处理, 将文章状态修改为处理状态 affected_rows = await self.update_content_status( ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS, trace_id=trace_id, new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS ) if affected_rows == 0: logging( code="6000", info="多个进程抢占同一个任务的执行状态锁,抢占失败,return" ) return False # download videos illegal_videos = await self.get_illegal_out_ids(content_id) downloaded_count = await async_download_videos( trace_id=trace_id, content_id=content_id, article_crawler_video_table=self.article_crawler_video_table, db_client=self.long_articles_client, illegal_videos=illegal_videos ) if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM: await self.update_content_status( ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS, trace_id=trace_id, new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS ) return True else: await self.roll_back_content_status_when_fails( process_times=process_times + 1, trace_id=trace_id ) return False async def publish_task(self, params, kimi_title): """ 发布任务 :param kimi_title: :param params: :return: """ gh_id = params['gh_id'] flow_pool_level = params['flow_pool_level'] content_id = params['content_id'] trace_id = params['trace_id'] process_times = params['process_times'] # 开始处理,将状态修改为操作状态 affected_rows = await self.update_content_status( ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS, trace_id=trace_id, new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS ) if affected_rows == 0: logging( code="6000", info="多个进程抢占同一个任务的执行状态锁,抢占失败,return" ) return False try: download_videos = await get_downloaded_videos( content_id=content_id, article_crawler_video_table=self.article_crawler_video_table, db_client=self.long_articles_client ) match flow_pool_level: case "autoArticlePoolLevel4": # 冷启层, 全量做 video_list = shuffle_list(download_videos)[:3] case "autoArticlePoolLevel3": if self.gh_id_dict.get(gh_id): video_list = shuffle_list(download_videos)[:3] else: video_list = download_videos[:3] case "autoArticlePoolLevel2": # 次条,只针对具体账号做 video_list = [] case "autoArticlePoolLevel1": # 头条,先不做 video_list = download_videos[:3] case _: video_list = download_videos[:3] # 将视频发布至票圈 await publish_videos_to_piaoquan( video_list=video_list, kimi_title=kimi_title, process_times=process_times, trace_id=trace_id, db_client=self.long_articles_client, article_match_video_table=self.article_match_video_table ) except Exception as e: await self.roll_back_content_status_when_fails( process_times=params['process_times'] + 1, trace_id=params['trace_id'] ) print(e) async def record_for_audit(self, content_id): """ 视频下载成功后,记录到audit表中 """ audit_account = chr(random.randint(97, 106)) insert_sql = f""" INSERT IGNORE INTO long_articles_title_audit (content_id, create_timestamp, audit_account) VALUES (%s, %s, %s); """ await self.long_articles_client.async_insert( sql=insert_sql, params=( content_id, int(time.time() * 1000), audit_account ) ) async def start_process(self, params): """ 处理单篇文章 :param params: :return: """ print("start process") kimi_result = await self.kimi_task(params) trace_id = params['trace_id'] process_times = params['process_times'] content_id = params['content_id'] publish_flag = params['publish_flag'] if kimi_result: # 等待 kimi 操作执行完成之后,开始执行 spider_task print("kimi success") logging( code=3001, info="kimi success", trace_id=trace_id ) spider_flag = await self.spider_task(params=params, kimi_result=kimi_result) if spider_flag: # 等待爬虫执行完成后,开始执行 etl_task print("spider success") logging( code=3002, info="spider_success", trace_id=trace_id ) etl_flag = await self.etl_task(params) if etl_flag: # 等待下载上传完成,执行发布任务 print("etl success") logging( code="3003", info="etl_success", trace_id=trace_id ) # ETL下载成功,记录审核 await self.record_for_audit(content_id) if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH: logging( code="3013", info="不需要发布,长文系统托管发布", trace_id=trace_id ) return else: try: await self.publish_task(params, kimi_result['kimi_title']) logging( code="3004", info="publish_success", trace_id=trace_id ) await record_trace_id( trace_id=trace_id, status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE ) except Exception as e: logging( code="6004", info="publish 失败--{}".format(e), trace_id=params['trace_id'] ) else: logging( code="6003", info="ETL 处理失败", trace_id=params['trace_id'] ) else: logging( code="6002", info="爬虫处理失败", trace_id=params['trace_id'] ) else: logging( code="6001", info="kimi 处理失败", trace_id=trace_id ) if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES: logging( code="6011", info="kimi处理次数达到上限, 放弃处理", trace_id=trace_id ) # 将相同的content_id && content_status = 0的状态修改为kimi 失败状态 update_sql = f""" UPDATE {self.article_match_video_table} SET content_status = %s WHERE content_id = %s and content_status = %s; """ affected_rows = await self.long_articles_client.async_insert( sql=update_sql, params=( NewContentIdTaskConst.KIMI_ILLEGAL_STATUS, content_id, NewContentIdTaskConst.TASK_INIT_STATUS ) ) # 查询出该content_id所对应的标题 select_sql = f""" SELECT article_title FROM {self.article_text_table} WHERE content_id = '{content_id}'; """ result = await self.long_articles_client.async_select(select_sql) bot( title="KIMI 处理失败", detail={ "content_id": content_id, "affected_rows": affected_rows, "article_title": result[0][0] }, mention=False ) async def process_each_task(self, params): """ 处理任务 :return: """ print(json.dumps(params, ensure_ascii=False, indent=4)) content_id = params['content_id'] flow_pool_level = params['flow_pool_level'] download_videos_exists_flag = await whether_downloaded_videos_exists( content_id=content_id, article_crawler_video_table=self.article_crawler_video_table, db_client=self.long_articles_client ) print("开始处理") if not download_videos_exists_flag: processing_flag = await self.judge_whether_same_content_id_is_processing(content_id) if processing_flag: print("processing success") logging( code="9001", info="该 content id 正在处理中, 跳过此任务--{}".format(content_id) ) else: # 判断是否存在root_content_id if flow_pool_level != 'autoArticlePoolLevel4': # 如果查到根content_id, 采用根content_id的视频 root_content_id = await self.get_source_content_id(content_id) if root_content_id: # 传参新增root_content_id params['root_content_id'] = root_content_id # 开始处理 await self.start_process(params=params) else: print("存在已下载视频") async def deal(self) -> None: """ function :return: """ # 处理未托管的任务 await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH) # 处理托管任务 await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH) # 将处理次数大于3次且未成功的任务置为失败 await self.set_tasks_status_fail() # 获取task_list task_list = await self.get_tasks() task_dict = {task['content_id']: task for task in task_list} process_list = list(task_dict.values()) logging( code="5001", info="Match Task Got {} this time".format(len(process_list)), function="Publish Task" ) # 处理process_list if process_list: a = time.time() tasks = [self.process_each_task(params) for params in process_list] await asyncio.gather(*tasks) b = time.time() print("处理时间: {} s".format(b - a)) else: logging( code="9008", info="没有要处理的请求" )