
Add Toutiao crawling

luojunhui 1 month ago
parent
commit
58f5e1828c

+ 2 - 1
applications/tasks/crawler_tasks/__init__.py

@@ -1,4 +1,5 @@
 from .crawler_toutiao import CrawlerToutiao
 from .crawler_account_manager import WeixinAccountManager
+from .crawler_gzh import CrawlerGzhAccountArticles
 
-__all__ = ["CrawlerToutiao", "WeixinAccountManager"]
+__all__ = ["CrawlerToutiao", "WeixinAccountManager", "CrawlerGzhAccountArticles"]

+ 0 - 1
applications/tasks/crawler_tasks/crawler_account_manager.py

@@ -146,4 +146,3 @@ class WeixinAccountManager(CrawlerAccountManager):
 
         for account_id in tqdm(account_id_list):
             await self.analysis_single_account(account_id)
-

+ 188 - 0
applications/tasks/crawler_tasks/crawler_gzh.py

@@ -0,0 +1,188 @@
+from __future__ import annotations
+
+import asyncio
+import json
+import time
+import traceback
+from datetime import datetime
+from typing import List, Dict
+
+from applications.api import feishu_robot
+from applications.crawler.wechat import search
+from applications.crawler.wechat import get_article_detail
+from applications.crawler.wechat import get_article_list_from_account
+from applications.pipeline import CrawlerPipeline
+from applications.utils import timestamp_to_str, show_desc_to_sta, generate_gzh_id
+
+
+class CrawlerGzhConst:
+    PLATFORM = "weixin"
+    DEFAULT_VIEW_COUNT = 0
+    DEFAULT_LIKE_COUNT = 0
+    DEFAULT_ARTICLE_STATUS = 1
+    DEFAULT_TIMESTAMP = 1735660800
+
+
+class CrawlerGzhStrategy(CrawlerPipeline, CrawlerGzhConst):
+    def __init__(self, pool, log_client, trace_id):
+        super().__init__(pool, log_client)
+        self.trace_id = trace_id
+
+    async def get_crawler_accounts(self, method: str, strategy: str) -> List[Dict]:
+        """get crawler accounts"""
+        match strategy:
+            case "V1":
+                query = """
+                    select gh_id, account_name, latest_update_time
+                    from long_articles_accounts
+                    where account_category = %s and is_using = %s and daily_scrape = %s;
+                """
+                return await self.pool.async_fetch(query=query, params=(method, 1, 1))
+            case "V2":
+                query = """
+                    select gh_id, account_name, latest_update_time
+                    from long_articles_accounts
+                    where account_category = %s and is_using = %s order by recent_score_ci_lower desc limit %s; 
+                """
+                return await self.pool.async_fetch(
+                    query=query, params=(method, 1, 100)
+                )
+            case _:
+                raise Exception("strategy not supported")
+
+    async def get_account_latest_update_timestamp(self, account_id: str) -> int | None:
+        """get the latest publish timestamp recorded for this account"""
+        query = """ select max(publish_time) as publish_time from crawler_meta_article where out_account_id = %s;"""
+        latest_timestamp_obj = await self.pool.async_fetch(
+            query=query, params=(account_id,)
+        )
+        return latest_timestamp_obj[0]["publish_time"] if latest_timestamp_obj else None
+
+    async def crawl_each_article(
+        self, article_raw_data, mode, account_method, account_id
+    ):
+        """crawl each article"""
+        base_item = {
+            "platform": self.PLATFORM,
+            "mode": mode,
+            "crawler_time": int(time.time()),
+        }
+        match mode:
+            case "account":
+                show_stat = show_desc_to_sta(article_raw_data["ShowDesc"])
+                show_view_count = show_stat.get(
+                    "show_view_count", self.DEFAULT_VIEW_COUNT
+                )
+                show_like_count = show_stat.get(
+                    "show_like_count", self.DEFAULT_LIKE_COUNT
+                )
+                unique_idx = generate_gzh_id(article_raw_data["ContentUrl"])
+
+                new_item = {
+                    **base_item,
+                    "read_cnt": show_view_count,
+                    "like_cnt": show_like_count,
+                    "title": article_raw_data["Title"],
+                    "category": account_method,
+                    "out_account_id": account_id,
+                    "article_index": article_raw_data["ItemIndex"],
+                    "link": article_raw_data["ContentUrl"],
+                    "description": article_raw_data["Digest"],
+                    "unique_index": unique_idx,
+                    "publish_time": article_raw_data["send_time"],
+                }
+            case _:
+                raise Exception(f"unknown mode: {mode}")
+
+        await self.save_item_to_database(
+            media_type="article", item=new_item, trace_id=self.trace_id
+        )
+
+
+class CrawlerGzhAccountArticles(CrawlerGzhStrategy):
+    def __init__(self, pool, log_client, trace_id):
+        super().__init__(pool, log_client, trace_id)
+
+    async def insert_article_into_meta(self, gh_id, account_method, msg_list):
+        """
+        将数据更新到数据库
+        :return:
+        """
+        for msg in msg_list:
+            article_list = msg["AppMsg"]["DetailInfo"]
+            for obj in article_list:
+                await self.crawl_each_article(
+                    article_raw_data=obj,
+                    mode="account",
+                    account_method=account_method,
+                    account_id=gh_id,
+                )
+
+    async def update_account_latest_timestamp(self, gh_id):
+        """update the latest timestamp after crawler"""
+        latest_timestamp = await self.get_account_latest_update_timestamp(gh_id)
+        dt_str = timestamp_to_str(latest_timestamp)
+        query = """update long_articles_accounts set latest_update_time = %s where gh_id = %s;"""
+        await self.pool.async_save(query=query, params=(dt_str, gh_id))
+
+    async def crawler_single_account(self, account_method: str, account: Dict) -> None:
+        """crawler single account"""
+        current_cursor = None
+        gh_id = account["gh_id"]
+        # fall back to DEFAULT_TIMESTAMP for accounts with no recorded update time
+        latest_timestamp = (
+            account["latest_update_time"].timestamp()
+            if account["latest_update_time"]
+            else self.DEFAULT_TIMESTAMP
+        )
+        while True:
+            # fetch response from weixin
+            response = get_article_list_from_account(
+                account_id=gh_id, index=current_cursor
+            )
+            msg_list = response.get("data", {}).get("data")
+            if not msg_list:
+                break
+
+            # process current page
+            await self.insert_article_into_meta(gh_id, account_method, msg_list)
+
+            # stop paginating once the oldest article on this page is no newer
+            # than the account's latest recorded update time
+            last_article_in_this_page = msg_list[-1]
+            last_time_stamp_in_this_msg = last_article_in_this_page["AppMsg"][
+                "BaseInfo"
+            ]["UpdateTime"]
+            if last_time_stamp_in_this_msg <= latest_timestamp:
+                await self.update_account_latest_timestamp(gh_id)
+                break
+
+            # update cursor for next page
+            current_cursor = response.get("data", {}).get("next_cursor")
+            if not current_cursor:
+                break
+
+    async def deal(self, method: str, strategy: str = "V1"):
+        account_list = await self.get_crawler_accounts(method, strategy)
+        for account in account_list:
+            try:
+                await self.crawler_single_account(method, account)
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "task": "crawler_gzh_articles",
+                        "trace_id": self.trace_id,
+                        "data": {
+                            "account_id": account["gh_id"],
+                            "account_method": method,
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        }
+                    }
+                )
+
+
+class CrawlerGzhSearchArticles(CrawlerGzhStrategy):
+    def __init__(self, pool, log_client, trace_id):
+        super().__init__(pool, log_client, trace_id)
+
+    async def deal(self):
+        return {
+            "mode": "search",
+            "message": "still developing"
+        }
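
A minimal sketch of how the new account-mode task could be driven directly, assuming an already-initialized async MySQL pool and log client; pool, log_client, the trace_id value, and the "account_association" account_method value are placeholders for illustration, not names taken from this commit:

import asyncio

async def run_gzh_account_crawl(pool, log_client):
    # placeholder trace_id and account_method, for illustration only
    task = CrawlerGzhAccountArticles(pool, log_client, trace_id="debug-trace")
    await task.deal(method="account_association", strategy="V1")

# asyncio.run(run_gzh_account_crawl(pool, log_client))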

+ 13 - 0
applications/tasks/task_handler.py

@@ -3,6 +3,7 @@ from datetime import datetime
 from applications.tasks.cold_start_tasks import ArticlePoolColdStart
 from applications.tasks.crawler_tasks import CrawlerToutiao
 from applications.tasks.crawler_tasks import WeixinAccountManager
+from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
 from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
@@ -135,3 +136,15 @@ class TaskHandler(TaskMapper):
 
         await task.deal(platform=platform, account_id_list=account_id_list)
         return self.TASK_SUCCESS_STATUS
+
+    async def _crawler_gzh_article_handler(self) -> int:
+        account_method = self.data.get("account_method")
+        crawl_mode = self.data.get("crawl_mode")
+        strategy = self.data.get("strategy", "V1")
+        match crawl_mode:
+            case "account":
+                task = CrawlerGzhAccountArticles(self.db_client, self.log_client, self.trace_id)
+                await task.deal(account_method, strategy)
+            case _:
+                raise ValueError(f"Unsupported crawl mode {crawl_mode}")
+        return self.TASK_SUCCESS_STATUS

+ 3 - 1
applications/tasks/task_scheduler.py

@@ -182,7 +182,9 @@ class TaskScheduler(TaskHandler):
             # article content pool -- title category processing
             "article_pool_category_generation": self._article_pool_category_generation_handler,
             # crawler account management
-            "crawler_account_manager": self._crawler_account_manager_handler
+            "crawler_account_manager": self._crawler_account_manager_handler,
+            # WeChat official account (GZH) article crawling
+            "crawler_gzh_articles": self._crawler_gzh_article_handler
         }
 
         if task_name not in handlers:
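
For reference, a task payload along the following lines should route to the new handler, given that _crawler_gzh_article_handler reads account_method, crawl_mode and strategy from self.data and the scheduler dispatches on task_name; the exact envelope and the account_method value shown here are assumptions:

gzh_crawl_task = {
    "task_name": "crawler_gzh_articles",
    "data": {
        "account_method": "account_association",  # placeholder account_category value
        "crawl_mode": "account",
        "strategy": "V1",
    },
}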

+ 10 - 3
applications/utils/async_mysql_utils.py

@@ -2,7 +2,7 @@ from typing import List, Dict
 
 
 async def get_top_article_title_list(pool) -> List[Dict]:
-    query = f"""
+    query = """
         select distinct title, source_id
         from datastat_sort_strategy
         where produce_plan_name = %s and source_id is not null;
@@ -10,5 +10,12 @@ async def get_top_article_title_list(pool) -> List[Dict]:
     return await pool.async_fetch(query=query, params=("TOP100",))
 
 
-async def get():
-    pass
+async def get_hot_titles(pool, date_string) -> List[Dict]:
+    """get titles of hot articles"""
+    query = """
+        select distinct title
+        from datastat_sort_strategy
+        where position < %s and read_rate >= %s and date_str >= %s;
+    """
+    return await pool.async_fetch(query=query, params=(3, 1.21, date_string))
+

+ 2 - 4
applications/utils/item.py

@@ -15,16 +15,14 @@ class CrawlerMetaArticle(BaseModel):
         default=..., description="crawl category: the original design was poor and is too entrenched to change; in practice unrelated to content category"
     )
     out_account_id: str = Field(default=..., description="id of the crawled account")
-    article_index: str = Field(
+    article_index: int = Field(
         default=None, description="position of the article within a batch publish, common on WeChat official accounts"
     )
     title: str = Field(default=..., description="article title")
     link: str = Field(default=..., description="article link")
     read_cnt: int = Field(default=0, description="read count")
     like_cnt: int = Field(default=0, description="like count")
-    description: Optional[str] = Field(
-        default=None, max_length=255, description="article summary"
-    )
+    description: Optional[str] = Field(default=None, description="article summary")
     publish_time: int = Field(default=None, description="article publish time")
     crawler_time: int = Field(default=None, description="crawl time")
     score: float = Field(default=None, description="similarity score")