""" @author: luojunhui 发布到 pq 获取视频 id """ import asyncio import json import time from applications.config import Config from applications.log import logging from applications.functions.pqFunctions import publishToPQ from applications.functions.common import shuffleList class publishTask(object): """ 在 match_videos 表中, 获取 content_status = 1 的 content_id 用 content_id 在 crawler_videos 表, 查询 download_status为 2 的视频,表示该 content_id 已经匹配完的视频 通过 流量池tag 逻辑 把 crawler_videos 中的视频路径发布至 pq, 获得 videoId match_videos表将 content_status 修改为 2,response中记录 videoId && ossName等信息 """ def __init__(self, mysql_client): self.mysql_client = mysql_client self.article_video = Config().articleVideos self.article_text = Config().articleText self.article_crawler_video = Config().articleCrawlerVideos self.gh_id_dict = json.loads(Config().getConfigValue("testAccountLevel2")) async def getTasks(self): """ 获取 task :return: """ select_sql = f""" SELECT trace_id, content_id, flow_pool_level, gh_id FROM {self.article_video} WHERE content_status = 1 limit 10; """ tasks = await self.mysql_client.asyncSelect(select_sql) if tasks: return [ { "trace_id": i[0], "content_id": i[1], "flow_pool_level": i[2], "gh_id": i[3] } for i in tasks ] else: return [] async def getVideoList(self, content_id): """ content_id :return: """ sql = f""" SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id FROM {self.article_crawler_video} WHERE content_id = '{content_id}' and download_status = 2; """ res_tuple = await self.mysql_client.asyncSelect(sql) if len(res_tuple) >= 3: return [ { "platform": i[0], "play_count": i[1], "like_count": i[2], "video_oss_path": i[3], "cover_oss_path": i[4], "uid": i[5] } for i in res_tuple] else: return [] async def getKimiTitle(self, content_id): """ 获取 kimiTitle :param content_id: :return: """ select_sql = f""" select kimi_title from {self.article_text} where content_id = '{content_id}'; """ res_tuple = await self.mysql_client.asyncSelect(select_sql) if res_tuple: return res_tuple[0][0] else: return False async def publishVideosToPq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos): """ 发布至 pq :param trace_id: :param download_videos: 已下载的视频---> list [{}, {}, {}.... ] :param gh_id: 公众号 id ---> str :param kimi_title: kimi 标题 ---> str :param flow_pool_level: 流量池层级 ---> str :return: """ # video_list = download_videos[:3] match flow_pool_level: case "autoArticlePoolLevel4": # 冷启层, 全量做 video_list = shuffleList(download_videos)[:3] case "autoArticlePoolLevel3": if self.gh_id_dict.get(gh_id): video_list = shuffleList(download_videos)[:3] else: video_list = download_videos[:3] case "autoArticlePoolLevel2": # 次条,只针对具体账号做 video_list = [] case "autoArticlePoolLevel1": # 头条,先不做 video_list = download_videos[:3] case _: print("未传流量池信息") video_list = download_videos[:3] L = [] for video_obj in video_list: params = { "videoPath": video_obj['video_oss_path'], "uid": video_obj['uid'], "title": kimi_title } response = await publishToPQ(params) time.sleep(2) obj = { "uid": video_obj['uid'], "source": video_obj['platform'], "kimiTitle": kimi_title, "videoId": response['data']['id'], "videoCover": response['data']['shareImgPath'], "videoPath": response['data']['videoPath'], "videoOss": video_obj['video_oss_path'].split("/")[-1] } L.append(obj) update_sql = f""" UPDATE {self.article_video} SET content_status = %s, response = %s WHERE trace_id = %s; """ await self.mysql_client.asyncInsert( sql=update_sql, params=(2, json.dumps(L, ensure_ascii=False), trace_id) ) async def processTask(self, params): """ 处理任务 :return: """ gh_id = params['gh_id'] flow_pool_level = params['flow_pool_level'] content_id = params['content_id'] trace_id = params['trace_id'] download_videos = await self.getVideoList(content_id) if download_videos: kimi_title = await self.getKimiTitle(content_id) if kimi_title: await self.publishVideosToPq( flow_pool_level=flow_pool_level, kimi_title=kimi_title, gh_id=gh_id, trace_id=trace_id, download_videos=download_videos ) else: print("Kimi title 生成失败---后续加报警") else: print("该 content_id还未下载完成") async def deal(self): """ function :return: """ task_list = await self.getTasks() logging( code="5004", info="PublishTask Got {} this time".format(len(task_list)), function="Publish Task" ) if task_list: tasks = [self.processTask(params) for params in task_list] await asyncio.gather(*tasks) else: logging( code="9008", info="没有要处理的请求" )