Browse Source

Merge branch 'feature/luojunhui/2025-08-04-add-title-process-task' of Server/LongArticleTaskServer into master

luojunhui 1 month ago
parent
commit
167af2575c

+ 1 - 1
applications/tasks/cold_start_tasks/article_pool_cold_start.py

@@ -209,7 +209,7 @@ class ArticlePoolColdStart(ArticlePoolColdStartConst):
             detail={
                 "渠道": crawl_method,
                 "总文章数量": total_length,
-                "相关性分数过滤": filter_df.shape[0],
+                "相关性分数过滤剩余": filter_df.shape[0],
             },
             mention=False,
         )

+ 2 - 1
applications/tasks/crawler_tasks/__init__.py

@@ -1,3 +1,4 @@
 from .crawler_toutiao import CrawlerToutiao
+from .crawler_account_manager import WeixinAccountManager
 
-__all__ = ["CrawlerToutiao"]
+__all__ = ["CrawlerToutiao", "WeixinAccountManager"]

+ 149 - 0
applications/tasks/crawler_tasks/crawler_account_manager.py

@@ -0,0 +1,149 @@
+import time
+import math
+
+from datetime import datetime
+from typing import Optional, List, Dict
+from pandas import DataFrame
+from scipy import stats
+from tqdm.asyncio import tqdm
+
+
+class CrawlerAccountManagerConst:
+    # 文章状态
+    ARTICLE_LOW_SIMILARITY_STATUS = 0
+    ARTICLE_INIT_STATUS = 1
+    ARTICLE_PUBLISHED_STATUS = 2
+
+    # 相似度阈值
+    SIMILARITY_THRESHOLD = 0.4
+
+    # 1天时间戳
+    ONE_DAY_TIMESTAMP = 86400
+    # 近30天时间戳
+    THIRTY_DAYS_TIMESTAMP = 30 * ONE_DAY_TIMESTAMP
+
+
+class CrawlerAccountManager(CrawlerAccountManagerConst):
+    def __init__(self, pool, aliyun_log, trace_id):
+        self.pool = pool
+        self.aliyun_log = aliyun_log
+        self.trace_id = trace_id
+
+    @staticmethod
+    def safe_float(x: float | None, default: float = 0.0) -> float:
+        """把 None / NaN / Inf 统一替换成指定默认值"""
+        return default if x is None or not math.isfinite(x) else float(x)
+
+    async def get_crawling_accounts(self, platform) -> List[str]:
+        """获取抓取账号信息"""
+        match platform:
+            case "weixin":
+                query = f"""
+                    select gh_id as 'account_id' from long_articles_accounts where is_using = 1;
+                """
+            case _:
+                raise RuntimeError(f"Unknown platform: {platform}")
+
+        account_list = await self.pool.async_fetch(query)
+        return [i["account_id"] for i in account_list]
+
+
+class WeixinAccountManager(CrawlerAccountManager):
+
+    def __init__(self, pool, aliyun_log, trace_id):
+        super().__init__(pool, aliyun_log, trace_id)
+        self.pool = pool
+        self.aliyun_log = aliyun_log
+        self.trace_id = trace_id
+
+    async def get_account_crawler_articles_info(
+        self, account_id: str
+    ) -> Optional[DataFrame]:
+        """get articles and set as dataframe"""
+        query = f"""
+            select title, link, score, status, article_index, read_cnt, publish_time
+            from crawler_meta_article where out_account_id = %s;
+        """
+        response = await self.pool.async_fetch(query=query, params=(account_id,))
+        return DataFrame(
+            response,
+            columns=[
+                "title",
+                "link",
+                "score",
+                "status",
+                "article_index",
+                "read_cnt",
+                "publish_time",
+            ],
+        )
+
+    async def update_account_stat_detail(
+        self, account_id: str, history_info: Dict, recently_info: Dict
+    ) -> int:
+        """更新账号统计详情"""
+        query = f"""
+            update long_articles_accounts 
+            set history_publish_frequency = %s, history_score_ci_lower = %s, 
+                recent_publish_frequency= %s, recent_score_ci_lower = %s, update_date = %s
+            where gh_id = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                history_info["publish_frequency"],
+                history_info["ci_lower"],
+                recently_info["publish_frequency"],
+                recently_info["ci_lower"],
+                datetime.today().strftime("%Y-%m-%d"),
+                account_id,
+            ),
+        )
+
+    def analysis_dataframe(self, dataframe: DataFrame) -> Optional[Dict]:
+        score_list = dataframe["score"].dropna()
+        n = len(score_list)
+        mean = score_list.mean() if n else 0.0
+        # 置信区间
+        if n < 2:
+            ci_lower, ci_upper = 0.0, 0.0
+        else:
+            sem = stats.sem(score_list)  # 可能返回 NaN
+            t_val = stats.t.ppf(0.975, df=n - 1)
+            margin = t_val * sem if math.isfinite(sem) else 0.0
+            ci_lower, ci_upper = mean - margin, mean + margin
+
+        # 计算发文频率
+        publish_times = dataframe["publish_time"].dropna()
+        if len(publish_times) >= 2:
+            delta = publish_times.max() - publish_times.min()
+            publish_frequency = (
+                (len(publish_times) / delta * self.ONE_DAY_TIMESTAMP) if delta else 0.0
+            )
+        else:
+            publish_frequency = 0.0
+
+        return {
+            "publish_frequency": self.safe_float(publish_frequency),
+            "ci_lower": self.safe_float(ci_lower),
+            "ci_upper": self.safe_float(ci_upper),
+        }
+
+    async def analysis_single_account(self, account_id: str) -> int:
+        dataframe = await self.get_account_crawler_articles_info(account_id)
+        history_articles_analysis = self.analysis_dataframe(dataframe)
+        thirty_days_before = int(time.time()) - self.THIRTY_DAYS_TIMESTAMP
+        recent_30_days_df = dataframe[dataframe["publish_time"] >= thirty_days_before]
+        recent_30_days_analysis = self.analysis_dataframe(recent_30_days_df)
+        return await self.update_account_stat_detail(
+            account_id, history_articles_analysis, recent_30_days_analysis
+        )
+
+    async def deal(self, platform, account_id_list: Optional[List[str]] = None) -> None:
+        """deal"""
+        if not account_id_list:
+            account_id_list = await self.get_crawling_accounts(platform=platform)
+
+        for account_id in tqdm(account_id_list):
+            await self.analysis_single_account(account_id)
+

+ 2 - 0
applications/tasks/llm_tasks/__init__.py

@@ -1,7 +1,9 @@
 from .process_title import TitleRewrite
+from .process_title import ArticlePoolCategoryGeneration
 from .candidate_account_process import CandidateAccountQualityScoreRecognizer
 
 __all__ = [
     "TitleRewrite",
     "CandidateAccountQualityScoreRecognizer",
+    "ArticlePoolCategoryGeneration",
 ]

+ 15 - 5
applications/tasks/llm_tasks/candidate_account_process.py

@@ -96,7 +96,9 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
             update_query, (new_status, account_id, ori_status)
         )
 
-    async def insert_account_into_crawler_queue(self, score_list: List[int], account: dict) -> None:
+    async def insert_account_into_crawler_queue(
+        self, score_list: List[int], account: dict
+    ) -> None:
         """
         计算账号的得分置信区间下限,若置信区间下限的分数大于阈值,则认为是好的账号
         """
@@ -107,7 +109,13 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
             """
             await self.pool.async_save(
                 query=query,
-                params=(account["platform"], account["account_id"], account["account_name"], 'ai_recognize', self.ACCOUNT_GOOD_STATUS)
+                params=(
+                    account["platform"],
+                    account["account_id"],
+                    account["account_name"],
+                    "ai_recognize",
+                    self.ACCOUNT_GOOD_STATUS,
+                ),
             )
 
     async def score_for_each_account_by_llm(self, account):
@@ -153,13 +161,15 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
                 params=(
                     json.dumps(completion),
                     avg_score,
-                    self.PROCESSING_STATUS,
-                    account_id,
                     self.SUCCESS_STATUS,
+                    account_id,
+                    self.PROCESSING_STATUS,
                 ),
             )
             # 判断置信区间下限, 并且插入账号
-            await self.insert_account_into_crawler_queue(score_list=completion, account=account)
+            await self.insert_account_into_crawler_queue(
+                score_list=completion, account=account
+            )
 
         except Exception as e:
             await self.log_client.log(

+ 440 - 93
applications/tasks/llm_tasks/process_title.py

@@ -1,9 +1,10 @@
 import time
 import traceback
 
-from tqdm import tqdm
+from typing import Optional, List, Dict, Tuple
 
 from applications.api import fetch_deepseek_completion
+from applications.utils import yield_batch
 
 
 class Const:
@@ -12,109 +13,465 @@ class Const:
     TITLE_REWRITE_SUCCESS_STATUS = 1
     TITLE_REWRITE_FAIL_STATUS = 99
     TITLE_REWRITE_LOCK_STATUS = 101
-
     # article status
     ARTICLE_AUDIT_PASSED_STATUS = 1
     ARTICLE_POSITIVE_STATUS = 0
-
     # title useful status
     TITLE_USEFUL_STATUS = 1
-
     # prompt version
     PROMPT_VERSION = "xx_250228"  # 信欣2025-02-28提供
-
     # block expire time 1h
     TITLE_REWRITE_LOCK_TIME = 60 * 60
+    # task status
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAIL_STATUS = 99
+    # max processing time
+    MAX_PROCESSING_TIME = 3600
+    # article_status
+    ARTICLE_INIT_STATUS = 1
+    ARTICLE_PUBLISHED_STATUS = 2
+    ARTICLE_BAD_STATUS = 0
+    # limit score
+    LIMIT_SCORE = 0.4
+
+    BATCH_SIZE = 20
+
+    PROCESS_NUM = 1000
 
 
 class TitleProcess(Const):
-    def __init__(self, pool, aliyun_log):
+    def __init__(self, pool, aliyun_log, trace_id):
         self.pool = pool
         self.aliyun_log = aliyun_log
+        self.trace_id = trace_id
 
-    @classmethod
-    def generate_title_rewrite_prompt(cls, ori_title):
+    @staticmethod
+    def generate_title_rewrite_prompt(ori_title: str) -> str:
         """
         生成prompt
         """
         prompt = f"""
-        请将以下标题改写成适合公众号中小程序点击和传播的文章标题,文章标题的写作规范如下,请学习后进行文章标题的编写。直接输出最终的文章标题,文章标题撰写规范如下:
-        1. 标题结构:要点前置,信息明确
-            核心信息前置:标题开头直接点出文章的核心内容或亮点,吸引读者注意。例如:
-              “我国存款最安全的五大银行,永远都不会倒闭,你知道是哪五家吗?”
-              “亩产7000斤,被误认成萝卜却曾是‘救命粮’,如今成我国出口名蔬”。
-            简洁明了:标题通常在20字以内,信息集中且易于理解。
-            悬念前置结构:前半句设置反常/冲突场景(如"刑满释放蹬三轮")+后半句用结果反转制造悬念("政府领导登门分配工作")
-            多要素拼接:通过冒号/逗号分隔不同叙事主体(地域+人物冲突+权威评价),如"辽宁女子住高档小区被敲门,法院判决意外"
-
-        2. 情绪表达:激发共鸣,引发好奇
-            情感共鸣:通过情感化的语言触动读者,泪崩/守护/抱头痛哭等情感冲击词,配合家庭伦理场景
-            例如:
-              “老母亲分家产,给亲闺女30万,给养女一筐青菜,养女意外摔倒,看到筐子里的东西,瞬间愣住了”。
-              “儿子卖车卖房给母亲治病,母亲去世后儿媳收拾房间,打开床底柜,儿子突然痛哭”。
-            悬念与好奇心:通过提问或制造悬念,激发读者点击欲望。例如:
-              “你知道是哪五家吗?”
-              “打开床底柜,儿子突然痛哭”。
-            冲突性情绪词:拍桌大骂/气愤不已/眼红不已/算计等强对抗性词汇
-            结果反差刺激:用"风光善终/价值过亿/判决意外"等违反预期的结果
-
-        3. 语言风格:口语化、接地气
-            口语化表达:使用通俗易懂的语言,贴近读者生活。
-            刻意使用"赶都赶不走/各吃各的/我就知道你在家"等市井化用语。
-            例如:
-              “狗屎运?江西男子钓鱼时发现青鱼尸骸,扒开后捡到鸡蛋大小的青鱼石”。
-              “聪明的女人,不会帮婆家3种忙,而蠢女人才一再插手”。
-            接地气的词汇:使用“狗屎运”“蠢女人”等口语化词汇,增强亲切感。
-            身份反差构建:突出人物命运转折(老农→亿万富翁/囚犯→政府帮扶对象)
-            权威背书暗示:"专家气愤/法院判决/网友评价"等第三方视角增强可信度
-
-        4. 标点运用:增强语气,突出重点
-            问号与感叹号:通过问号制造悬念,感叹号强化情感。
-            在关键转折点使用("太气人了!/赔不了!")
-            问号制造互动:如"容嬷嬷是校花?"激发读者验证心理
-            例如:
-              “你知道是哪五家吗?”
-              “太无耻了!湖南,一名厨师被公司派到云南‘出差’被拒……”
-            引号与冒号:用于突出关键词或转折点。
-            破折号递进:用"——"引导关键信息("吃不完最好扔掉——")
-            例如:
-              “被误认成萝卜却曾是‘救命粮’”。
-              “女子归还后,失主拒绝支付报酬,还说:要有格局”。
-
-        5. 热点与话题性:结合社会热点或争议
-            社会热点:结合当前热点事件或争议话题,吸引关注。例如:
-              “上海:男子超市连续购买46枚过期咸鸭蛋,2天分46次交易,向厂家索赔金14万,法院判了!”
-            争议性话题:通过争议性内容引发讨论。例如:
-              “李玉成终于说出实话,公开吐槽马玉琴年纪太大,结婚28年疑似后悔”。
-
-        6. 数字与具体细节:增强可信度与吸引力
-            数字的运用:通过具体数字增强标题的可信度和吸引力。例如:
-              “亩产7000斤”。
-              “22年河南男子跳河救人,体力耗尽留遗言”。
-            细节描述:通过细节让标题更具画面感。例如:
-              “打开床底柜,儿子突然痛哭”。
-              “扒开后捡到鸡蛋大小的青鱼石”。
-
-        7. 价值诉求:传递实用信息或情感价值
-            实用信息:提供对读者有价值的信息。例如:
-              “我国存款最安全的五大银行,永远都不会倒闭”。
-              “72岁老人每天一个蒸苹果,半年后体检,看到指标变化让他乐开了花”。
-            情感价值:通过情感故事或人生哲理打动读者。例如:
-              “父母越老越能暴露家庭最真实的一面:当父母70岁,子女不该抱有这三种期待”。
-
-        8. 名人效应与历史情怀:增强吸引力
-            名人效应:提及名人或历史事件,吸引关注。例如:
-              “难怪王扶林说陈晓旭不够漂亮,看看他选的原黛玉候选人,那才叫美”。
-              “1975年‘下馆子’的老照片,2元能吃些什么,勾起那段最难忘的时光”。
-
-        9.隐藏传播逻辑:通过标题中暗含的、能触发人性弱点(如猎奇、贪婪、同情)或社会痛点的心理机制,通过潜意识刺激读者点击欲望
-           人性弱点触发:贪婪(200万保单)、猎奇(林彪密件)、窥私(家庭算计)
-           生存焦虑关联:医疗(脑瘫儿)、养老(子女不孝)、食品安全(二次加热)
-           身份代入设计:选择"老太太/外甥女/退休母亲"等易引发群体共鸣的角色
-        输入的标题是: '{ori_title}'
+请将以下标题改写成适合公众号中小程序点击和传播的文章标题,文章标题的写作规范如下,请学习后进行文章标题的编写。直接输出最终的文章标题,文章标题撰写规范如下:
+1. 标题结构:要点前置,信息明确
+    核心信息前置:标题开头直接点出文章的核心内容或亮点,吸引读者注意。例如:
+      “我国存款最安全的五大银行,永远都不会倒闭,你知道是哪五家吗?”
+      “亩产7000斤,被误认成萝卜却曾是‘救命粮’,如今成我国出口名蔬”。
+    简洁明了:标题通常在20字以内,信息集中且易于理解。
+    悬念前置结构:前半句设置反常/冲突场景(如"刑满释放蹬三轮")+后半句用结果反转制造悬念("政府领导登门分配工作")
+    多要素拼接:通过冒号/逗号分隔不同叙事主体(地域+人物冲突+权威评价),如"辽宁女子住高档小区被敲门,法院判决意外"
+
+2. 情绪表达:激发共鸣,引发好奇
+    情感共鸣:通过情感化的语言触动读者,泪崩/守护/抱头痛哭等情感冲击词,配合家庭伦理场景
+    例如:
+      “老母亲分家产,给亲闺女30万,给养女一筐青菜,养女意外摔倒,看到筐子里的东西,瞬间愣住了”。
+      “儿子卖车卖房给母亲治病,母亲去世后儿媳收拾房间,打开床底柜,儿子突然痛哭”。
+    悬念与好奇心:通过提问或制造悬念,激发读者点击欲望。例如:
+      “你知道是哪五家吗?”
+      “打开床底柜,儿子突然痛哭”。
+    冲突性情绪词:拍桌大骂/气愤不已/眼红不已/算计等强对抗性词汇
+    结果反差刺激:用"风光善终/价值过亿/判决意外"等违反预期的结果
+
+3. 语言风格:口语化、接地气
+    口语化表达:使用通俗易懂的语言,贴近读者生活。
+    刻意使用"赶都赶不走/各吃各的/我就知道你在家"等市井化用语。
+    例如:
+      “狗屎运?江西男子钓鱼时发现青鱼尸骸,扒开后捡到鸡蛋大小的青鱼石”。
+      “聪明的女人,不会帮婆家3种忙,而蠢女人才一再插手”。
+    接地气的词汇:使用“狗屎运”“蠢女人”等口语化词汇,增强亲切感。
+    身份反差构建:突出人物命运转折(老农→亿万富翁/囚犯→政府帮扶对象)
+    权威背书暗示:"专家气愤/法院判决/网友评价"等第三方视角增强可信度
+
+4. 标点运用:增强语气,突出重点
+    问号与感叹号:通过问号制造悬念,感叹号强化情感。
+    在关键转折点使用("太气人了!/赔不了!")
+    问号制造互动:如"容嬷嬷是校花?"激发读者验证心理
+    例如:
+      “你知道是哪五家吗?”
+      “太无耻了!湖南,一名厨师被公司派到云南‘出差’被拒……”
+    引号与冒号:用于突出关键词或转折点。
+    破折号递进:用"——"引导关键信息("吃不完最好扔掉——")
+    例如:
+      “被误认成萝卜却曾是‘救命粮’”。
+      “女子归还后,失主拒绝支付报酬,还说:要有格局”。
+
+5. 热点与话题性:结合社会热点或争议
+    社会热点:结合当前热点事件或争议话题,吸引关注。例如:
+      “上海:男子超市连续购买46枚过期咸鸭蛋,2天分46次交易,向厂家索赔金14万,法院判了!”
+    争议性话题:通过争议性内容引发讨论。例如:
+      “李玉成终于说出实话,公开吐槽马玉琴年纪太大,结婚28年疑似后悔”。
+
+6. 数字与具体细节:增强可信度与吸引力
+    数字的运用:通过具体数字增强标题的可信度和吸引力。例如:
+      “亩产7000斤”。
+      “22年河南男子跳河救人,体力耗尽留遗言”。
+    细节描述:通过细节让标题更具画面感。例如:
+      “打开床底柜,儿子突然痛哭”。
+      “扒开后捡到鸡蛋大小的青鱼石”。
+
+7. 价值诉求:传递实用信息或情感价值
+    实用信息:提供对读者有价值的信息。例如:
+      “我国存款最安全的五大银行,永远都不会倒闭”。
+      “72岁老人每天一个蒸苹果,半年后体检,看到指标变化让他乐开了花”。
+    情感价值:通过情感故事或人生哲理打动读者。例如:
+      “父母越老越能暴露家庭最真实的一面:当父母70岁,子女不该抱有这三种期待”。
+
+8. 名人效应与历史情怀:增强吸引力
+    名人效应:提及名人或历史事件,吸引关注。例如:
+      “难怪王扶林说陈晓旭不够漂亮,看看他选的原黛玉候选人,那才叫美”。
+      “1975年‘下馆子’的老照片,2元能吃些什么,勾起那段最难忘的时光”。
+
+9.隐藏传播逻辑:通过标题中暗含的、能触发人性弱点(如猎奇、贪婪、同情)或社会痛点的心理机制,通过潜意识刺激读者点击欲望
+   人性弱点触发:贪婪(200万保单)、猎奇(林彪密件)、窥私(家庭算计)
+   生存焦虑关联:医疗(脑瘫儿)、养老(子女不孝)、食品安全(二次加热)
+   身份代入设计:选择"老太太/外甥女/退休母亲"等易引发群体共鸣的角色
+输入的标题是: '{ori_title}'
+        """
+        return prompt
+
+    @staticmethod
+    def category_generation_from_title(title_list: List[Tuple[str, str]]) -> str:
+        """
+        generate prompt category for given title
+        """
+        prompt = f"""
+请帮我完成以下任务:输入为文章的标题,根据标题判断其内容所属的类目,输出为文章标题及其对应的类目。
+类目需从以下15个品类内选择:
+1. 知识科普
+定义:以通俗易懂的方式普及科学、技术、健康、安全、生活常识、财产保护、医保政策、为人处事方式等内容,旨在提高公众的知识水平和认知能力。内容通常具有教育性和实用性,涵盖自然、社会、文化等多个领域。
+标题示例:
+我国存款最安全的五大银行,永远都不会倒闭,你知道是哪五家吗?
+借条上不要写“这3个字”,不然变成一张废纸,否则用法律也没用
+不能二次加热的3种食物!再次提醒:这3种食物吃不完最好扔掉
+
+2. 军事历史
+定义:聚焦于历史上的军事事件、战争故事、军事策略、英雄人物等内容,旨在还原战争场景、探讨军事决策、揭示历史真相,并展现战争中的人物命运与历史影响。内容通常以叙事、分析或回忆的形式呈现,兼具历史深度和故事性。
+标题示例:
+对越作战永远失踪的332人,陵园没有墓碑,没有名字,只有烈士证
+淮海大战丢失阵地,师长带头冲锋!最后出一口恶气:活捉敌最高指挥官
+抗战时,一村民被敌拉去带路,半道回头忽发现:后面跟个游击队员
+
+3. 家长里短
+定义:围绕家庭成员之间的关系、矛盾、情感、道德、等展开的故事或讨论,内容常涉及婚姻、亲子、婆媳、兄弟姐妹等关系,或是人情往来、金钱纠纷、情感变化等内容,反映家庭生活中的温情、冲突与人性。
+标题示例:
+父母越老越能暴露家庭最真实的一面:当父母70岁,子女不该抱有这三种期待
+老母亲分家产,给亲闺女30万,给养女一筐青菜,养女意外摔倒,看到筐子里的东西,瞬间愣住了
+我花150一天雇了阿姨,两天后上班回来给她300,阿姨说我账算错了
+
+4. 社会法治
+定义:聚焦社会事件、法律纠纷、法院判决、社会现象等内容,通常涉及道德、法律、公平正义等议题,旨在揭示社会问题、探讨法律规则或反映人性与社会现实。
+标题示例:
+山东,女子在小区捡到16万天价项链,业主悬赏3万找回,女子归还后,失主拒绝支付报酬,还说:要有格局,女子认为被骗,将失主告上法庭
+陕西,女子22万买26层房,2年后,楼盘24层就已经封顶!开发商:你闹事造成100万损失,道歉才给赔偿!
+上海:男子超市连续购买46枚过期咸鸭蛋,2天分46次交易,向厂家索赔金14万,法院判了!
+
+5. 奇闻趣事
+定义:以猎奇、娱乐为主,涵盖罕见、奇特、有趣的事件、发现或故事,内容通常具有趣味性和话题性,能够引发读者的好奇心和讨论。
+标题示例:
+狗屎运?江西男子钓鱼时发现青鱼尸骸,扒开后捡到鸡蛋大小的青鱼石,网友:起码值几千!
+内蒙古小伙河边捡到金牌,拒绝上交将其熔成金手镯,专家气愤不已
+男子买了一辆废弃坦克,拆油箱时,他发现了一根又一根的金条……
+
+6. 名人八卦
+定义:围绕名人的生活、言论、事件、八卦等内容展开,通常涉及娱乐圈、政界、历史人物等,旨在满足公众对名人隐私和动态的好奇心。
+标题示例:
+难怪王扶林说陈晓旭不够漂亮,看看他选的原黛玉候选人,那才叫美
+心狠手辣的容嬷嬷年轻时是校花?看了照片后,网友直接闭嘴了!
+李玉成终于说出实话,公开吐槽马玉琴年纪太大,结婚28年疑似后悔
+
+7. 健康养生
+定义:关注健康、养生、疾病预防、生活习惯等方面的知识和建议,内容通常具有实用性和指导性,旨在帮助读者改善生活质量、提升健康水平。
+标题示例:
+72岁老人每天一个蒸苹果,半年后体检,看到指标变化让他乐开了花
+40岁女子每天吃水煮蛋,一年后去体检,检查报告令医生都羡慕不已
+2024年血糖新标准已公布,不再是3.9~6.1,你的血糖还不算高吗?
+
+8. 情感故事
+定义:以人与人之间的情感交流、感人故事、情感经历为主题,内容通常充满温情、感动或反思,旨在引发读者的情感共鸣和思考。
+标题示例:
+男孩饭店吃饭,发现陌生女子和去世母亲很像,走过去说:我妈妈去世了,能抱一下我吗?
+河南一女子直播时,被失散 32 年的父亲认出:闺女等着爸爸接你回家
+1987年,江苏男子借好友一千元,25年后朋友成富豪还他1000万报恩
+流浪狗跟着骑行夫妻跑了一百多公里,一直守护在女主身边,赶都赶不走,当男主得知原因后竟抱着狗狗大哭起来
+
+9. 国家大事
+定义:涉及国家实力、科技发展、资源发现、国际合作等内容,通常以宏观视角展现国家的综合实力、科技成就或国际影响力,体现国家的崛起与发展。
+标题示例:
+我国在南极发现“海上粮仓”,储量高达10亿吨,世界各国眼红不已
+我国贵州发现7000万吨宝藏,价值高达上万亿,多国求合作被拒绝
+距我国3000公里,塞班岛明明归美国管辖,为何岛上大多是中国人?
+
+10. 现代人物
+定义:聚焦活跃在21世纪后具有传奇色彩或巨大贡献的人物、事迹、成就等,内容通常充满戏剧性和启发性,旨在展现人物的非凡经历或历史贡献。
+标题示例:
+她曾狂贪国家上百亿,被发现时已经移居美国,最终还风光一时得善终
+山东女子因坐月子无聊,破译美国2套绝密系统的密码,国家:奖励711万!
+牺牲太大了!航天女英雄刘洋:结婚8年未生子,回地面后“消失”的她怎样了?
+
+11. 怀旧时光
+定义:以回忆和怀旧为主题,涉及过去的历史、文化、生活、照片等内容,旨在唤起读者对过去时光的情感共鸣和怀念。
+标题示例:
+1975年“下馆子”的老照片,2元能吃些什么,勾起那段最难忘的时光
+82年,北京老人捡回两张“破椅子”,遭家人数落,29年后拍出2300万
+这张老照片第一次看到,邓颖超和李讷的罕见合影!
+
+12. 政治新闻
+定义:聚焦政治事件、领导人动态、国际关系等内容,通常以新闻或分析的形式呈现,旨在揭示政治局势、政策变化或国际关系的动态。
+标题示例:
+中方外长行程有变,提前结束访欧匆匆回国,带回来一个好消息
+宋庆龄在北京逝世后,远在美国的宋美龄只说了7个字,字字揪心!
+庐山会议后,叶帅去劝彭德怀认个错,哭着说了一句心里话
+
+13. 历史人物
+定义:聚焦于21世纪前具有重要影响的人物,包括他们的生平、事迹、成就、性格、趣事及其对历史进程的贡献。内容通常以传记、回忆录或历史分析的形式呈现,旨在还原人物的真实面貌并探讨其历史意义。
+标题示例:
+林彪去世后,蒋介石收到林彪与戴笠的一份密谈文件,看后拍桌大骂
+张学良软禁时的一张实拍照片,头发秃顶,两眼无光,像个中年老头
+1912年,孙中山和两个女儿罕见留影,面对镜头父女三人看起来很幸福
+
+14. 社会现象
+定义:关注社会中出现的普遍现象、趋势或问题,通常涉及文化、经济、教育、民生等领域。内容以观察、分析或评论为主,旨在揭示现象背后的原因、影响及社会意义,引发公众的思考和讨论。
+标题示例:
+22年河南男子跳河救人,体力耗尽留遗言,被救女子猛然抓住他:一起走
+浙江一老人刑满释放,靠蹬三轮为生,6年后,政府领导登门拜访:我们帮您分配工作
+儿子收到清华通知书,父亲花5万请全村吃席,镇长看一眼竟说:这是假的
+
+15.财经科技
+定义:聚焦于经济、金融、投资及行业发展的分析与预测,涵盖未来经济趋势、资产价值变化、行业变革及个人理财策略等内容。可以提供前瞻性的财经视角和实用的理财建议,帮助其把握经济动态、优化财务规划并应对行业变化。
+标题示例:
+未来10年,现金和房子都将贬值,只有2样东西最值钱
+外卖时代将被终结?一个全新行业正悄悄取代外卖,你准备好了吗?
+准备存款的一定要知道,今明两年,定期存款要记住“4不存”
+
+输入是一个 LIST, LIST 中的每个元素是一个元组,元组的第一个元素是文章的 ID,第二个元素是文章的标题。
+最后输出结果请用JSON格式输出,key为ID,value为品类,仅输出JSON,不要markdown格式,不要任何其他内容
+如果标题中包含半角双引号,则进行转义
+输入的 LIST 是 {title_list}
+检查你的输出,输出的品类一定需要是输入的 15个品类,只需要输出品类的中文汉字
         """
         return prompt
 
+    async def _roll_back_lock_tasks(self, table_name: str) -> int:
+        query = f"""
+           update {table_name}
+           set category_status = %s
+           where category_status = %s and category_status_update_ts <= %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                self.INIT_STATUS,
+                self.PROCESSING_STATUS,
+                int(time.time()) - self.MAX_PROCESSING_TIME,
+            ),
+        )
+
+    async def process_single_article(
+        self, content_type: str, article: Dict
+    ) -> Optional[Dict]:
+        match content_type:
+            case "video":
+                article_id = article["id"]
+                title = article["article_title"]
+            case "article":
+                article_id = article["article_id"]
+                title = article["title"]
+            case _:
+                raise ValueError("content type is not supported")
+
+        title_batch = [(article_id, title)]
+        prompt = self.category_generation_from_title(title_batch)
+        try:
+            completion = fetch_deepseek_completion(
+                model="DeepSeek-V3", prompt=prompt, output_type="json"
+            )
+            return completion
+
+        except Exception as e:
+            await self.aliyun_log.log(
+                contents={
+                    "trace_id": self.trace_id,
+                    "data": {
+                        "article_id": article_id,
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                }
+            )
+            return None
+
+
+class VideoPoolCategoryGeneration:
+    pass
+
+
+class ArticlePoolCategoryGeneration(TitleProcess):
+
+    def __init__(self, pool, aliyun_log, trace_id):
+        super().__init__(pool, aliyun_log, trace_id)
+
+    async def lock_task(self, article_id_tuple: tuple[int, ...]) -> int:
+        update_query = f"""
+            update long_articles.crawler_meta_article
+            set category_status = %s, category_status_update_ts = %s
+            where article_id in %s and category_status = %s;
+        """
+
+        return await self.pool.async_save(
+            query=update_query,
+            params=(
+                self.PROCESSING_STATUS,
+                int(time.time()),
+                article_id_tuple,
+                self.INIT_STATUS,
+            ),
+        )
+
+    async def get_task_list(self, limit):
+        query = f"""
+            select article_id, title from crawler_meta_article
+            where category_status = %s and status = %s and score > %s
+            order by score desc limit %s;
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            params=(
+                self.INIT_STATUS,
+                self.ARTICLE_INIT_STATUS,
+                self.LIMIT_SCORE,
+                limit,
+            ),
+        )
+
+    async def set_category_status_as_success(
+        self, article_id: int, category: str
+    ) -> int:
+        update_query = f"""
+               update long_articles.crawler_meta_article
+               set category_by_ai = %s, category_status = %s, category_status_update_ts = %s
+               where article_id = %s and category_status = %s;
+        """
+        return await self.pool.async_save(
+            query=update_query,
+            params=(
+                category,
+                self.SUCCESS_STATUS,
+                int(time.time()),
+                article_id,
+                self.PROCESSING_STATUS,
+            ),
+        )
+
+    async def set_category_status_as_fail(self, article_id: int) -> int:
+        update_query = f"""
+               update long_articles.crawler_meta_article
+               set category_status = %s, category_status_update_ts = %s
+               where article_id = %s and category_status = %s;
+        """
+        return await self.pool.async_save(
+            query=update_query,
+            params=(
+                self.FAIL_STATUS,
+                int(time.time()),
+                article_id,
+                self.PROCESSING_STATUS,
+            ),
+        )
+
+    async def process_each_batch(self, task_batch):
+        title_batch = [(i["article_id"], i["title"]) for i in task_batch]
+        id_tuple = tuple([int(i["article_id"]) for i in task_batch])
+
+        if await self.lock_task(id_tuple):
+            prompt = self.category_generation_from_title(title_batch)
+
+            try:
+                completion = fetch_deepseek_completion(
+                    model="DeepSeek-V3", prompt=prompt, output_type="json"
+                )
+                for article in title_batch:
+                    article_id = article[0]
+                    category = completion.get(str(article_id))
+                    if category:
+                        await self.set_category_status_as_success(article_id, category)
+                    else:
+                        await self.set_category_status_as_fail(article_id)
+
+            except Exception as e:
+                await self.aliyun_log.log(
+                    contents={
+                        "task": "ArticlePoolCategoryGeneration",
+                        "function": "process_each_batch",
+                        "message": "batch 中存在敏感词,AI 拒绝返回",
+                        "status": "fail",
+                        "trace_id": self.trace_id,
+                        "data": {
+                            "article_id": id_tuple,
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        },
+                    }
+                )
+                for article in task_batch:
+                    single_completion = await self.process_single_article(
+                        content_type="article", article=article
+                    )
+                    article_id = article["article_id"]
+                    if single_completion:
+                        category = single_completion.get(str(article_id))
+                        if category:
+                            # set as success
+                            await self.set_category_status_as_success(
+                                article_id, category
+                            )
+                        else:
+                            await self.set_category_status_as_fail(article_id)
+                    else:
+                        # set as fail
+                        await self.set_category_status_as_fail(article_id)
+                return
+        else:
+            return
+
+    async def deal(self, limit):
+        # await self._roll_back_lock_tasks(table_name="crawler_meta_article")
+
+        if not limit:
+            limit = self.PROCESS_NUM
+
+        task_list = await self.get_task_list(limit=limit)
+        print(task_list)
+        await self.aliyun_log.log(
+            contents={
+                "task": "ArticlePoolCategoryGeneration",
+                "function": "deal",
+                "trace_id": self.trace_id,
+                "message": f"总共获取{len(task_list)}条文章",
+            }
+        )
+        task_batch_list = yield_batch(data=task_list, batch_size=self.BATCH_SIZE)
+        batch_index = 0
+        for task_batch in task_batch_list:
+            batch_index += 1
+            try:
+                await self.process_each_batch(task_batch)
+                print(f"batch :{batch_index} 处理成功")
+            except Exception as e:
+                await self.aliyun_log.log(
+                    contents={
+                        "task": "ArticlePoolCategoryGeneration",
+                        "function": "deal",
+                        "message": f"batch {batch_index} processed failed",
+                        "status": "fail",
+                        "trace_id": self.trace_id,
+                        "data": {
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        },
+                    }
+                )
+
 
 class TitleRewrite(TitleProcess):
 
@@ -255,15 +612,5 @@ class TitleRewrite(TitleProcess):
 
         task_list = await self.get_articles_batch()
 
-        bar = tqdm(task_list, desc="title rewrite task")
-        for article in bar:
+        for article in task_list:
             await self.rewrite_each_article(article)
-            bar.set_description("title rewrite task")
-
-
-class VideoPoolCategoryGeneration:
-    pass
-
-
-class ArticlePoolCategoryGeneration:
-    pass

+ 137 - 0
applications/tasks/task_handler.py

@@ -0,0 +1,137 @@
+from datetime import datetime
+
+from applications.tasks.cold_start_tasks import ArticlePoolColdStart
+from applications.tasks.crawler_tasks import CrawlerToutiao
+from applications.tasks.crawler_tasks import WeixinAccountManager
+from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
+from applications.tasks.llm_tasks import TitleRewrite
+from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
+from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
+from applications.tasks.monitor_tasks import check_kimi_balance
+from applications.tasks.monitor_tasks import GetOffVideos
+from applications.tasks.monitor_tasks import CheckVideoAuditStatus
+from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
+from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
+from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
+from applications.tasks.monitor_tasks import TaskProcessingMonitor
+from applications.tasks.task_mapper import TaskMapper
+
+
+class TaskHandler(TaskMapper):
+    def __init__(self, data, log_service, db_client, trace_id):
+        self.data = data
+        self.log_client = log_service
+        self.db_client = db_client
+        self.trace_id = trace_id
+
+    # ---------- 下面是若干复合任务的局部实现 ----------
+    async def _check_kimi_balance_handler(self) -> int:
+        response = await check_kimi_balance()
+        await self.log_client.log(
+            contents={
+                "trace_id": self.trace_id,
+                "task": "check_kimi_balance",
+                "data": response,
+            }
+        )
+        return self.TASK_SUCCESS_STATUS
+
+    async def _get_off_videos_task_handler(self) -> int:
+        sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
+        return await sub_task.deal()
+
+    async def _check_video_audit_status_handler(self) -> int:
+        sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
+        return await sub_task.deal()
+
+    async def _task_processing_monitor_handler(self) -> int:
+        sub_task = TaskProcessingMonitor(self.db_client)
+        await sub_task.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _inner_gzh_articles_monitor_handler(self) -> int:
+        sub_task = InnerGzhArticlesMonitor(self.db_client)
+        return await sub_task.deal()
+
+    async def _title_rewrite_handler(self):
+        sub_task = TitleRewrite(self.db_client, self.log_client)
+        return await sub_task.deal()
+
+    async def _update_root_source_id_handler(self) -> int:
+        sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
+        await sub_task.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _outside_monitor_handler(self) -> int:
+        collector = OutsideGzhArticlesCollector(self.db_client)
+        await collector.deal()
+        monitor = OutsideGzhArticlesMonitor(self.db_client)
+        return await monitor.deal()  # 应返回 SUCCESS / FAILED
+
+    async def _recycle_article_data_handler(self) -> int:
+        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
+        recycle = RecycleDailyPublishArticlesTask(
+            self.db_client, self.log_client, date_str
+        )
+        await recycle.deal()
+        check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
+        await check.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _crawler_toutiao_handler(self) -> int:
+        sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
+        method = self.data.get("method", "account")
+        media_type = self.data.get("media_type", "article")
+        category_list = self.data.get("category_list", [])
+
+        match method:
+            case "account":
+                await sub_task.crawler_task(media_type=media_type)
+            case "recommend":
+                await sub_task.crawl_toutiao_recommend_task(category_list)
+            case "search":
+                await sub_task.search_candidate_accounts()
+            case _:
+                raise ValueError(f"Unsupported method {method}")
+        return self.TASK_SUCCESS_STATUS
+
+    async def _article_pool_cold_start_handler(self) -> int:
+        cold_start = ArticlePoolColdStart(
+            self.db_client, self.log_client, self.trace_id
+        )
+        platform = self.data.get("platform", "weixin")
+        crawler_methods = self.data.get("crawler_methods", [])
+        await cold_start.deal(platform=platform, crawl_methods=crawler_methods)
+        return self.TASK_SUCCESS_STATUS
+
+    async def _candidate_account_quality_score_handler(self) -> int:
+        task = CandidateAccountQualityScoreRecognizer(
+            self.db_client, self.log_client, self.trace_id
+        )
+        await task.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _article_pool_category_generation_handler(self) -> int:
+        task = ArticlePoolCategoryGeneration(
+            self.db_client, self.log_client, self.trace_id
+        )
+        limit_num = self.data.get("limit")
+        await task.deal(limit=limit_num)
+        return self.TASK_SUCCESS_STATUS
+
+    async def _crawler_account_manager_handler(self) -> int:
+        platform = self.data.get("platform", "weixin")
+        account_id_list = self.data.get("account_id_list")
+
+        match platform:
+            case "weixin":
+                task = WeixinAccountManager(
+                    self.db_client, self.log_client, self.trace_id
+                )
+            case _:
+                raise ValueError(f"Unsupported platform {platform}")
+
+        await task.deal(platform=platform, account_id_list=account_id_list)
+        return self.TASK_SUCCESS_STATUS

+ 10 - 107
applications/tasks/task_scheduler.py

@@ -6,35 +6,21 @@ from datetime import datetime
 from typing import Awaitable, Callable, Dict
 
 from applications.api import feishu_robot
-from applications.utils import task_schedule_response, generate_task_trace_id
+from applications.utils import task_schedule_response
+from applications.tasks.task_handler import TaskHandler
 
-from applications.tasks.cold_start_tasks import ArticlePoolColdStart
-from applications.tasks.crawler_tasks import CrawlerToutiao
-from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
-from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
-from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
-from applications.tasks.llm_tasks import TitleRewrite
-from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
-from applications.tasks.monitor_tasks import check_kimi_balance
-from applications.tasks.monitor_tasks import GetOffVideos
-from applications.tasks.monitor_tasks import CheckVideoAuditStatus
-from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
-from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
-from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
-from applications.tasks.monitor_tasks import TaskProcessingMonitor
-from applications.tasks.task_mapper import TaskMapper
 
-
-class TaskScheduler(TaskMapper):
+class TaskScheduler(TaskHandler):
     """统一调度入口:外部只需调用 `await TaskScheduler(data, log_cli, db_cli).deal()`"""
 
     # ---------- 初始化 ----------
-    def __init__(self, data, log_service, db_client):
+    def __init__(self, data, log_service, db_client, trace_id):
+        super().__init__(data, log_service, db_client, trace_id)
         self.data = data
         self.log_client = log_service
         self.db_client = db_client
         self.table = "long_articles_task_manager"
-        self.trace_id = generate_task_trace_id()
+        self.trace_id = trace_id
 
     # ---------- 公共数据库工具 ----------
     async def _insert_or_ignore_task(self, task_name: str, date_str: str) -> None:
@@ -193,6 +179,10 @@ class TaskScheduler(TaskMapper):
             "task_processing_monitor": self._task_processing_monitor_handler,
             # 候选账号质量分析
             "candidate_account_quality_analysis": self._candidate_account_quality_score_handler,
+            # 文章内容池--标题品类处理
+            "article_pool_category_generation": self._article_pool_category_generation_handler,
+            # 抓取账号管理
+            "crawler_account_manager": self._crawler_account_manager_handler
         }
 
         if task_name not in handlers:
@@ -200,90 +190,3 @@ class TaskScheduler(TaskMapper):
                 "4001", "wrong task name input"
             )
         return await self._run_with_guard(task_name, date_str, handlers[task_name])
-
-    # ---------- 下面是若干复合任务的局部实现 ----------
-    async def _check_kimi_balance_handler(self) -> int:
-        response = await check_kimi_balance()
-        await self.log_client.log(
-            contents={
-                "trace_id": self.trace_id,
-                "task": "check_kimi_balance",
-                "data": response,
-            }
-        )
-        return self.TASK_SUCCESS_STATUS
-
-    async def _get_off_videos_task_handler(self) -> int:
-        sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
-        return await sub_task.deal()
-
-    async def _check_video_audit_status_handler(self) -> int:
-        sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
-        return await sub_task.deal()
-
-    async def _task_processing_monitor_handler(self) -> int:
-        sub_task = TaskProcessingMonitor(self.db_client)
-        await sub_task.deal()
-        return self.TASK_SUCCESS_STATUS
-
-    async def _inner_gzh_articles_monitor_handler(self) -> int:
-        sub_task = InnerGzhArticlesMonitor(self.db_client)
-        return await sub_task.deal()
-
-    async def _title_rewrite_handler(self):
-        sub_task = TitleRewrite(self.db_client, self.log_client)
-        return await sub_task.deal()
-
-    async def _update_root_source_id_handler(self) -> int:
-        sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
-        await sub_task.deal()
-        return self.TASK_SUCCESS_STATUS
-
-    async def _outside_monitor_handler(self) -> int:
-        collector = OutsideGzhArticlesCollector(self.db_client)
-        await collector.deal()
-        monitor = OutsideGzhArticlesMonitor(self.db_client)
-        return await monitor.deal()  # 应返回 SUCCESS / FAILED
-
-    async def _recycle_article_data_handler(self) -> int:
-        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
-        recycle = RecycleDailyPublishArticlesTask(
-            self.db_client, self.log_client, date_str
-        )
-        await recycle.deal()
-        check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
-        await check.deal()
-        return self.TASK_SUCCESS_STATUS
-
-    async def _crawler_toutiao_handler(self) -> int:
-        sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
-        method = self.data.get("method", "account")
-        media_type = self.data.get("media_type", "article")
-        category_list = self.data.get("category_list", [])
-
-        match method:
-            case "account":
-                await sub_task.crawler_task(media_type=media_type)
-            case "recommend":
-                await sub_task.crawl_toutiao_recommend_task(category_list)
-            case "search":
-                await sub_task.search_candidate_accounts()
-            case _:
-                raise ValueError(f"Unsupported method {method}")
-        return self.TASK_SUCCESS_STATUS
-
-    async def _article_pool_cold_start_handler(self) -> int:
-        cold_start = ArticlePoolColdStart(
-            self.db_client, self.log_client, self.trace_id
-        )
-        platform = self.data.get("platform", "weixin")
-        crawler_methods = self.data.get("crawler_methods", [])
-        await cold_start.deal(platform=platform, crawl_methods=crawler_methods)
-        return self.TASK_SUCCESS_STATUS
-
-    async def _candidate_account_quality_score_handler(self) -> int:
-        task = CandidateAccountQualityScoreRecognizer(
-            self.db_client, self.log_client, self.trace_id
-        )
-        await task.deal()
-        return self.TASK_SUCCESS_STATUS

+ 1 - 1
applications/utils/common.py

@@ -217,4 +217,4 @@ def ci_lower(data: List[int], conf: float = 0.95) -> float:
     std = statistics.stdev(data) / math.sqrt(n)
     # t 分位点(左侧):ppf 返回负值
     t_left = t.ppf((1 - conf) / 2, df=n - 1)
-    return mean + t_left * std
+    return mean + t_left * std

+ 4 - 30
dev.py

@@ -1,36 +1,10 @@
 import asyncio
 
-from applications.api import AsyncElasticSearchClient
+from applications.api import fetch_deepseek_completion
 
 
-async def get_crawler_task():
-    async with AsyncElasticSearchClient() as client:
-        # await client.es.indices.put_mapping(
-        #     index="meta_articles_v1",
-        #     body={
-        #         "properties": {
-        #             "status": {
-        #                 "type": "integer",
-        #             }
-        #         }
-        #     }
-        # )
-        await client.es.update_by_query(
-            index="meta_articles_v1",
-            body={
-                "script": {
-                    "source": "ctx._source.status = params.default",
-                    "lang": "painless",
-                    "params": {"default": 1},
-                },
-                "query": {  # 只改那些还没有 status 的
-                    "bool": {"must_not": [{"exists": {"field": "status"}}]}
-                },
-                "conflicts": "proceed",
-            },
-        )
-        print("success")
+prompt = "你好"
 
+res = fetch_deepseek_completion(model="defa", prompt=prompt)
 
-if __name__ == "__main__":
-    asyncio.run(get_crawler_task())
+print(res)

+ 3 - 2
requirements.txt

@@ -1,3 +1,4 @@
+aiomonitor~=0.7.1
 quart~=0.19.6
 hypercorn
 aiomysql~=0.2.0
@@ -15,8 +16,8 @@ tqdm~=4.66.6
 pyapollos~=0.1.5
 pyotp~=2.9.0
 elasticsearch~=8.17.2
-openai~=1.47.1
+openai~=1.98.0
 tenacity~=9.0.0
 fake-useragent~=2.1.0
 pydantic~=2.10.6
-aiomonitor~=0.7.1
+scipy~=1.15.2

+ 5 - 5
routes/blueprint.py

@@ -1,5 +1,6 @@
 from quart import Blueprint, jsonify, request
 from applications.ab_test import GetCoverService
+from applications.utils import generate_task_trace_id
 
 from applications.tasks import TaskScheduler
 
@@ -16,15 +17,14 @@ def server_routes(pools, log_service):
 
     @server_blueprint.route("/run_task", methods=["POST"])
     async def run_task():
+        trace_id = generate_task_trace_id()
         data = await request.get_json()
-        print("ss", data)
-        task_scheduler = TaskScheduler(data, log_service, pools)
+        task_scheduler = TaskScheduler(data, log_service, pools, trace_id)
         response = await task_scheduler.deal()
-        print(response)
         return jsonify(response)
 
-    @server_blueprint.route("/finish_task", methods=["GET"])
-    async def finish_task():
+    @server_blueprint.route("/health", methods=["GET"])
+    async def hello_world():
         # data = await request.get_json()
         return jsonify({"message": "hello world"})