luojunhui 2 weeks ago
parent
commit
e95bf5ea0f
3 files changed, 70 insertions and 18 deletions
  1. applications/tasks/crawler_tasks/crawler_hot_point.py (+62 −18)
  2. applications/tasks/task_handler.py (+6 −0)
  3. applications/tasks/task_scheduler.py (+2 −0)

+ 62 - 18
applications/tasks/crawler_tasks/crawler_hot_point.py

@@ -2,23 +2,34 @@ from __future__ import annotations
 
 import asyncio
 import json
+import traceback
 from typing import Dict, List, Tuple
 from tqdm.asyncio import tqdm
 
+from applications.api import fetch_deepseek_completion
 from applications.crawler.tophub import get_hot_point_content
 
 
 class CrawlerHotPointConst:
-    max_page_index = 40
+    MAX_PAGE_INDEX = 40
 
-    init_status = 0
-    processing_status = 1
-    useful_status = 2
-    unuseful_status = 3
-    failed_status = 99
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    USEFUL_STATUS = 2
+    UNUSEFUL_STATUS = 3
+    FAILED_STATUS = 99
 
-    not_expired_status = 1
-    expired_status = 2
+    NOT_EXPIRED_STATUS = 1
+    EXPIRED_STATUS = 2
+
+    # number of titles fetched per LLM classification batch
+    PROCESS_TITLE_BATCH_SIZE = 300
+
+    # platforms excluded from crawling
+    IGNORE_PLATFORMS = {
+        "中国日报", "每日珠宝杂志", "iBag包包", "ZAKER", "NASA 🌍", "wikiHow 中文",
+        "China Daily", "微信 ‧ 游戏", "Yahoo News"
+    }
 
 
 class CrawlerHotPointMapper(CrawlerHotPointConst):
@@ -27,10 +38,10 @@ class CrawlerHotPointMapper(CrawlerHotPointConst):
         self.log_client = log_client
         self.trace_id = trace_id
 
-    async def save_articles(self, articles: List[Tuple]):
+    async def save_articles(self, articles: List[Tuple]) -> int:
         """插入标题 && Link"""
         query = """
-            INSERT INTO hot_point_titles
+            INSERT IGNORE INTO hot_point_titles
                 (title, platform, link)
             VALUES (%s, %s, %s);
         """
@@ -38,7 +49,7 @@ class CrawlerHotPointMapper(CrawlerHotPointConst):
 
     async def update_useful_status(
         self, article_id: int, origin_status: int, new_status: int
-    ):
+    ) -> int:
         """
         Update the article's classification status
         """
@@ -51,7 +62,7 @@ class CrawlerHotPointMapper(CrawlerHotPointConst):
             query=query, params=(new_status, article_id, origin_status)
         )
 
-    async def set_as_expired(self, article_id: int):
+    async def set_as_expired(self, article_id: int) -> int:
         """
         Mark the article as expired
         """
@@ -61,22 +72,49 @@ class CrawlerHotPointMapper(CrawlerHotPointConst):
             WHERE id = %s;
         """
         return await self.pool.async_save(
-            query=query, params=(self.expired_status, article_id)
+            query=query, params=(self.EXPIRED_STATUS, article_id)
         )
 
+    async def fetch_init_articles(self) -> List[Dict]:
+        """
+        Fetch articles that the LLM has not yet classified
+        """
+        query = """
+            SELECT id, title FROM hot_point_titles
+            WHERE status = %s AND useful = %s
+            ORDER BY id LIMIT %s;
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            params=(self.NOT_EXPIRED_STATUS, self.INIT_STATUS, self.PROCESS_TITLE_BATCH_SIZE),
+        )
+

 class CrawlerHotPointTask(CrawlerHotPointMapper):
+    CLASSIFY_PROMPT = """
+You are a content-analysis assistant that identifies, from hot-list titles, "silver" content that people aged 55 and over are likely to enjoy or follow.
+Silver content typically covers health, elder care, retirement life, age-related illness, social security, intergenerational relationships, curiosities, celebrity stories, social events, and similar topics.
+
+1. **Task**:
+    Scan all titles and pick out the timely news items highly relevant to silver content. Judge relevance by whether the title directly or indirectly touches on topics about older adults, or is likely to attract the interest of people aged 55+. Return the matching ids.
+    If sensitive figures appear, filter them out as usual.
+    Note that each item must be a news event.
+2. **Output format**: return JSON containing only the ids suitable for older adults, structured as
+    {
+        "IDS": [1, 2, 3, ...]
+    }
+Now, please process the titles && ids I provide.
+"""
+
     def __init__(self, pool, log_client, trace_id):
         super().__init__(pool, log_client, trace_id)
 
-    @staticmethod
-    def process_raw_data(response_data):
+    def process_raw_data(self, response_data):
         """
         Process the raw crawler response
         """
         articles = []
-        for item in response_data:
+        for item in response_data["data"]["data"]:
             platform = item["source"]
+            if platform in self.IGNORE_PLATFORMS:
+                continue
             for article in item["rankList"]:
                 title = article["title"]
                 link = article["link"]
@@ -87,7 +125,7 @@ class CrawlerHotPointTask(CrawlerHotPointMapper):
         """
         Crawl hot-point titles
         """
-        for page in tqdm(range(1, self.max_page_index)):
+        for page in tqdm(range(1, self.MAX_PAGE_INDEX)):
             try:
                 raw_data = await get_hot_point_content(page_index=page)
                 articles = self.process_raw_data(raw_data)
@@ -99,4 +137,10 @@ class CrawlerHotPointTask(CrawlerHotPointMapper):
         """
         Classify with the LLM whether hot-point events match the interests of older adults
         """
-        pass
+        infos = await self.fetch_init_articles()
+        prompt = f"{self.CLASSIFY_PROMPT}\n{json.dumps(infos, ensure_ascii=False)}"
+        response = fetch_deepseek_completion(
+            prompt=prompt, model="default", output_type="json"
+        )
+        return response
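
Review note: classify_articles_by_llm stops at fetching the model output; nothing writes the classification back. A minimal follow-up sketch using the mapper methods already in this file, assuming fetch_deepseek_completion returns the parsed {"IDS": [...]} dict the prompt requests:

        # Hypothetical continuation inside classify_articles_by_llm:
        # `response` is assumed to be the parsed {"IDS": [...]} dict.
        useful_ids = set(response.get("IDS", []))
        for info in infos:
            new_status = (
                self.USEFUL_STATUS if info["id"] in useful_ids else self.UNUSEFUL_STATUS
            )
            # flip each fetched row from INIT_STATUS to its classified status
            await self.update_useful_status(
                article_id=info["id"],
                origin_status=self.INIT_STATUS,
                new_status=new_status,
            )
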

+ 6 - 0
applications/tasks/task_handler.py

@@ -213,5 +213,11 @@ class TaskHandler(TaskMapper):
         await task.crawl_hot_titles()
         return self.TASK_SUCCESS_STATUS
 
+    # asynchronously classify hot-point events
+    async def _analysis_hot_point_handler(self) -> int:
+        task = CrawlerHotPointTask(self.db_client, self.log_client, self.trace_id)
+        await task.classify_articles_by_llm()
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -197,6 +197,8 @@ class TaskScheduler(TaskHandler):
             "mini_program_detail_process": self._mini_program_detail_handler,
             # crawl hot-point events
             "crawler_hot_point": self._crawl_hot_point_handler,
+            # classify hot-point events
+            "analysis_hot_point": self._analysis_hot_point_handler,
         }
 
         if task_name not in handlers:
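
Review note: dispatch for the new entry works like the existing ones; a minimal usage sketch, assuming `handlers` is the dict built above and the caller awaits the selected coroutine:

    # `handlers` maps the task_name string to its bound async handler.
    handler = handlers.get("analysis_hot_point")
    if handler is not None:
        status = await handler()  # TASK_SUCCESS_STATUS on success
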