""" @author: luojunhui """ import json import time import asyncio from applications.config import Config from applications.functions.log import logging from applications.functions.pqFunctions import publishToPQ class historyContentIdTask(object): """ 处理已经匹配过小程序的文章 """ def __init__(self, mysql_client): """ :param mysql_client: """ self.mysql_client = mysql_client self.article_text = Config().articleText self.article_video = Config().articleVideos self.article_crawler_video = Config().articleCrawlerVideos self.history_coroutines = Config().getConfigValue("historyArticleCoroutines") async def getTaskList(self): """ 获取任务 :return: """ select_sql1 = f""" SELECT ART.trace_id, ART.content_id, ART.flow_pool_level, ART.gh_id, ART.process_times FROM {self.article_video} ART JOIN ( select content_id, count(1) as cnt from {self.article_crawler_video} where download_status = 2 group by content_id ) VID on ART.content_id = VID.content_id and VID.cnt >= 3 WHERE ART.content_status = 0 and ART.process_times <= 3 ORDER BY request_timestamp LIMIT {self.history_coroutines}; """ tasks = await self.mysql_client.asyncSelect(sql=select_sql1) task_obj_list = [ { "trace_id": item[0], "content_id": item[1], "flow_pool_level": item[2], "gh_id": item[3], "process_times": item[4] } for item in tasks ] logging( code="9001", info="本次任务获取到 {} 条视频".format(len(task_obj_list)), data=task_obj_list ) return task_obj_list async def getVideoList(self, content_id): """ content_id :return: """ sql = f""" SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id FROM {self.article_crawler_video} WHERE content_id = '{content_id}' and download_status = 2; """ res_tuple = await self.mysql_client.asyncSelect(sql) if len(res_tuple) >= 3: return [ { "platform": i[0], "play_count": i[1], "like_count": i[2], "video_oss_path": i[3], "cover_oss_path": i[4], "uid": i[5] } for i in res_tuple] else: return [] async def getKimiTitle(self, content_id): """ 获取 kimiTitle :param content_id: :return: """ select_sql = f""" select kimi_title from {self.article_text} where content_id = '{content_id}'; """ res_tuple = await self.mysql_client.asyncSelect(select_sql) if res_tuple: return res_tuple[0][0] else: return False async def publishVideosToPq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times): """ 发布至 pq :param process_times: :param trace_id: :param download_videos: 已下载的视频---> list [{}, {}, {}.... ] :param gh_id: 公众号 id ---> str :param kimi_title: kimi 标题 ---> str :param flow_pool_level: 流量池层级 ---> str :return: """ video_list = download_videos[:3] match flow_pool_level: case "autoArticlePoolLevel4": print("冷启层") video_list = [] case "autoArticlePoolLevel3": print("暂时未知层") video_list = [] case "autoArticlePoolLevel2": print("次条层") video_list = [] case "autoArticlePoolLevel1": print("头条层") video_list = [] L = [] for video_obj in video_list: params = { "videoPath": video_obj['video_oss_path'], "uid": video_obj['uid'], "title": kimi_title } response = await publishToPQ(params) # time.sleep(2) obj = { "uid": video_obj['uid'], "source": video_obj['platform'], "kimiTitle": kimi_title, "videoId": response['data']['id'], "videoCover": response['data']['shareImgPath'], "videoPath": response['data']['videoPath'], "videoOss": video_obj['video_oss_path'].split("/")[-1] } L.append(obj) update_sql = f""" UPDATE {self.article_video} SET content_status = %s, response = %s, process_times = %s WHERE trace_id = %s; """ await self.mysql_client.asyncInsert( sql=update_sql, params=(2, json.dumps(L, ensure_ascii=False), process_times + 1, trace_id) ) async def processTask(self, params): """ 异步执行 :param params: :return: """ content_id = params['content_id'] trace_id = params['trace_id'] flow_pool_level = params['flow_pool_level'], gh_id = params['gh_id'] process_times = params['process_times'] # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点 download_videos = await self.getVideoList(content_id=content_id) if download_videos: # 把状态修改为 4 update_sql = f""" UPDATE {self.article_video} SET content_status = %s WHERE trace_id = %s; """ await self.mysql_client.asyncInsert( sql=update_sql, params=(4, trace_id) ) kimi_title = await self.getKimiTitle(content_id) if kimi_title: await self.publishVideosToPq( flow_pool_level=flow_pool_level, kimi_title=kimi_title, gh_id=gh_id, trace_id=trace_id, download_videos=download_videos, process_times=process_times ) else: print("Kimi title 生成失败---后续加报警") else: pass async def deal(self): """ 处理 :return: """ task_list = await self.getTaskList() if task_list: tasks = [self.processTask(params) for params in task_list] await asyncio.gather(*tasks) else: logging( code="9008", info="没有要处理的请求" )