Sfoglia il codice sorgente

新增-账号品类生成任务

luojunhui 1 mese fa
parent
commit
0b5a0649da

+ 1 - 1
applications/tasks/__init__.py

@@ -1 +1 @@
-from .task_scheduler import TaskScheduler
+from .task_scheduler import TaskScheduler

+ 15 - 5
applications/tasks/llm_tasks/candidate_account_process.py

@@ -96,7 +96,9 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
             update_query, (new_status, account_id, ori_status)
         )
 
-    async def insert_account_into_crawler_queue(self, score_list: List[int], account: dict) -> None:
+    async def insert_account_into_crawler_queue(
+        self, score_list: List[int], account: dict
+    ) -> None:
         """
         计算账号的得分置信区间下限,若置信区间下限的分数大于阈值,则认为是好的账号
         """
@@ -107,7 +109,13 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
             """
             await self.pool.async_save(
                 query=query,
-                params=(account["platform"], account["account_id"], account["account_name"], 'ai_recognize', self.ACCOUNT_GOOD_STATUS)
+                params=(
+                    account["platform"],
+                    account["account_id"],
+                    account["account_name"],
+                    "ai_recognize",
+                    self.ACCOUNT_GOOD_STATUS,
+                ),
             )
 
     async def score_for_each_account_by_llm(self, account):
@@ -153,13 +161,15 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
                 params=(
                     json.dumps(completion),
                     avg_score,
-                    self.PROCESSING_STATUS,
-                    account_id,
                     self.SUCCESS_STATUS,
+                    account_id,
+                    self.PROCESSING_STATUS,
                 ),
             )
             # 判断置信区间下限, 并且插入账号
-            await self.insert_account_into_crawler_queue(score_list=completion, account=account)
+            await self.insert_account_into_crawler_queue(
+                score_list=completion, account=account
+            )
 
         except Exception as e:
             await self.log_client.log(

+ 174 - 166
applications/tasks/llm_tasks/process_title.py

@@ -1,7 +1,7 @@
 import time
 import traceback
 
-from tqdm import tqdm
+from typing import Optional, List, Dict, Tuple
 
 from applications.api import fetch_deepseek_completion
 from applications.utils import yield_batch
@@ -48,7 +48,7 @@ class TitleProcess(Const):
         self.trace_id = trace_id
 
     @staticmethod
-    def generate_title_rewrite_prompt(ori_title):
+    def generate_title_rewrite_prompt(ori_title: str) -> str:
         """
         生成prompt
         """
@@ -131,7 +131,7 @@ class TitleProcess(Const):
         return prompt
 
     @staticmethod
-    def category_generation_from_title(title_list):
+    def category_generation_from_title(title_list: List[Tuple[str, str]]) -> str:
         """
         generate prompt category for given title
         """
@@ -252,22 +252,24 @@ class TitleProcess(Const):
         """
         return prompt
 
-    async def _roll_back_lock_tasks(self, table_name):
+    async def _roll_back_lock_tasks(self, table_name: str) -> int:
         query = f"""
            update {table_name}
            set category_status = %s
            where category_status = %s and category_status_update_ts <= %s;
         """
-        await self.pool.async_save(
+        return await self.pool.async_save(
             query=query,
             params=(
                 self.INIT_STATUS,
                 self.PROCESSING_STATUS,
                 int(time.time()) - self.MAX_PROCESSING_TIME,
-            )
+            ),
         )
 
-    async def process_single_article(self, content_type, article):
+    async def process_single_article(
+        self, content_type: str, article: Dict
+    ) -> Optional[Dict]:
         match content_type:
             case "video":
                 article_id = article["id"]
@@ -291,158 +293,13 @@ class TitleProcess(Const):
                 contents={
                     "trace_id": self.trace_id,
                     "data": {
-                    "article_id": article_id,
-                    "error": str(e),
-                    "traceback": traceback.format_exc(),
-                    }
-                }
-            )
-            return None
-
-
-class TitleRewrite(TitleProcess):
-
-    async def roll_back_blocked_tasks(self):
-        """
-        rollback blocked tasks
-        """
-        query = f"""
-            select id, title_rewrite_status_update_timestamp
-            from publish_single_video_source
-            where title_rewrite_status = {self.TITLE_REWRITE_LOCK_STATUS};
-        """
-        article_list = await self.pool.async_fetch(
-            query=query,
-            db_name="long_articles",
-        )
-        if article_list:
-            blocked_id_list = [
-                i["id"]
-                for i in article_list
-                if (int(time.time()) - i["title_rewrite_status_update_timestamp"])
-                > self.TITLE_REWRITE_LOCK_TIME
-            ]
-            if blocked_id_list:
-                update_query = f"""
-                    update publish_single_video_source
-                    set title_rewrite_status = %s
-                    where id in %s and title_rewrite_status = %s;
-                """
-                await self.pool.async_save(
-                    query=update_query,
-                    params=(
-                        self.TITLE_REWRITE_INIT_STATUS,
-                        tuple(blocked_id_list),
-                        self.TITLE_REWRITE_LOCK_STATUS,
-                    ),
-                )
-
-    async def get_articles_batch(self, batch_size=1000):
-        query = f"""
-            select content_trace_id, article_title
-            from publish_single_video_source 
-            where bad_status = {self.ARTICLE_POSITIVE_STATUS} 
-                and audit_status = {self.ARTICLE_AUDIT_PASSED_STATUS} 
-                and title_rewrite_status = {self.TITLE_REWRITE_INIT_STATUS}
-                and platform in ('hksp', 'sph')
-            limit {batch_size};
-        """
-        return await self.pool.async_fetch(query=query, db_name="long_articles")
-
-    async def update_title_rewrite_status(
-        self, content_trace_id, ori_status, new_status
-    ):
-        query = f"""
-            update publish_single_video_source
-            set title_rewrite_status = %s, title_rewrite_status_update_timestamp = %s
-            where content_trace_id = %s and title_rewrite_status= %s;
-        """
-        affected_rows = await self.pool.async_save(
-            query=query,
-            params=(new_status, int(time.time()), content_trace_id, ori_status),
-        )
-        return affected_rows
-
-    async def insert_into_rewrite_table(self, content_trace_id, new_title):
-        """
-        insert into rewrite_table
-        """
-        insert_sql = f"""
-            insert into video_title_rewrite
-            (content_trace_id, new_title, status, prompt_version)
-            values (%s, %s, %s, %s);
-        """
-        await self.pool.async_save(
-            query=insert_sql,
-            params=(
-                content_trace_id,
-                new_title,
-                self.TITLE_USEFUL_STATUS,
-                self.PROMPT_VERSION,
-            ),
-        )
-
-    async def rewrite_each_article(self, article):
-        """
-        rewrite each article
-        """
-        content_trace_id = article["content_trace_id"]
-        article_title = article["article_title"]
-
-        # lock each task
-        affected_rows = await self.update_title_rewrite_status(
-            content_trace_id=content_trace_id,
-            ori_status=self.TITLE_REWRITE_INIT_STATUS,
-            new_status=self.TITLE_REWRITE_LOCK_STATUS,
-        )
-        if not affected_rows:
-            return
-
-        try:
-            prompt = self.generate_title_rewrite_prompt(article_title)
-            new_title = fetch_deepseek_completion(model="default", prompt=prompt)
-
-            # insert into rewrite table
-            await self.insert_into_rewrite_table(
-                content_trace_id=content_trace_id, new_title=new_title
-            )
-
-            # unlock
-            await self.update_title_rewrite_status(
-                content_trace_id=content_trace_id,
-                ori_status=self.TITLE_REWRITE_LOCK_STATUS,
-                new_status=self.TITLE_REWRITE_SUCCESS_STATUS,
-            )
-        except Exception as e:
-            await self.aliyun_log.log(
-                contents={
-                    "task": "title rewrite task",
-                    "function": "rewrite_each_article",
-                    "message": content_trace_id,
-                    "status": "fail",
-                    "data": {
-                        "error_message": str(e),
-                        "error_type": type(e).__name__,
+                        "article_id": article_id,
+                        "error": str(e),
                         "traceback": traceback.format_exc(),
                     },
                 }
             )
-            await self.update_title_rewrite_status(
-                content_trace_id=content_trace_id,
-                ori_status=self.TITLE_REWRITE_LOCK_STATUS,
-                new_status=self.TITLE_REWRITE_FAIL_STATUS,
-            )
-
-    async def deal(self):
-        """title rewrite task deal"""
-        await self.roll_back_blocked_tasks()
-
-        task_list = await self.get_articles_batch()
-
-        bar = tqdm(task_list, desc="title rewrite task")
-        for article in bar:
-            await self.rewrite_each_article(article)
-            bar.set_description("title rewrite task")
+            return None
 
 
 class VideoPoolCategoryGeneration:
@@ -473,16 +330,23 @@ class ArticlePoolCategoryGeneration(TitleProcess):
 
     async def get_task_list(self, limit):
         query = f"""
-            select article_id, title from long_articles.crawler_meta_article
+            select article_id, title from crawler_meta_article
             where category_status = %s and status = %s and score > %s
             order by score desc limit %s;
         """
         return await self.pool.async_fetch(
             query=query,
-            params=(self.INIT_STATUS, self.ARTICLE_INIT_STATUS, self.LIMIT_SCORE, limit),
+            params=(
+                self.INIT_STATUS,
+                self.ARTICLE_INIT_STATUS,
+                self.LIMIT_SCORE,
+                limit,
+            ),
         )
 
-    async def set_category_status_as_success(self,article_id: int, category: str) -> int:
+    async def set_category_status_as_success(
+        self, article_id: int, category: str
+    ) -> int:
         update_query = f"""
                update long_articles.crawler_meta_article
                set category_by_ai = %s, category_status = %s, category_status_update_ts = %s
@@ -546,17 +410,21 @@ class ArticlePoolCategoryGeneration(TitleProcess):
                             "article_id": id_tuple,
                             "error": str(e),
                             "traceback": traceback.format_exc(),
-                        }
+                        },
                     }
                 )
-                for article in tqdm(title_batch):
-                    single_completion = await self.process_single_article(content_type="article", article=article)
-                    article_id = article[0]
+                for article in task_batch:
+                    single_completion = await self.process_single_article(
+                        content_type="article", article=article
+                    )
+                    article_id = article["article_id"]
                     if single_completion:
                         category = single_completion.get(str(article_id))
                         if category:
                             # set as success
-                            await self.set_category_status_as_success(article_id, category)
+                            await self.set_category_status_as_success(
+                                article_id, category
+                            )
                         else:
                             await self.set_category_status_as_fail(article_id)
                     else:
@@ -567,19 +435,28 @@ class ArticlePoolCategoryGeneration(TitleProcess):
             return
 
     async def deal(self, limit):
-        await self._roll_back_lock_tasks(table_name="crawler_meta_article")
+        # await self._roll_back_lock_tasks(table_name="crawler_meta_article")
 
         if not limit:
             limit = self.PROCESS_NUM
 
         task_list = await self.get_task_list(limit=limit)
+        print(task_list)
+        await self.aliyun_log.log(
+            contents={
+                "task": "ArticlePoolCategoryGeneration",
+                "function": "deal",
+                "trace_id": self.trace_id,
+                "message": f"总共获取{len(task_list)}条文章",
+            }
+        )
         task_batch_list = yield_batch(data=task_list, batch_size=self.BATCH_SIZE)
         batch_index = 0
         for task_batch in task_batch_list:
             batch_index += 1
             try:
                 await self.process_each_batch(task_batch)
-
+                print(f"batch :{batch_index} 处理成功")
             except Exception as e:
                 await self.aliyun_log.log(
                     contents={
@@ -591,18 +468,149 @@ class ArticlePoolCategoryGeneration(TitleProcess):
                         "data": {
                             "error": str(e),
                             "traceback": traceback.format_exc(),
-                        }
+                        },
                     }
                 )
 
 
+class TitleRewrite(TitleProcess):
 
+    async def roll_back_blocked_tasks(self):
+        """
+        rollback blocked tasks
+        """
+        query = f"""
+            select id, title_rewrite_status_update_timestamp
+            from publish_single_video_source
+            where title_rewrite_status = {self.TITLE_REWRITE_LOCK_STATUS};
+        """
+        article_list = await self.pool.async_fetch(
+            query=query,
+            db_name="long_articles",
+        )
+        if article_list:
+            blocked_id_list = [
+                i["id"]
+                for i in article_list
+                if (int(time.time()) - i["title_rewrite_status_update_timestamp"])
+                > self.TITLE_REWRITE_LOCK_TIME
+            ]
+            if blocked_id_list:
+                update_query = f"""
+                    update publish_single_video_source
+                    set title_rewrite_status = %s
+                    where id in %s and title_rewrite_status = %s;
+                """
+                await self.pool.async_save(
+                    query=update_query,
+                    params=(
+                        self.TITLE_REWRITE_INIT_STATUS,
+                        tuple(blocked_id_list),
+                        self.TITLE_REWRITE_LOCK_STATUS,
+                    ),
+                )
 
+    async def get_articles_batch(self, batch_size=1000):
+        query = f"""
+            select content_trace_id, article_title
+            from publish_single_video_source 
+            where bad_status = {self.ARTICLE_POSITIVE_STATUS} 
+                and audit_status = {self.ARTICLE_AUDIT_PASSED_STATUS} 
+                and title_rewrite_status = {self.TITLE_REWRITE_INIT_STATUS}
+                and platform in ('hksp', 'sph')
+            limit {batch_size};
+        """
+        return await self.pool.async_fetch(query=query, db_name="long_articles")
 
+    async def update_title_rewrite_status(
+        self, content_trace_id, ori_status, new_status
+    ):
+        query = f"""
+            update publish_single_video_source
+            set title_rewrite_status = %s, title_rewrite_status_update_timestamp = %s
+            where content_trace_id = %s and title_rewrite_status= %s;
+        """
+        affected_rows = await self.pool.async_save(
+            query=query,
+            params=(new_status, int(time.time()), content_trace_id, ori_status),
+        )
+        return affected_rows
 
+    async def insert_into_rewrite_table(self, content_trace_id, new_title):
+        """
+        insert into rewrite_table
+        """
+        insert_sql = f"""
+            insert into video_title_rewrite
+            (content_trace_id, new_title, status, prompt_version)
+            values (%s, %s, %s, %s);
+        """
+        await self.pool.async_save(
+            query=insert_sql,
+            params=(
+                content_trace_id,
+                new_title,
+                self.TITLE_USEFUL_STATUS,
+                self.PROMPT_VERSION,
+            ),
+        )
 
+    async def rewrite_each_article(self, article):
+        """
+        rewrite each article
+        """
+        content_trace_id = article["content_trace_id"]
+        article_title = article["article_title"]
+
+        # lock each task
+        affected_rows = await self.update_title_rewrite_status(
+            content_trace_id=content_trace_id,
+            ori_status=self.TITLE_REWRITE_INIT_STATUS,
+            new_status=self.TITLE_REWRITE_LOCK_STATUS,
+        )
+        if not affected_rows:
+            return
+
+        try:
+            prompt = self.generate_title_rewrite_prompt(article_title)
+            new_title = fetch_deepseek_completion(model="default", prompt=prompt)
 
+            # insert into rewrite table
+            await self.insert_into_rewrite_table(
+                content_trace_id=content_trace_id, new_title=new_title
+            )
 
+            # unlock
+            await self.update_title_rewrite_status(
+                content_trace_id=content_trace_id,
+                ori_status=self.TITLE_REWRITE_LOCK_STATUS,
+                new_status=self.TITLE_REWRITE_SUCCESS_STATUS,
+            )
+        except Exception as e:
+            await self.aliyun_log.log(
+                contents={
+                    "task": "title rewrite task",
+                    "function": "rewrite_each_article",
+                    "message": content_trace_id,
+                    "status": "fail",
+                    "data": {
+                        "error_message": str(e),
+                        "error_type": type(e).__name__,
+                        "traceback": traceback.format_exc(),
+                    },
+                }
+            )
+            await self.update_title_rewrite_status(
+                content_trace_id=content_trace_id,
+                ori_status=self.TITLE_REWRITE_LOCK_STATUS,
+                new_status=self.TITLE_REWRITE_FAIL_STATUS,
+            )
 
+    async def deal(self):
+        """title rewrite task deal"""
+        await self.roll_back_blocked_tasks()
 
+        task_list = await self.get_articles_batch()
 
+        for article in task_list:
+            await self.rewrite_each_article(article)

+ 1 - 1
applications/utils/common.py

@@ -217,4 +217,4 @@ def ci_lower(data: List[int], conf: float = 0.95) -> float:
     std = statistics.stdev(data) / math.sqrt(n)
     # t 分位点(左侧):ppf 返回负值
     t_left = t.ppf((1 - conf) / 2, df=n - 1)
-    return mean + t_left * std
+    return mean + t_left * std

+ 4 - 30
dev.py

@@ -1,36 +1,10 @@
 import asyncio
 
-from applications.api import AsyncElasticSearchClient
+from applications.api import fetch_deepseek_completion
 
 
-async def get_crawler_task():
-    async with AsyncElasticSearchClient() as client:
-        # await client.es.indices.put_mapping(
-        #     index="meta_articles_v1",
-        #     body={
-        #         "properties": {
-        #             "status": {
-        #                 "type": "integer",
-        #             }
-        #         }
-        #     }
-        # )
-        await client.es.update_by_query(
-            index="meta_articles_v1",
-            body={
-                "script": {
-                    "source": "ctx._source.status = params.default",
-                    "lang": "painless",
-                    "params": {"default": 1},
-                },
-                "query": {  # 只改那些还没有 status 的
-                    "bool": {"must_not": [{"exists": {"field": "status"}}]}
-                },
-                "conflicts": "proceed",
-            },
-        )
-        print("success")
+prompt = "你好"
 
+res = fetch_deepseek_completion(model="defa", prompt=prompt)
 
-if __name__ == "__main__":
-    asyncio.run(get_crawler_task())
+print(res)

+ 1 - 1
requirements.txt

@@ -15,7 +15,7 @@ tqdm~=4.66.6
 pyapollos~=0.1.5
 pyotp~=2.9.0
 elasticsearch~=8.17.2
-openai~=1.47.1
+openai~=1.98.0
 tenacity~=9.0.0
 fake-useragent~=2.1.0
 pydantic~=2.10.6