
Add Toutiao search account mode

luojunhui 1 month ago
parent commit 9c548864f9

+ 3 - 0
applications/ab_test/__init__.py

@@ -1 +1,4 @@
 from .get_cover import GetCoverService
+
+
+__all__ = ["GetCoverService"]

+ 16 - 0
applications/api/__init__.py

@@ -27,3 +27,19 @@ from .async_aigc_system_api import auto_bind_crawler_task_to_generate_task
 feishu_robot = FeishuBotApi()
 feishu_sheet = FeishuSheetApi()
 task_apollo = AsyncApolloApi()
+
+__all__ = [
+    "feishu_robot",
+    "feishu_sheet",
+    "change_video_audit_status",
+    "publish_video_to_piaoquan",
+    "fetch_piaoquan_video_list_detail",
+    "AsyncApolloApi",
+    "task_apollo",
+    "fetch_deepseek_completion",
+    "log",
+    "delete_illegal_gzh_articles",
+    "auto_create_crawler_task",
+    "auto_bind_crawler_task_to_generate_task",
+    "AsyncElasticSearchClient",
+]

+ 13 - 0
applications/config/__init__.py

@@ -13,3 +13,16 @@ from .deepseek_config import deep_seek_official_api_key
 
 # es config
 from .elastic_search_mappings import es_index, es_mappings, es_settings
+
+__all__ = [
+    "aigc_db_config",
+    "long_video_db_config",
+    "long_articles_db_config",
+    "piaoquan_crawler_db_config",
+    "aliyun_log_config",
+    "deep_seek_official_model",
+    "deep_seek_official_api_key",
+    "es_index",
+    "es_mappings",
+    "es_settings",
+]

+ 3 - 0
applications/tasks/cold_start_tasks/__init__.py

@@ -1 +1,4 @@
 from .article_pool_cold_start import ArticlePoolColdStart
+
+
+__all__ = ["ArticlePoolColdStart"]

+ 2 - 0
applications/tasks/crawler_tasks/__init__.py

@@ -1 +1,3 @@
 from .crawler_toutiao import CrawlerToutiao
+
+__all__ = ["CrawlerToutiao"]

+ 8 - 6
applications/tasks/crawler_tasks/crawler_toutiao.py

@@ -195,17 +195,17 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
         }
         # get title_list
         response = await get_toutiao_account_info_list(
-            account_id=account_id,
-            cookie=cookie,
-            media_type="article"
+            account_id=account_id, cookie=cookie, media_type="article"
         )
         if not response:
             return
 
         article_raw_data = response["data"]
-        title_list = [i['title'] for i in article_raw_data]
+        title_list = [i["title"] for i in article_raw_data]
         new_account_item["title_list"] = json.dumps(title_list, ensure_ascii=False)
-        await self.save_item_to_database(media_type="account", item=new_account_item, trace_id=self.trace_id)
+        await self.save_item_to_database(
+            media_type="account", item=new_account_item, trace_id=self.trace_id
+        )
 
     async def crawler_each_article(self, method, article_raw_data, category=None):
         """
@@ -242,7 +242,9 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                 }
             case _:
                 raise Exception(f"unknown method: {method}")
-        await self.save_item_to_database(media_type="article", item=new_article_item, trace_id=self.trace_id)
+        await self.save_item_to_database(
+            media_type="article", item=new_article_item, trace_id=self.trace_id
+        )
 
     async def crawler_each_video(self, video_raw_data):
         pass

+ 7 - 0
applications/tasks/data_recycle_tasks/__init__.py

@@ -1,3 +1,10 @@
 from .recycle_daily_publish_articles import RecycleDailyPublishArticlesTask
 from .recycle_daily_publish_articles import CheckDailyPublishArticlesTask
 from .recycle_daily_publish_articles import UpdateRootSourceIdAndUpdateTimeTask
+
+
+__all__ = [
+    "RecycleDailyPublishArticlesTask",
+    "CheckDailyPublishArticlesTask",
+    "UpdateRootSourceIdAndUpdateTimeTask",
+]

+ 6 - 0
applications/tasks/llm_tasks/__init__.py

@@ -1 +1,7 @@
 from .process_title import TitleRewrite
+from .candidate_account_process import CandidateAccountQualityScoreRecognizer
+
+__all__ = [
+    "TitleRewrite",
+    "CandidateAccountQualityScoreRecognizer",
+]

+ 181 - 0
applications/tasks/llm_tasks/candidate_account_process.py

@@ -0,0 +1,181 @@
+import json
+import traceback
+from typing import List, Dict
+from tqdm import tqdm
+
+from applications.api import fetch_deepseek_completion
+
+
+class CandidateAccountProcessConst:
+
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAILED_STATUS = 99
+    LACK_ARTICLE_STATUS = 11
+    TITLE_TOO_LONG_STATUS = 14
+
+    AVG_SCORE_THRESHOLD = 65
+
+    ARTICLE_COUNT_THRESHOLD = 13
+    AVG_TITLE_LENGTH_THRESHOLD = 45
+
+    @staticmethod
+    def generate_title_match_score_prompt(title_list):
+        title_list_string = "\n".join(title_list)
+        prompt = f"""
+** 任务指令 **
+    你是一名资深中文新闻编辑,需根据以下标准对一批标题进行主题匹配度评分(0-100分)
+
+** 评估维度及权重 **
+    1. 受众精准度(50%)
+        正向匹配:存款/养老/健康/饮食/疾病警示/家庭伦理/近现代战争历史/老知青/奇闻异事
+        负向排除:影视解说/文学解读/个人收藏(钱币/邮票)/机械科普/数码测评/电子游戏/时尚潮流/明星八卦/极限运动/学术研究/网络热梗/宠物饲养/音乐/棋牌
+
+    2. 标题技法(40%)
+        悬念设计:疑问句/省略号/反转结构(例:"打开后瞬间愣住...")
+        情感强度:使用"痛心!""寒心!"等情绪词
+        数据冲击:具体数字增强可信度(例:"存款180万消失")
+        口语化表达:使用"涨知识了""别不当回事"等日常用语
+
+    3. 内容调性(10%)
+        煽情猎奇:家庭悲剧/离奇事件(例:"棺材板挖出金条")
+        警示价值:健康建议/法律案例(例:"三种食物禁止二次加热")
+        历史揭秘:人物秘闻/老照片故事
+        爱国情怀:军事突破/资源发现(例:"南极发现巨型粮仓")
+
+** 评分规则 **
+    90-100分:同时满足3个维度且要素齐全,无负向内容
+    70-89分:满足2个核心维度,无负向内容
+    50-69分:仅满足受众群体正向匹配,无负向内容
+    30-49分:存在轻微关联但要素缺失
+    0-29分:完全无关或包含任意负向品类内容
+
+** 待评估标题 **
+    {title_list_string}
+
+** 输出要求 **
+    输出结果为JSON,仅输出这一批标题的评分,用数组 List 返回 [score1, score2, score3,...] 不要包含任何解释或说明。
+"""
+        return prompt
+
+
+class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
+    def __init__(self, pool, log_client, trace_id):
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+
+    async def get_task_list(self) -> List[Dict]:
+        """
+        get account tasks from the database
+        """
+        fetch_query = f"""
+            select id, title_list, platform
+            from crawler_candidate_account_pool
+            where avg_score is null and status = {self.INIT_STATUS} and title_list is not null;
+        """
+        fetch_response = await self.pool.async_fetch(fetch_query)
+        return fetch_response
+
+    async def update_account_status(
+        self, account_id: int, ori_status: int, new_status: int
+    ) -> int:
+        """update account status"""
+        update_query = """
+            update crawler_candidate_account_pool
+            set status = %s
+            where id = %s and status = %s;
+        """
+        return await self.pool.async_save(
+            update_query, (new_status, account_id, ori_status)
+        )
+
+    async def score_for_each_account_by_llm(self, account):
+        account_id = account["id"]
+        # lock
+        if not await self.update_account_status(
+            account_id, self.INIT_STATUS, self.PROCESSING_STATUS
+        ):
+            return
+
+        # start processing
+        title_list = json.loads(account["title_list"])
+        if (
+            len(title_list) < self.ARTICLE_COUNT_THRESHOLD
+            and account["platform"] == "toutiao"
+        ):
+            await self.update_account_status(
+                account_id, self.PROCESSING_STATUS, self.LACK_ARTICLE_STATUS
+            )
+            return
+
+        # skip accounts whose average title length is too long
+        avg_title_length = sum(len(title) for title in title_list) / len(title_list)
+        if avg_title_length > self.AVG_TITLE_LENGTH_THRESHOLD:
+            await self.update_account_status(
+                account_id, self.PROCESSING_STATUS, self.TITLE_TOO_LONG_STATUS
+            )
+            return
+
+        prompt = self.generate_title_match_score_prompt(title_list)
+        try:
+            completion = fetch_deepseek_completion(
+                model="DeepSeek-V3", prompt=prompt, output_type="json"
+            )
+            avg_score = sum(completion) / len(completion)
+            query = """
+                update crawler_candidate_account_pool
+                set score_list = %s, avg_score = %s, status = %s
+                where id = %s and status = %s;
+            """
+            # transition the row from PROCESSING to SUCCESS
+            await self.pool.async_save(
+                query=query,
+                params=(
+                    json.dumps(completion),
+                    avg_score,
+                    self.SUCCESS_STATUS,
+                    account_id,
+                    self.PROCESSING_STATUS,
+                ),
+            )
+
+        except Exception as e:
+            await self.log_client.log(
+                contents={
+                    "task": "candidate_account_analysis",
+                    "trace_id": self.trace_id,
+                    "function": "score_for_each_account_by_llm",
+                    "message": "大模型识别账号失败",
+                    "status": "fail",
+                    "data": {
+                        "error": str(e),
+                        "title_list": json.dumps(title_list),
+                    },
+                }
+            )
+            await self.update_account_status(
+                account_id, self.PROCESSING_STATUS, self.FAILED_STATUS
+            )
+
+    async def deal(self):
+        task_list = await self.get_task_list()
+        for task in tqdm(task_list, desc="use llm to analyze each account"):
+            try:
+                await self.score_for_each_account_by_llm(task)
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "task": "candidate_account_analysis",
+                        "trace_id": self.trace_id,
+                        "function": "deal",
+                        "status": "fail",
+                        "data": {
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                            "task": task,
+                        },
+                    }
+                )
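
Note: the success path above assumes `fetch_deepseek_completion(..., output_type="json")` returns a plain JSON array of numeric scores, one per title. A minimal validation sketch under that assumption (the helper `normalize_score_list` is hypothetical, not part of this commit):

```python
from typing import Any, List


def normalize_score_list(completion: Any, expected_len: int) -> List[float]:
    """Coerce and validate an LLM score payload before averaging."""
    if isinstance(completion, dict):
        # some models wrap the array, e.g. {"scores": [...]}; the key is an assumption
        completion = completion.get("scores", [])
    if not isinstance(completion, list) or len(completion) != expected_len:
        raise ValueError(f"expected {expected_len} scores, got: {completion!r}")
    return [float(score) for score in completion]


# usage inside score_for_each_account_by_llm (sketch):
#   scores = normalize_score_list(completion, expected_len=len(title_list))
#   avg_score = sum(scores) / len(scores)
```

Raising here would route the account into the existing `except` branch, which logs the failure and marks the row with `FAILED_STATUS`.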

+ 10 - 0
applications/tasks/monitor_tasks/__init__.py

@@ -5,3 +5,13 @@ from .gzh_article_monitor import OutsideGzhArticlesMonitor
 from .gzh_article_monitor import OutsideGzhArticlesCollector
 from .gzh_article_monitor import InnerGzhArticlesMonitor
 from .task_processing_monitor import TaskProcessingMonitor
+
+__all__ = [
+    "check_kimi_balance",
+    "GetOffVideos",
+    "CheckVideoAuditStatus",
+    "OutsideGzhArticlesMonitor",
+    "OutsideGzhArticlesCollector",
+    "InnerGzhArticlesMonitor",
+    "TaskProcessingMonitor",
+]

+ 12 - 2
applications/tasks/task_scheduler_v2.py

@@ -10,10 +10,11 @@ from applications.utils import task_schedule_response, generate_task_trace_id
 
 from applications.tasks.cold_start_tasks import ArticlePoolColdStart
 from applications.tasks.crawler_tasks import CrawlerToutiao
-from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
 from applications.tasks.llm_tasks import TitleRewrite
+from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
 from applications.tasks.monitor_tasks import check_kimi_balance
 from applications.tasks.monitor_tasks import GetOffVideos
 from applications.tasks.monitor_tasks import CheckVideoAuditStatus
@@ -75,7 +76,7 @@ class TaskScheduler(TaskMapper):
     async def _release_task(self, status: int) -> None:
         query = (
             f"update {self.table} set task_status=%s, finish_timestamp=%s "
-            "where trace_d=%s and task_status=%s;"
+            "where trace_id=%s and task_status=%s;"
         )
         await self.db_client.async_save(
             query=query,
@@ -190,6 +191,8 @@ class TaskScheduler(TaskMapper):
             "article_pool_cold_start": self._article_pool_cold_start_handler,
             # task timeout monitoring
             "task_processing_monitor": self._task_processing_monitor_handler,
+            # candidate account quality analysis
+            "candidate_account_quality_analysis": self._candidate_account_quality_score_handler,
         }
 
         if task_name not in handlers:
@@ -277,3 +280,10 @@ class TaskScheduler(TaskMapper):
         crawler_methods = self.data.get("crawler_methods", [])
         await cold_start.deal(platform=platform, crawl_methods=crawler_methods)
         return self.TASK_SUCCESS_STATUS
+
+    async def _candidate_account_quality_score_handler(self) -> int:
+        task = CandidateAccountQualityScoreRecognizer(
+            self.db_client, self.log_client, self.trace_id
+        )
+        await task.deal()
+        return self.TASK_SUCCESS_STATUS
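
For reference, the scheduler dispatches on `task_name`, so the new task is triggered with a body whose `task_name` matches the key registered in the handlers map above. A sketch assuming the scheduler is fronted by an HTTP endpoint; the host, port, and path are assumptions, not part of this commit:

```python
import requests  # assumes an HTTP front end for TaskScheduler

# "task_name" must match the handlers-map key; the URL is hypothetical
payload = {"task_name": "candidate_account_quality_analysis"}
resp = requests.post("http://127.0.0.1:8080/api/run_task", json=payload, timeout=10)
print(resp.json())
```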