
Add WeChat official account search crawl mode

luojunhui 1 month ago
parent
commit
afe4deba4d

+ 51 - 44
applications/crawler/wechat/gzh_spider.py

@@ -18,7 +18,7 @@ headers = {"Content-Type": "application/json"}
 
 
 @retry(**retry_desc)
-def get_article_detail(
+async def get_article_detail(
     article_link: str, is_count: bool = False, is_cache: bool = True
 ) -> dict | None:
     """
@@ -33,54 +33,61 @@ def get_article_detail(
             "is_cache": is_cache,
         }
     )
-    try:
-        response = requests.post(
-            url=target_url, headers=headers, data=payload, timeout=120
-        )
-        response.raise_for_status()
-        return response.json()
-    except requests.exceptions.RequestException as e:
-        log(
-            task="get_official_article_detail",
-            function="get_official_article_detail",
-            message=f"API请求失败: {e}",
-            data={"link": article_link},
-        )
-    except json.JSONDecodeError as e:
-        log(
-            task="get_official_article_detail",
-            function="get_official_article_detail",
-            message=f"响应解析失败: {e}",
-            data={"link": article_link},
-        )
-    return None
+    async with AsyncHttpClient(timeout=10) as http_client:
+        response = await http_client.post(target_url, headers=headers, data=payload)
+
+    return response
+    # try:
+    #     response = requests.post(
+    #         url=target_url, headers=headers, data=payload, timeout=120
+    #     )
+    #     response.raise_for_status()
+    #     return response.json()
+    # except requests.exceptions.RequestException as e:
+    #     log(
+    #         task="get_official_article_detail",
+    #         function="get_official_article_detail",
+    #         message=f"API请求失败: {e}",
+    #         data={"link": article_link},
+    #     )
+    # except json.JSONDecodeError as e:
+    #     log(
+    #         task="get_official_article_detail",
+    #         function="get_official_article_detail",
+    #         message=f"响应解析失败: {e}",
+    #         data={"link": article_link},
+    #     )
+    # return None
 
 
 @retry(**retry_desc)
-def get_article_list_from_account(account_id: str, index=None) -> dict | None:
+async def get_article_list_from_account(account_id: str, index=None) -> dict | None:
     target_url = f"{base_url}/blogger"
     payload = json.dumps({"account_id": account_id, "cursor": index})
-    try:
-        response = requests.post(
-            url=target_url, headers=headers, data=payload, timeout=120
-        )
-        response.raise_for_status()
-        return response.json()
-    except requests.exceptions.RequestException as e:
-        log(
-            task="get_official_account_article_list",
-            function="get_official_account_article_list",
-            message=f"API请求失败: {e}",
-            data={"gh_id": account_id},
-        )
-    except json.JSONDecodeError as e:
-        log(
-            task="get_official_account_article_list",
-            function="get_official_account_article_list",
-            message=f"响应解析失败: {e}",
-            data={"gh_id": account_id},
-        )
-    return None
+    async with AsyncHttpClient(timeout=120) as http_client:
+        response = await http_client.post(target_url, headers=headers, data=payload)
+    return response
+    # try:
+    #     response = requests.post(
+    #         url=target_url, headers=headers, data=payload, timeout=120
+    #     )
+    #     response.raise_for_status()
+    #     return response.json()
+    # except requests.exceptions.RequestException as e:
+    #     log(
+    #         task="get_official_account_article_list",
+    #         function="get_official_account_article_list",
+    #         message=f"API请求失败: {e}",
+    #         data={"gh_id": account_id},
+    #     )
+    # except json.JSONDecodeError as e:
+    #     log(
+    #         task="get_official_account_article_list",
+    #         function="get_official_account_article_list",
+    #         message=f"响应解析失败: {e}",
+    #         data={"gh_id": account_id},
+    #     )
+    # return None
 
 
 @retry(**retry_desc)
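Because get_article_detail and get_article_list_from_account are now coroutines, every call site has to await them from an event loop. Below is a minimal sketch of driving them from a standalone script, assuming the module path shown in this diff and that AsyncHttpClient returns the parsed JSON body; the account id and article link are placeholders, not real values.

import asyncio

from applications.crawler.wechat.gzh_spider import (
    get_article_detail,
    get_article_list_from_account,
)


async def main() -> None:
    # placeholder account id; cursor=None (the default) starts from the newest messages
    account_response = await get_article_list_from_account(account_id="gh_example")
    print(account_response)

    # placeholder article link; is_count / is_cache mirror the defaults in the diff
    detail_response = await get_article_detail(
        "https://mp.weixin.qq.com/s/example", is_count=False, is_cache=True
    )
    print(detail_response)


if __name__ == "__main__":
    asyncio.run(main())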

+ 7 - 1
applications/tasks/crawler_tasks/__init__.py

@@ -1,5 +1,11 @@
 from .crawler_toutiao import CrawlerToutiao
 from .crawler_account_manager import WeixinAccountManager
 from .crawler_gzh import CrawlerGzhAccountArticles
+from .crawler_gzh import CrawlerGzhSearchArticles
 
-__all__ = ["CrawlerToutiao", "WeixinAccountManager", "CrawlerGzhAccountArticles"]
+__all__ = [
+    "CrawlerToutiao",
+    "WeixinAccountManager",
+    "CrawlerGzhAccountArticles",
+    "CrawlerGzhSearchArticles"
+]

+ 105 - 19
applications/tasks/crawler_tasks/crawler_gzh.py

@@ -4,7 +4,7 @@ import asyncio
 import json
 import time
 import traceback
-from datetime import datetime, date
+from datetime import datetime, date, timedelta
 from typing import List, Dict
 
 from applications.api import feishu_robot
@@ -21,7 +21,11 @@ class CrawlerGzhConst:
     DEFAULT_VIEW_COUNT = 0
     DEFAULT_LIKE_COUNT = 0
     DEFAULT_ARTICLE_STATUS = 1
-    STAT_DURATION = 30 # days
+    # search mode limits
+    MAX_DEPTH = 3  # max number of search result pages per keyword
+    SLEEP_SECONDS = 1  # pause between article crawls, in seconds
+
+    STAT_DURATION = 30  # days
     DEFAULT_TIMESTAMP = 1735660800
 
 
@@ -59,13 +63,14 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
         return latest_timestamp_obj[0]["publish_time"] if latest_timestamp_obj else None
 
     async def crawl_each_article(
-        self, article_raw_data, mode, account_method, account_id
+        self, article_raw_data, mode, account_method, account_id, source_title=None
     ):
         """crawl each article"""
         base_item = {
             "platform": self.PLATFORM,
             "mode": mode,
             "crawler_time": int(time.time()),
+            "category": account_method,
         }
         match mode:
             case "account":
@@ -83,7 +88,6 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                     "read_cnt": show_view_count,
                     "like_cnt": show_like_count,
                     "title": article_raw_data["Title"],
-                    "category": account_method,
                     "out_account_id": account_id,
                     "article_index": article_raw_data["ItemIndex"],
                     "link": article_raw_data["ContentUrl"],
@@ -91,12 +95,30 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                     "unique_index": unique_idx,
                     "publish_time": article_raw_data["send_time"],
                 }
+            case "search":
+                new_item = {
+                    **base_item,
+                    "out_account_id": account_id,
+                    "article_index": article_raw_data["item_index"],
+                    "title": article_raw_data["title"],
+                    "link": article_raw_data["content_link"],
+                    "like_cnt": article_raw_data.get(
+                        "like_count", self.DEFAULT_LIKE_COUNT
+                    ),
+                    "read_cnt": article_raw_data.get(
+                        "view_count", self.DEFAULT_VIEW_COUNT
+                    ),
+                    "publish_time": int(article_raw_data["publish_timestamp"] / 1000),
+                    "unique_index": generate_gzh_id(article_raw_data["content_link"]),
+                    "source_article_title": source_title,
+                }
             case _:
                 raise Exception(f"unknown mode: {mode}")
 
         await self.save_item_to_database(
             media_type="article", item=new_item, trace_id=self.trace_id
         )
+        await asyncio.sleep(self.SLEEP_SECONDS)
 
     async def update_account_read_avg_info(self, gh_id, account_name):
         """update account read avg info"""
@@ -139,7 +161,32 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                         set status = %s 
                         where gh_id = %s and position = %s and dt < %s;
                     """
-                    await self.pool.async_save(update_query, (0, gh_id, position, today_dt))
+                    await self.pool.async_save(
+                        update_query, (0, gh_id, position, today_dt)
+                    )
+
+    async def get_hot_titles_with_strategy(self, strategy):
+        """get hot titles with strategy"""
+        match strategy:
+            case "V1":
+                position = 3
+                read_times_threshold = 1.21
+                timedelta_days = 3
+            case "V2":
+                position = 2
+                read_times_threshold = 1.1
+                timedelta_days = 5
+            case _:
+                raise Exception(f"unknown strategy: {strategy}")
+        date_string = (datetime.today() - timedelta(days=timedelta_days)).strftime(
+            "%Y-%m-%d"
+        )
+        return await get_hot_titles(
+            self.pool,
+            date_string=date_string,
+            position=position,
+            read_times_threshold=read_times_threshold,
+        )
 
 
 class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
@@ -175,7 +222,7 @@ class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
         latest_timestamp = account["latest_update_time"].timestamp()
         while True:
             # fetch response from weixin
-            response = get_article_list_from_account(
+            response = await get_article_list_from_account(
                 account_id=gh_id, index=current_cursor
             )
             msg_list = response.get("data", {}).get("data")
@@ -227,22 +274,61 @@ class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
     def __init__(self, pool, log_client, trace_id):
         super().__init__(pool, log_client, trace_id)
 
-    async def search_each_title(self, title: str, page='1') -> None:
-        """search in weixin"""
-        search_response = await weixin_search(keyword=title, page=page)
+    async def crawl_search_articles_detail(
+        self, article_list: List[Dict], source_title: str
+    ):
+        for article in article_list:
+            url = article["url"]
+            detail_response = await get_article_detail(url, is_count=True, is_cache=False)
+            if not detail_response:
+                continue
 
+            article_data = detail_response.get("data")
+            if not article_data:
+                continue
 
+            if not isinstance(article_data, dict):
+                continue
 
-    async def deal(self, date_string: str, strategy: str = "V1"):
-        hot_titles = await get_hot_titles(self.pool, date_string=date_string)
-        for hot_title in hot_titles:
-            await self.search_each_title(hot_title)
-#
-#
-# if __name__ == "__main__":
-#     import asyncio
-#     response = asyncio.run(weixin_search(keyword="南京照相馆"))
-#     print(json.dumps(response, ensure_ascii=False, indent=4))
+            article_detail = article_data.get("data")
+            if not article_detail:
+                continue
 
+            await self.crawl_each_article(
+                article_raw_data=article_detail,
+                mode="search",
+                account_method="search",
+                account_id="search",
+                source_title=source_title,
+            )
+            await asyncio.sleep(self.SLEEP_SECONDS)
+
+    async def search_each_title(self, title: str, page: str = "1") -> None:
+        """search in weixin"""
+        current_page = page
+        while True:
+            # stop after MAX_DEPTH pages of search results
+            if int(current_page) > self.MAX_DEPTH:
+                break
+            # call the weixin search API for the current page
+            search_response = await weixin_search(keyword=title, page=current_page)
+            if not search_response:
+                break
 
+            article_list = search_response.get("data", {}).get("data")
+            if not article_list:
+                break
+            # store the search results
+            await self.crawl_search_articles_detail(article_list, title)
+            # check whether there is another page
+            has_more = search_response.get("data", {}).get("has_more")
+            if not has_more:
+                break
+            # advance to the next page cursor
+            current_page = search_response.get("data", {}).get("next_cursor")
 
+    async def deal(self, strategy: str = "V1"):
+        hot_titles = await self.get_hot_titles_with_strategy(strategy)
+        for hot_title in hot_titles:
+            print("hot title:", hot_title)
+            await self.search_each_title(hot_title)
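Taken together, the new search mode resolves strategy "V1"/"V2" into query thresholds, pulls the matching hot titles, then pages through the weixin search results (at most MAX_DEPTH pages per title) and saves each article detail. A minimal driver sketch, assuming pool, log_client and trace_id are produced by the application's own bootstrap code (they are not constructed here):

from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles


async def run_gzh_search_crawl(pool, log_client, trace_id: str) -> None:
    task = CrawlerGzhSearchArticles(pool, log_client, trace_id)
    # "V1": position < 3, read_rate >= 1.21, titles from the last 3 days
    # "V2": position < 2, read_rate >= 1.1, titles from the last 5 days
    await task.deal(strategy="V1")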

+ 2 - 2
applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -102,7 +102,7 @@ class RecycleDailyPublishArticlesTask(Const):
 
         cursor = None
         while True:
-            response = get_article_list_from_account(
+            response = await get_article_list_from_account(
                 account_id=account["gh_id"], index=cursor
             )
             response_code = response["code"]
@@ -281,7 +281,7 @@ class UpdateRootSourceIdAndUpdateTimeTask(Const):
         url = article["ContentUrl"]
         wx_sn = article["wx_sn"].decode("utf-8")
         try:
-            response = get_article_detail(url)
+            response = await get_article_detail(url)
             response_code = response["code"]
 
             if response_code == self.ARTICLE_DELETE_CODE:

+ 4 - 4
applications/tasks/monitor_tasks/gzh_article_monitor.py

@@ -93,7 +93,7 @@ class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
         if await self.whether_published_in_a_week(gh_id):
             return
 
-        fetch_response = get_article_list_from_account(gh_id)
+        fetch_response = await get_article_list_from_account(gh_id)
         try:
             msg_list = fetch_response.get("data", {}).get("data", [])
             if msg_list:
@@ -120,7 +120,7 @@ class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
         # insert each article
         for article in detail_info:
             link = article["ContentUrl"]
-            article_detail = get_article_detail(link)
+            article_detail = await get_article_detail(link)
             response_code = article_detail["code"]
             if response_code == self.ARTICLE_ILLEGAL_CODE:
                 illegal_reason = article_detail.get("msg")
@@ -195,7 +195,7 @@ class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
         check each article
         """
         link = article["link"]
-        article_detail = get_article_detail(link)
+        article_detail = await get_article_detail(link)
         response_code = article_detail["code"]
         if response_code == self.ARTICLE_ILLEGAL_CODE:
             illegal_reason = article_detail.get("msg")
@@ -279,7 +279,7 @@ class InnerGzhArticlesMonitor(MonitorConst):
     async def check_each_article(self, article: dict):
         gh_id, account_name, title, url, wx_sn, publish_date = article
         try:
-            response = get_article_detail(url, is_cache=False)
+            response = await get_article_detail(url, is_cache=False)
             response_code = response["code"]
             if response_code == self.ARTICLE_ILLEGAL_CODE:
                 error_detail = article.get("msg")

+ 4 - 0
applications/tasks/task_handler.py

@@ -4,6 +4,7 @@ from applications.tasks.cold_start_tasks import ArticlePoolColdStart
 from applications.tasks.crawler_tasks import CrawlerToutiao
 from applications.tasks.crawler_tasks import WeixinAccountManager
 from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
+from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
 from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
@@ -145,6 +146,9 @@ class TaskHandler(TaskMapper):
             case "account":
                 task = CrawlerGzhAccountArticles(self.db_client, self.log_client, self.trace_id)
                 await task.deal(account_method, strategy)
+            case "search":
+                task = CrawlerGzhSearchArticles(self.db_client, self.log_client, self.trace_id)
+                await task.deal(strategy)
             case _:
                 raise ValueError(f"Unsupported crawl mode {crawl_mode}")
         return self.TASK_SUCCESS_STATUS

+ 2 - 2
applications/utils/async_mysql_utils.py

@@ -10,13 +10,13 @@ async def get_top_article_title_list(pool) -> List[Dict]:
     return await pool.async_fetch(query=query, params=("TOP100",))
 
 
-async def get_hot_titles(pool, date_string) -> List[str]:
+async def get_hot_titles(pool, date_string, position, read_times_threshold) -> List[str]:
     """get titles of hot articles"""
     query = """
         select distinct title
         from datastat_sort_strategy
         where position < %s and read_rate >= %s and date_str >= %s;
     """
-    response = await pool.async_fetch(query=query, params=(3, 1.21, date_string))
+    response = await pool.async_fetch(query=query, params=(position, read_times_threshold, date_string))
     return [i['title'] for i in response]
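With the thresholds now passed in as parameters, the previously hard-coded behaviour (position < 3, read_rate >= 1.21) corresponds to strategy "V1". A minimal sketch of calling the helper directly, assuming pool is the same async MySQL pool used by the tasks above:

from datetime import datetime, timedelta
from typing import List

from applications.utils.async_mysql_utils import get_hot_titles


async def hot_titles_v1(pool) -> List[str]:
    # the V1 strategy looks back 3 days
    date_string = (datetime.today() - timedelta(days=3)).strftime("%Y-%m-%d")
    return await get_hot_titles(
        pool,
        date_string=date_string,
        position=3,  # position < 3
        read_times_threshold=1.21,  # read_rate >= 1.21
    )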