| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 | """@author: luojunhui"""import jsonimport timeimport asynciofrom applications.config import Configfrom applications.log import loggingfrom applications.functions.pqFunctions import publish_to_pq, get_pq_video_detailfrom applications.functions.common import shuffle_listfrom applications.match_algorithm.rank import get_content_oss_fission_dictclass historyContentIdTask(object):    """    处理已经匹配过小程序的文章    """    TASK_PROCESSING_STATUS = 101    TASK_INIT_STATUS = 0    TASK_PUBLISHED_STATUS = 4    def __init__(self, mysql_client):        """        :param mysql_client:        """        self.mysql_client = mysql_client        self.config = Config(env="dev")        self.article_match_video_table = self.config.article_match_video_table        self.article_text_table = self.config.article_text_table        self.article_crawler_video_table = self.config.article_crawler_video_table        self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))        self.history_coroutines = self.config.get_config_value("historyArticleCoroutines")    async def get_tasks(self):        """        获取任务        :return:        """        select_sql1 = f"""            SELECT                 ART.trace_id,                 ART.content_id,                 ART.flow_pool_level,                 ART.gh_id,                ART.process_times            FROM {self.article_match_video_table} ART            JOIN (                select content_id, count(1) as cnt                 from {self.article_crawler_video_table}                where download_status = 2                group by content_id            ) VID on ART.content_id = VID.content_id and VID.cnt >= 3            WHERE ART.content_status = 0 and ART.process_times <= 3            ORDER BY request_timestamp            LIMIT {self.history_coroutines};        """        tasks = await self.mysql_client.async_select(sql=select_sql1)        task_obj_list = [            {                "trace_id": item[0],                "content_id": item[1],                "flow_pool_level": item[2],                "gh_id": item[3],                "process_times": item[4]            } for item in tasks        ]        logging(            code="9001",            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),            data=task_obj_list        )        return task_obj_list    async def get_video_list(self, content_id) -> list[dict]:        """        content_id        :return:        """        sql = f"""        SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id        FROM {self.article_crawler_video_table}        WHERE content_id = '{content_id}' and download_status = 2        ORDER BY score DESC;        """        res_tuple = await self.mysql_client.async_select(sql)        fission_dict = await get_content_oss_fission_dict(            db_client=self.mysql_client,            config=self.config,            content_id=content_id        )        if len(res_tuple) >= 3:            return [                {                    "platform": i[0],                    "play_count": i[1],                    "like_count": i[2],                    "video_oss_path": i[3],                    "cover_oss_path": i[4],                    "uid": i[5],                    "fission_0_rate": fission_dict.get(i[3], {}).get("fission_0_rate", 0),                    "fission_0_on_read": fission_dict.get(i[3], {}).get("fission_0_on_read", 0)                }                for i in res_tuple            ]        else:            return []    async def get_kimi_title(self, content_id):        """        获取 kimiTitle        :param content_id:        :return:        """        select_sql = f"""        select kimi_title from {self.article_text_table} where content_id = '{content_id}';        """        res_tuple = await self.mysql_client.async_select(select_sql)        if res_tuple:            return res_tuple[0][0]        else:            return False    async def update_content_status(self, new_content_status, trace_id, ori_content_status):        """        :param new_content_status:        :param trace_id:        :param ori_content_status:        :return:        """        update_sql = f"""                    UPDATE {self.article_match_video_table}                    SET content_status = %s, content_status_update_time = %s                    WHERE trace_id = %s and content_status = %s;                    """        row_counts = await self.mysql_client.async_insert(            sql=update_sql,            params=(                new_content_status,                int(time.time()),                trace_id,                ori_content_status            )        )        return row_counts    async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):        """        发布至 pq        :param process_times:        :param trace_id:        :param download_videos: 已下载的视频---> list [{}, {}, {}.... ]        :param gh_id: 公众号 id ---> str        :param kimi_title: kimi 标题 ---> str        :param flow_pool_level: 流量池层级 ---> str        :return:        """        match flow_pool_level:            case "autoArticlePoolLevel4":                # 冷启层, 全量做                video_list = shuffle_list(download_videos)[:3]            case "autoArticlePoolLevel3":                # 次条,只针对具体账号做                if self.gh_id_dict.get(gh_id):                    video_list = shuffle_list(download_videos)[:3]                else:                    video_list = download_videos[:3]            case "autoArticlePoolLevel2":                video_list = []            case "autoArticlePoolLevel1":                # 头条内容,使用重排后结果                sorted_videos = sorted(download_videos, key=lambda x: x['fission_0_rate'], reverse=True)                video_list = sorted_videos[:3]            case _:                print("未传流量池信息")                video_list = download_videos[:3]        L = []        for video_obj in video_list:            params = {                "videoPath": video_obj['video_oss_path'],                "uid": video_obj['uid'],                "title": kimi_title            }            publish_response = await publish_to_pq(params)            video_id = publish_response['data']['id']            response = await get_pq_video_detail(video_id)            # time.sleep(2)            obj = {                "uid": video_obj['uid'],                "source": video_obj['platform'],                "kimiTitle": kimi_title,                "videoId": response['data'][0]['id'],                "videoCover": response['data'][0]['shareImgPath'],                "videoPath": response['data'][0]['videoPath'],                "videoOss": video_obj['video_oss_path']            }            L.append(obj)        update_sql = f"""           UPDATE {self.article_match_video_table}           SET content_status = %s, response = %s, process_times = %s           WHERE trace_id = %s and content_status = %s;           """        await self.mysql_client.async_insert(            sql=update_sql,            params=(                self.TASK_PUBLISHED_STATUS,                json.dumps(L, ensure_ascii=False),                process_times + 1,                trace_id,                self.TASK_PROCESSING_STATUS            )        )        logging(            code="9002",            info="已经从历史文章更新",            trace_id=trace_id        )    async def roll_back_content_status_when_fails(self, process_times, trace_id):        """        处理失败,回滚至初始状态,处理次数加 1        :param process_times:        :param trace_id:        :return:        """        update_article_sql = f"""                            UPDATE {self.article_match_video_table}                            SET                                content_status = %s,                                 content_status_update_time = %s,                                process_times = %s                            WHERE trace_id = %s and content_status = %s;                        """        await self.mysql_client.async_insert(            sql=update_article_sql,            params=(                self.TASK_INIT_STATUS,                int(time.time()),                process_times + 1,                trace_id,                self.TASK_PROCESSING_STATUS            )        )    async def process_task(self, params):        """        异步执行        :param params:        :return:        """        content_id = params['content_id']        trace_id = params['trace_id']        flow_pool_level = params['flow_pool_level']        gh_id = params['gh_id']        process_times = params['process_times']        download_videos = await self.get_video_list(content_id=content_id)        # time.sleep(3)        if download_videos:            # 修改状态为执行状态,获取该任务的锁            affected_rows = await self.update_content_status(                trace_id=trace_id,                new_content_status=self.TASK_PROCESSING_STATUS,                ori_content_status=self.TASK_INIT_STATUS            )            if affected_rows == 0:                print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")                return            try:                kimi_title = await self.get_kimi_title(content_id)                await self.publish_videos_to_pq(                    flow_pool_level=flow_pool_level,                    kimi_title=kimi_title,                    gh_id=gh_id,                    trace_id=trace_id,                    download_videos=download_videos,                    process_times=process_times                )            except Exception as e:                logging(                    code="5003",                    info="history task 在发布的时候出现异常, error = {}".format(e),                    trace_id=trace_id                )            await self.roll_back_content_status_when_fails(                trace_id=trace_id,                process_times=process_times            )        else:            return    async def deal(self):        """        处理        :return:        """        task_list = await self.get_tasks()        logging(            code="5002",            info="History content_task Task Got {} this time".format(len(task_list)),            function="History Contents Task"        )        if task_list:            a = time.time()            tasks = [self.process_task(params) for params in task_list]            await asyncio.gather(*tasks)            b = time.time()            print("{} s 内处理了{}个任务".format(b - a, len(task_list)))        else:            print("暂时未获得历史已存在文章")
 |