Ver código fonte

账号利用状态-强行等于 1

luojunhui 4 dias atrás
pai
commit
a51c570c1b

+ 1 - 0
applications/tasks/dev/__init__.py

@@ -1,2 +1,3 @@
 from .data_analysis import DataAnalysis
 from .get_account_category import GetAccountCategory
+from .cal_read_score import ReadScoreCalculator

+ 187 - 0
applications/tasks/dev/cal_read_score.py

@@ -0,0 +1,187 @@
+"""
+计算阅读分
+"""
+import hashlib
+import json
+import math
+
+from tqdm import tqdm
+
+
+class ReadScoreCalculator:
+    def __init__(self, db_client):
+        self.db_client = db_client
+        self.read_avg_map = {}
+        self.unsafe_md5_set = None
+
+    async def get_read_avg_map(self):
+        """
+        获取阅读分平均值映射表
+        """
+        query = """
+            select gh_id, position, read_avg_ci_upper
+            from account_avg_info_v3 where status = 1;
+        """
+        raw_data = await self.db_client.async_fetch(query=query, db_name="piaoquan_crawler")
+        read_avg_ci_map = {}
+        for item in raw_data:
+            key = f"{item['gh_id']}-{item['position']}"
+            value = item['read_avg_ci_upper']
+            read_avg_ci_map[key] = value
+
+        return read_avg_ci_map
+
+    async def get_unsafe_titles(self):
+        query = """
+            select title_md5 from article_unsafe_title where status = 1;
+        """
+        unsafe_title_md5s = await self.db_client.async_fetch(query=query)
+        return set([item['title_md5'] for item in unsafe_title_md5s])
+
+
+    async def calculate_content_read_detail(self, content_md5):
+        """
+        计算文章的阅读分
+        """
+        query = """
+            select ghId, ItemIndex, show_view_count
+            from official_articles_v2 where title_md5 = %s
+            and root_source_id_list is not null and root_source_id_list != '[]' and Type = 9
+            and status = 1;
+        """
+        raw_data = await self.db_client.async_fetch(query=query, db_name="piaoquan_crawler", params=(content_md5,))
+        total_read_count_first = 0
+        total_read_avg_first = 0
+
+        total_read_count_second = 0
+        total_read_avg_second = 0
+        for item in raw_data:
+            if item['ItemIndex'] == 1:
+                total_read_count_first += item['show_view_count']
+                key = f"{item['ghId']}-{item['ItemIndex']}"
+                total_read_avg_first += self.read_avg_map.get(key, 0)
+            elif item['ItemIndex'] == 2:
+                total_read_count_second += item['show_view_count']
+                key = f"{item['ghId']}-{item['ItemIndex']}"
+                total_read_avg_second += self.read_avg_map.get(key, 0)
+            else:
+                continue
+
+        return {
+            "read_count_first": total_read_count_first,
+            "read_avg_first": total_read_avg_first,
+            "read_count_second": total_read_count_second,
+            "read_avg_second": total_read_avg_second,
+        }
+
+    async def calculate_read_score(self, content, gh_id):
+        """
+        计算文章的阅读分
+        """
+        title_md5 = hashlib.md5(content.encode('utf-8')).hexdigest()
+        if title_md5 in self.unsafe_md5_set:
+            return None
+
+        account_read_avg_first = self.read_avg_map.get(f"{gh_id}-1", 0)
+        read_detail = await self.calculate_content_read_detail(title_md5)
+        total_read_count_first = read_detail["read_count_first"]
+        total_read_avg_first = read_detail["read_avg_first"]
+        total_read_count_second = read_detail["read_count_second"]
+        total_read_avg_second = read_detail["read_avg_second"]
+
+        big_rate_w = 1.0
+
+        if total_read_count_first:
+            read_rate = total_read_count_first / total_read_avg_first if total_read_avg_first else 0
+            total_read_avg = total_read_avg_first
+        elif total_read_count_second:
+            read_rate = total_read_count_second / total_read_avg_second if total_read_avg_second else 0
+            total_read_avg = total_read_avg_second
+            if account_read_avg_first >= 3000:
+                big_rate_w = 0.001
+        else:
+            read_rate = total_read_count_second / total_read_avg_second if total_read_avg_second else 0
+            total_read_avg = total_read_avg_second
+
+        sigmoid = lambda ci_total, slope, avg_pos: 1 / (1 + math.exp(-slope * (ci_total - avg_pos)))
+        weight = sigmoid(total_read_avg, 0.0002, account_read_avg_first)
+
+        if read_rate > 0:
+            if read_rate > 1 and big_rate_w < 1:
+                # 对大账号的过高评分进行抑制
+                score_value = weight * ((read_rate - 1) * big_rate_w + 1)
+            else:
+                score_value = weight * read_rate
+        else:
+            score_value = 0.0
+
+        return {
+            "score_without_weight": read_rate,
+            "score": score_value,
+            "weight": weight,
+            "total_read_avg": total_read_avg,
+            "account_read_avg_first": account_read_avg_first,
+        }
+
+    async def get_flow_pool_1_contents(self, account_id):
+        """
+        获取流量池文章
+        """
+        query = """
+            select t4.title
+            from publish_content t1
+            join produce_plan_exe_record t2 on t1.source_id = t2.plan_exe_id
+                     join produce_plan t3 on t2.plan_id = t3.id
+                    join crawler_content t4 on t1.crawler_channel_content_id = t4.channel_content_id
+            where t1.channel = 5 and t1.publish_account_id = %s
+            and t3.plan_tag = 'autoArticlePoolLevel1' and t1.status = 1
+            order by t1.id desc;
+        """
+        raw_data = await self.db_client.async_fetch(query=query, db_name="aigc", params=(account_id,))
+        return [item['title'] for item in raw_data]
+
+    async def get_flow_pool_1_contents_v2(self, account_id):
+        """
+        获取流量池文章
+        """
+        query = """
+            select title from publish_content_gzh_waiting
+            where publish_account_id = %s and status = 1 and content_pool_type = 'autoArticlePoolLevel1'
+        """
+        raw_data = await self.db_client.async_fetch(query=query, params=(account_id,))
+        return [item['title'] for item in raw_data]
+
+    async def deal(self, data):
+        self.unsafe_md5_set = await self.get_unsafe_titles()
+        account_id = data['account_id']
+        gh_id = data['gh_id']
+        self.read_avg_map = await self.get_read_avg_map()
+        titles = await self.get_flow_pool_1_contents_v2(account_id)
+        L = []
+        for title in tqdm(titles):
+            score = await self.calculate_read_score(title, gh_id)
+            if score is None:
+                print(f"title unsafe: {title}")
+                continue
+            else:
+                score['title'] = title
+                L.append(score)
+
+        L = sorted(L, key=lambda x: x['score'], reverse=True)
+        print(json.dumps(L, ensure_ascii=False, indent=4))
+
+
+
+
+
+        
+
+
+
+
+
+
+
+
+
+

+ 8 - 0
applications/tasks/task_handler.py

@@ -31,6 +31,8 @@ from applications.tasks.monitor_tasks import TaskProcessingMonitor
 
 from applications.tasks.dev import DataAnalysis
 from applications.tasks.dev import GetAccountCategory
+from applications.tasks.dev import ReadScoreCalculator
+
 
 from applications.tasks.task_mapper import TaskMapper
 
@@ -220,6 +222,12 @@ class TaskHandler(TaskMapper):
         return self.TASK_SUCCESS_STATUS
 
 
+    async def _cal_read_score_handler(self) -> int:
+        task = ReadScoreCalculator(self.db_client)
+        await task.deal(self.data)
+        return self.TASK_SUCCESS_STATUS
+
+
 
 
 __all__ = ["TaskHandler"]

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -199,6 +199,8 @@ class TaskScheduler(TaskHandler):
             "data_analysis": self._data_analysis_handler,
             # 账号品类分析
             "get_account_category": self._get_account_category_handler,
+            # 计算阅读分
+            "cal_read_score": self._cal_read_score_handler,
         }
 
         if task_name not in handlers: