""" 计算阅读分 """ import hashlib import json import math from tqdm import tqdm class ReadScoreCalculator: def __init__(self, db_client): self.db_client = db_client self.read_avg_map = {} self.unsafe_md5_set = None async def get_read_avg_map(self): """ 获取阅读分平均值映射表 """ query = """ select gh_id, position, read_avg_ci_upper from account_avg_info_v3 where status = 1; """ raw_data = await self.db_client.async_fetch(query=query, db_name="piaoquan_crawler") read_avg_ci_map = {} for item in raw_data: key = f"{item['gh_id']}-{item['position']}" value = item['read_avg_ci_upper'] read_avg_ci_map[key] = value return read_avg_ci_map async def get_unsafe_titles(self): query = """ select title_md5 from article_unsafe_title where status = 1; """ unsafe_title_md5s = await self.db_client.async_fetch(query=query) return set([item['title_md5'] for item in unsafe_title_md5s]) async def calculate_content_read_detail(self, content_md5): """ 计算文章的阅读分 """ query = """ select ghId, ItemIndex, show_view_count from official_articles_v2 where title_md5 = %s and root_source_id_list is not null and root_source_id_list != '[]' and Type = 9 and status = 1; """ raw_data = await self.db_client.async_fetch(query=query, db_name="piaoquan_crawler", params=(content_md5,)) total_read_count_first = 0 total_read_avg_first = 0 total_read_count_second = 0 total_read_avg_second = 0 for item in raw_data: if item['ItemIndex'] == 1: total_read_count_first += item['show_view_count'] key = f"{item['ghId']}-{item['ItemIndex']}" total_read_avg_first += self.read_avg_map.get(key, 0) elif item['ItemIndex'] == 2: total_read_count_second += item['show_view_count'] key = f"{item['ghId']}-{item['ItemIndex']}" total_read_avg_second += self.read_avg_map.get(key, 0) else: continue return { "read_count_first": total_read_count_first, "read_avg_first": total_read_avg_first, "read_count_second": total_read_count_second, "read_avg_second": total_read_avg_second, } async def calculate_read_score(self, content, gh_id): """ 计算文章的阅读分 """ title_md5 = hashlib.md5(content.encode('utf-8')).hexdigest() if title_md5 in self.unsafe_md5_set: return None account_read_avg_first = self.read_avg_map.get(f"{gh_id}-1", 0) read_detail = await self.calculate_content_read_detail(title_md5) total_read_count_first = read_detail["read_count_first"] total_read_avg_first = read_detail["read_avg_first"] total_read_count_second = read_detail["read_count_second"] total_read_avg_second = read_detail["read_avg_second"] big_rate_w = 1.0 if total_read_count_first: read_rate = total_read_count_first / total_read_avg_first if total_read_avg_first else 0 total_read_avg = total_read_avg_first elif total_read_count_second: read_rate = total_read_count_second / total_read_avg_second if total_read_avg_second else 0 total_read_avg = total_read_avg_second if account_read_avg_first >= 3000: big_rate_w = 0.001 else: read_rate = total_read_count_second / total_read_avg_second if total_read_avg_second else 0 total_read_avg = total_read_avg_second sigmoid = lambda ci_total, slope, avg_pos: 1 / (1 + math.exp(-slope * (ci_total - avg_pos))) weight = sigmoid(total_read_avg, 0.0002, account_read_avg_first) if read_rate > 0: if read_rate > 1 and big_rate_w < 1: # 对大账号的过高评分进行抑制 score_value = weight * ((read_rate - 1) * big_rate_w + 1) else: score_value = weight * read_rate else: score_value = 0.0 return { "score_without_weight": read_rate, "score": score_value, "weight": weight, "total_read_avg": total_read_avg, "account_read_avg_first": account_read_avg_first, } async def get_flow_pool_1_contents(self, account_id): """ 获取流量池文章 """ query = """ select t4.title from publish_content t1 join produce_plan_exe_record t2 on t1.source_id = t2.plan_exe_id join produce_plan t3 on t2.plan_id = t3.id join crawler_content t4 on t1.crawler_channel_content_id = t4.channel_content_id where t1.channel = 5 and t1.publish_account_id = %s and t3.plan_tag = 'autoArticlePoolLevel1' and t1.status = 1 order by t1.id desc; """ raw_data = await self.db_client.async_fetch(query=query, db_name="aigc", params=(account_id,)) return [item['title'] for item in raw_data] async def get_flow_pool_1_contents_v2(self, account_id): """ 获取流量池文章 """ query = """ select title from publish_content_gzh_waiting where publish_account_id = %s and status = 1 and content_pool_type = 'autoArticlePoolLevel1' """ raw_data = await self.db_client.async_fetch(query=query, params=(account_id,)) return [item['title'] for item in raw_data] async def deal(self, data): self.unsafe_md5_set = await self.get_unsafe_titles() account_id = data['account_id'] gh_id = data['gh_id'] self.read_avg_map = await self.get_read_avg_map() titles = await self.get_flow_pool_1_contents_v2(account_id) L = [] for title in tqdm(titles): score = await self.calculate_read_score(title, gh_id) if score is None: print(f"title unsafe: {title}") continue else: score['title'] = title L.append(score) L = sorted(L, key=lambda x: x['score'], reverse=True) print(json.dumps(L, ensure_ascii=False, indent=4))