luojunhui 2 weeks ago
parent
commit
f1d425b5a5

+ 4 - 0
applications/crawler/tophub/__init__.py

@@ -0,0 +1,4 @@
+from .hot_point import get_hot_point_content
+
+
+__all__ = ["get_hot_point_content"]

+ 44 - 0
applications/crawler/tophub/hot_point.py

@@ -0,0 +1,44 @@
+import aiohttp
+import asyncio
+
+
+async def get_hot_point_content(page_index: int = 1):
+    """
+    Fetch the content ranking from 今日热榜 (Jinri Rebang, "today's hot list"); async version.
+    """
+    url = "http://crawapi.piaoquantv.com/crawler/jin_ri_re_bang/content_rank"
+
+    headers = {"Content-Type": "application/json"}
+
+    data = {"sort_type": "最热", "category": "news", "cursor": page_index}
+
+    try:
+        async with aiohttp.ClientSession() as session:
+            async with session.post(url, headers=headers, json=data) as response:
+                response.raise_for_status()  # check whether the request succeeded
+
+                # return the JSON response
+                return await response.json()
+
+    except aiohttp.ClientError as e:
+        print(f"Request failed: {e}")
+        return None
+    except Exception as e:
+        print(f"Unexpected error: {e}")
+        return None
+
+
+async def get_hot_point_content_batch(page_indices):
+    """
+    Fetch hot list content for multiple pages concurrently.
+    """
+    tasks = [get_hot_point_content(page_index) for page_index in page_indices]
+    results = await asyncio.gather(*tasks, return_exceptions=True)
+
+    # filter out exceptions and failed (None) fetches
+    valid_results = []
+    for result in results:
+        if not isinstance(result, Exception) and result is not None:
+            valid_results.append(result)
+
+    return valid_results

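A quick way to exercise these fetchers from a script; a minimal sketch, assuming the crawapi endpoint above is reachable. Note that get_hot_point_content_batch is not re-exported in __init__.py, so it is imported from the module directly:

    import asyncio

    from applications.crawler.tophub import get_hot_point_content
    from applications.crawler.tophub.hot_point import get_hot_point_content_batch


    async def main():
        # single page: parsed JSON body, or None on failure
        first_page = await get_hot_point_content(page_index=1)
        print(first_page)

        # several pages fetched concurrently; exceptions and None results are filtered out
        pages = await get_hot_point_content_batch(range(1, 4))
        print(f"{len(pages)} page(s) fetched")


    asyncio.run(main())
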
+ 3 - 0
applications/tasks/crawler_tasks/__init__.py

@@ -2,10 +2,13 @@ from .crawler_toutiao import CrawlerToutiao
 from .crawler_account_manager import WeixinAccountManager
 from .crawler_gzh import CrawlerGzhAccountArticles
 from .crawler_gzh import CrawlerGzhSearchArticles
+from .crawler_hot_point import CrawlerHotPointTask
+
 
 __all__ = [
     "CrawlerToutiao",
     "WeixinAccountManager",
     "CrawlerGzhAccountArticles",
     "CrawlerGzhSearchArticles",
+    "CrawlerHotPointTask",
 ]

+ 0 - 1
applications/tasks/crawler_tasks/crawler_account_manager.py

@@ -49,7 +49,6 @@ class CrawlerAccountManager(CrawlerAccountManagerConst):
 
 
 class WeixinAccountManager(CrawlerAccountManager):
-
     def __init__(self, pool, aliyun_log, trace_id):
         super().__init__(pool, aliyun_log, trace_id)
         self.pool = pool

+ 102 - 0
applications/tasks/crawler_tasks/crawler_hot_point.py

@@ -0,0 +1,102 @@
+from __future__ import annotations
+
+from typing import List, Tuple
+
+from tqdm.asyncio import tqdm
+
+from applications.crawler.tophub import get_hot_point_content
+
+
+class CrawlerHotPointConst:
+    max_page_index = 40
+
+    init_status = 0
+    processing_status = 1
+    useful_status = 2
+    unuseful_status = 3
+    failed_status = 99
+
+    not_expired_status = 1
+    expired_status = 2
+
+
+class CrawlerHotPointMapper(CrawlerHotPointConst):
+    def __init__(self, pool, log_client, trace_id):
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+
+    async def save_articles(self, articles: List[Tuple]):
+        """插入标题 && Link"""
+        query = """
+            INSERT INTO hot_point_titles
+                (title, platform, link)
+            VALUES (%s, %s, %s);
+        """
+        return await self.pool.async_save(query=query, params=articles, batch=True)
+
+    async def update_useful_status(
+        self, article_id: int, origin_status: int, new_status: int
+    ):
+        """
+        Update an article's useful status.
+        """
+        query = """
+            UPDATE hot_point_titles
+            SET useful = %s
+            WHERE id = %s AND useful = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, article_id, origin_status)
+        )
+
+    async def set_as_expired(self, article_id: int):
+        """
+        Mark an article as expired.
+        """
+        query = """
+            UPDATE hot_point_titles
+            SET status = %s
+            WHERE id = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(self.expired_status, article_id)
+        )
+
+
+class CrawlerHotPointTask(CrawlerHotPointMapper):
+    def __init__(self, pool, log_client, trace_id):
+        super().__init__(pool, log_client, trace_id)
+
+    @staticmethod
+    def process_raw_data(response_data):
+        """
+        Parse the raw response into (title, platform, link) tuples.
+        """
+        articles = []
+        for item in response_data:
+            platform = item["source"]
+            for article in item["rankList"]:
+                title = article["title"]
+                link = article["link"]
+                articles.append((title, platform, link))
+        return articles
+
+    async def crawl_hot_titles(self):
+        """
+        Crawl hot point titles.
+        """
+        # pages are 1-based, so include max_page_index itself
+        for page in tqdm(range(1, self.max_page_index + 1)):
+            try:
+                raw_data = await get_hot_point_content(page_index=page)
+                if not raw_data:
+                    # the fetch returns None on failure; skip this page
+                    continue
+                articles = self.process_raw_data(raw_data)
+                await self.save_articles(articles)
+            except Exception as e:
+                print(f"crawl_hot_titles error: {e}")
+
+    async def classify_articles_by_llm(self):
+        """
+        Use an LLM to classify whether a hot event matches elderly readers' interests.
+        """
+        pass

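The mapper's queries imply a hot_point_titles table shaped roughly like the sketch below. The DDL is not part of this commit, so the column types and defaults are assumptions; only the column names and the status values mirror the constants above:

    CREATE TABLE hot_point_titles (
        id       BIGINT AUTO_INCREMENT PRIMARY KEY,
        title    VARCHAR(512)  NOT NULL,
        platform VARCHAR(64)   NOT NULL,
        link     VARCHAR(1024) NOT NULL,
        -- 0 init, 1 processing, 2 useful, 3 not useful, 99 failed
        useful   TINYINT NOT NULL DEFAULT 0,
        -- 1 not expired, 2 expired
        status   TINYINT NOT NULL DEFAULT 1
    );
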
+ 7 - 0
applications/tasks/task_handler.py

@@ -10,6 +10,7 @@ from applications.tasks.crawler_tasks import CrawlerToutiao
 from applications.tasks.crawler_tasks import WeixinAccountManager
 from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
 from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
+from applications.tasks.crawler_tasks import CrawlerHotPointTask
 
 from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
@@ -206,5 +207,11 @@ class TaskHandler(TaskMapper):
         await task.deal(params=self.data)
         return self.TASK_SUCCESS_STATUS
 
+    # run the hot point crawling task
+    async def _crawl_hot_point_handler(self) -> int:
+        task = CrawlerHotPointTask(self.db_client, self.log_client, self.trace_id)
+        await task.crawl_hot_titles()
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

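To exercise the new handler's work outside the scheduler, a minimal sketch; _StubPool is a hypothetical stand-in for the real connection pool (only async_save is needed on this path), and the log client goes unused by crawl_hot_titles:

    import asyncio

    from applications.tasks.crawler_tasks import CrawlerHotPointTask


    class _StubPool:
        """Hypothetical stand-in for the DB pool; just echoes writes."""

        async def async_save(self, query, params, batch=False):
            print(f"would save {len(params) if batch else 1} row(s)")


    async def main():
        task = CrawlerHotPointTask(_StubPool(), log_client=None, trace_id="local-test")
        await task.crawl_hot_titles()


    asyncio.run(main())
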
+ 2 - 0
applications/tasks/task_scheduler.py

@@ -195,6 +195,8 @@ class TaskScheduler(TaskHandler):
             "crawler_detail_analysis": self._crawler_article_analysis_handler,
             # mini program fission info processing
             "mini_program_detail_process": self._mini_program_detail_handler,
+            # hot point crawling
+            "crawler_hot_point": self._crawl_hot_point_handler,
         }
 
         if task_name not in handlers: