""" @author: luojunhui """ import random import time from uuid import uuid4 from tqdm import tqdm from applications.aliyunLog import AliyunArticleLog from applications.functions import ArticleRank, title_sim_v2_by_list from applications.pipeline import LongArticlesPipeline def deduplication(rank1, rank2, rank3): """ 标题相似度去重 :return: """ dup_list = [] final_result = [] for item_list in [rank1, rank2, rank3]: result = [] if item_list: for item in item_list: title = item["title"] if title_sim_v2_by_list(title, dup_list): # print("标题重复,已经过滤\t", title) continue else: result.append(item) dup_list.append(title) final_result.append(result) return final_result[0], final_result[1], final_result[2] class AccountArticleRank(object): """ 文章排序 """ def __init__(self, params, mysql_client): """ :param params: 请求参数 :param mysql_client: 数据库链接池 """ self.filter_list = None self.publishArticleList = None self.publishNum = None self.strategy = None self.ghId = None self.accountName = None self.accountId = None self.params = params self.mysql_client = mysql_client self.request_id = "alg-{}-{}".format(uuid4(), int(time.time())) self.logger = AliyunArticleLog(request_id=self.request_id, alg="ArticleRank") self.pipeline = LongArticlesPipeline() def filter(self): """ 过滤器 """ self.publishArticleList = [] self.filter_list = [] print("历史") history_title_dict = self.pipeline.history_title( account_nickname=self.accountName ) print(history_title_dict) for item in tqdm(self.params["publishArticleList"]): flag = self.pipeline.deal(item, self.accountName, history_title_dict) if flag: item["filterReason"] = flag["filterReason"] self.filter_list.append(item) else: self.publishArticleList.append(item) print("过滤完成") async def check_params(self): """ 校验参数 :return: """ try: self.accountId = self.params["accountId"] self.accountName = self.params["accountName"] self.ghId = self.params["ghId"] self.strategy = self.params["strategy"] self.publishNum = self.params["publishNum"] print("开始校验参数") self.filter() print("参数校验成功") self.logger.log(code="1001", msg="参数校验成功", data=self.params) return None except Exception as e: response = { "msg": "params error", "info": "params check failed, params : {} is not correct".format(e), "code": 0, } self.logger.log( code="1002", msg="参数校验失败--{}".format(e), data=self.params ) return response async def basic_rank(self): """ 基础排序 :return: """ # 第一步把所有文章标题分为3组 article_list1_ori = [ i for i in self.publishArticleList if "【1】" in i["producePlanName"] ] article_list2_ori = [ i for i in self.publishArticleList if "【2】" in i["producePlanName"] ] article_list3_ori = [ i for i in self.publishArticleList if not i in article_list1_ori and not i in article_list2_ori ] # # 全局去重,保留优先级由 L1 --> L2 --> L3 # hash_map = {} # # article_list1 = [] # for i in article_list1_ori: # title = i['title'] # if hash_map.get(title): # continue # else: # article_list1.append(i) # hash_map[title] = 1 # # article_list2 = [] # for i in article_list2_ori: # title = i['title'] # if hash_map.get(title): # continue # else: # article_list2.append(i) # hash_map[title] = 2 # # article_list3 = [] # for i in article_list3_ori: # title = i['title'] # if hash_map.get(title): # continue # else: # article_list3.append(i) # hash_map[title] = 1 # 第二步对article_list1, article_list3按照得分排序, 对article_list2按照播放量排序 if article_list1_ori: rank1 = ArticleRank().rank( account_list=[self.accountName], text_list=[i["title"] for i in article_list1_ori], ) score_list1 = rank1[self.accountName]["score_list"] ranked_1 = [] for index, value in enumerate(score_list1): obj = article_list1_ori[index] obj["score"] = value + 1000 ranked_1.append(obj) ranked_1 = sorted(ranked_1, key=lambda x: x["score"], reverse=True) else: ranked_1 = [] # rank2 if article_list2_ori: for item in article_list2_ori: item["score"] = 100 ranked_2 = sorted( article_list2_ori, key=lambda x: x["crawlerViewCount"], reverse=True ) else: ranked_2 = [] # rank3 if article_list3_ori: rank3 = ArticleRank().rank( account_list=[self.accountName], text_list=[i["title"] for i in article_list3_ori], ) score_list3 = rank3[self.accountName]["score_list"] ranked_3 = [] for index, value in enumerate(score_list3): obj = article_list3_ori[index] obj["score"] = value ranked_3.append(obj) ranked_3 = sorted(ranked_3, key=lambda x: x["score"], reverse=True) else: ranked_3 = [] self.logger.log( code="1004", msg="排序完成", data={"rank1": ranked_1, "rank2": ranked_2, "rank3": ranked_3}, ) return ranked_1, ranked_2, ranked_3 async def rank_v1(self): """ Rank Version 1 :return: """ print("开始排序") try: ranked_1_d, ranked_2_d, ranked_3_d = await self.basic_rank() ranked_1, ranked_2, ranked_3 = deduplication( ranked_1_d, ranked_2_d, ranked_3_d ) print("去重成功") try: L = [] if ranked_1: target = random.choice(ranked_1[:5]) L.append(target) if ranked_2: L.append(ranked_2[0]) else: if ranked_2: if len(ranked_2) > 1: for i in ranked_2[:2]: L.append(i) else: L.append(ranked_2[0]) # L only 1 for item in ranked_3: L.append(item) # L 1 and 3 result = { "accountId": self.accountId, "accountName": self.accountName, "ghId": self.ghId, "strategy": self.strategy, "publishNum": self.publishNum, "rank_list": L[: self.publishNum], "filter_list": self.filter_list, } self.logger.log(code=1006, msg="rank successfully", data=result) response = {"status": "Rank Success", "data": result, "code": 1} except Exception as e: result = { "accountId": self.accountId, "accountName": self.accountName, "ghId": self.ghId, "strategy": self.strategy, "publishNum": self.publishNum, "rank_list": self.publishArticleList[: self.publishNum], "filter_list": self.filter_list, } self.logger.log( code=1007, msg="rank failed because of {}".format(e), data=result ) print("排序成功") response = {"status": "Rank Fail", "data": result, "code": 1} return response except: result = {"code": 2, "info": "account is not exist"} return result async def rank_v2(self): """ Rank Version 2 :return: """ try: ranks = ArticleRank().rank( account_list=[self.accountName], text_list=[i["title"] for i in self.publishArticleList], ) score_list1 = ranks[self.accountName]["score_list"] ranked_v2 = [] for index, value in enumerate(score_list1): obj = self.publishArticleList[index] obj["score"] = value ranked_v2.append(obj) ranked_v2 = sorted(ranked_v2, key=lambda x: (-x["score"], -x['crawlerViewCount'])) result = { "accountId": self.accountId, "accountName": self.accountName, "ghId": self.ghId, "strategy": self.strategy, "publishNum": self.publishNum, "rank_list": ranked_v2[: self.publishNum], "filter_list": self.filter_list, } response = {"status": "Rank Success", "data": result, "code": 1} except Exception as e: result = { "accountId": self.accountId, "accountName": self.accountName, "ghId": self.ghId, "strategy": self.strategy, "publishNum": self.publishNum, "rank_list": self.publishArticleList[: self.publishNum], "filter_list": self.filter_list, } response = {"status": "Rank Fail Because Of {}".format(e), "data": result, "code": 1} return response async def rank_v3(self): """ Rank Version 3 :return: """ return await self.rank_v1() async def rank_v4(self): """ Rank Version 4 :return: """ return await self.rank_v1() async def rank_v5(self): """ Rank Version 5 :return: """ return await self.rank_v1() async def choose_strategy(self): """ 选择排序策略 :return: """ match self.strategy: case "ArticleRankV1": self.logger.log(code="1003", msg="命中排序策略1") return await self.rank_v1() case "ArticleRankV2": self.logger.log(code="1003", msg="命中排序策略2") return await self.rank_v2() case "ArticleRankV3": self.logger.log(code="1003", msg="命中排序策略3") return await self.rank_v3() case "ArticleRankV4": self.logger.log(code="1003", msg="命中排序策略4") return await self.rank_v4() case "ArticleRankV5": self.logger.log(code="1003", msg="命中排序策略5") return await self.rank_v5() async def deal(self): """ Deal Function :return: """ error_params = await self.check_params() if error_params: return error_params else: print("参数校验成功") return await self.choose_strategy()