Optimize gzh (WeChat official account) article crawling

Replace the bare 1/0/500 query parameters with named status constants, use SLEEP_SECONDS instead of STAT_DURATION for the per-article sleep, simplify the latest-publish-time lookup, and add tqdm progress bars, timestamped logging, and full tracebacks to the account and search loops.

luojunhui committed 3 weeks ago (commit d9271e02c3)

1 changed file with 44 additions and 33 deletions

applications/tasks/crawler_tasks/crawler_gzh.py  (+44, -33)

@@ -1,10 +1,11 @@
 from __future__ import annotations
 
 import asyncio
-import time
+import time, json
 import traceback
 from datetime import datetime, date, timedelta
 from typing import List, Dict
+from tqdm.asyncio import tqdm
 
 from applications.api import feishu_robot
 from applications.crawler.wechat import weixin_search
@@ -27,6 +28,14 @@ class CrawlerGzhConst:
     STAT_DURATION = 30  # days
     DEFAULT_TIMESTAMP = 1735660800
 
+    DAILY_SCRAPE_POSITIVE = 1
+    DAILY_SCRAPE_NEGATIVE = 0
+
+    USING_STATUS = 1
+    NOT_USING_STATUS = 0
+
+    CRAWL_ACCOUNT_FIRST_LEVEL = 500
+
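These constants name the bare 1, 0, and 500 literals that were previously inlined into the SQL parameters below. A tiny standalone sketch of the substitution; the method value is a made-up example for account_category, not taken from the codebase:

    USING_STATUS = 1                 # is_using: account is active
    DAILY_SCRAPE_POSITIVE = 1        # daily_scrape: account is crawled every day
    CRAWL_ACCOUNT_FIRST_LEVEL = 500  # row limit for the V2 ranked-account query

    method = "account_association"   # hypothetical account_category value
    # before: params=(method, 1, 1)   after:
    params = (method, USING_STATUS, DAILY_SCRAPE_POSITIVE)
    print(params)  # ('account_association', 1, 1)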
 
 class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
     def __init__(self, pool, log_client, trace_id):
@@ -42,24 +51,32 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                     from long_articles_accounts
                     where account_category = %s and is_using = %s and daily_scrape = %s;
                 """
-                return await self.pool.async_fetch(query=query, params=(method, 1, 1))
+                return await self.pool.async_fetch(
+                    query=query, params=(method, self.USING_STATUS, self.DAILY_SCRAPE_POSITIVE)
+                )
+
             case "V2":
                 query = """
                     select gh_id, account_name, latest_update_time
                     from long_articles_accounts
-                    where account_category = %s and is_using = %s order by recent_score_ci_lower desc limit %s; 
+                    where account_category = %s and is_using = %s 
+                    order by recent_score_ci_lower desc limit %s; 
                 """
-                return await self.pool.async_fetch(query=query, params=(method, 1, 500))
+                return await self.pool.async_fetch(
+                    query=query, params=(method, self.USING_STATUS, self.CRAWL_ACCOUNT_FIRST_LEVEL)
+                )
+
             case _:
                 raise Exception("strategy not supported")
 
     async def get_account_latest_update_timestamp(self, account_id: str) -> int:
         """get latest update time"""
-        query = """ select max(publish_time) as publish_time from crawler_meta_article where out_account_id = %s;"""
-        latest_timestamp_obj = await self.pool.async_fetch(
-            query=query, params=(account_id,)
-        )
-        return latest_timestamp_obj[0]["publish_time"] if latest_timestamp_obj else None
+        query = """
+            select max(publish_time) as publish_time 
+            from crawler_meta_article where out_account_id = %s;
+        """
+        fetch_response = await self.pool.async_fetch(query=query, params=(account_id,))
+        return next((item.get("publish_time") for item in fetch_response or []), None)
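The rewritten get_account_latest_update_timestamp pulls the first row's publish_time with next() and a default instead of indexing, so an empty or None fetch result falls through to None. A minimal standalone sketch of the same pattern; first_value is a hypothetical helper, not part of the module:

    from typing import Any, Dict, List, Optional

    def first_value(rows: Optional[List[Dict[str, Any]]], key: str) -> Optional[Any]:
        # next() with a default of None avoids IndexError on empty results and TypeError on None
        return next((row.get(key) for row in rows or []), None)

    print(first_value([{"publish_time": 1735660800}], "publish_time"))  # 1735660800
    print(first_value([], "publish_time"))                              # None
    print(first_value(None, "publish_time"))                            # None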
 
     async def crawl_each_article(
         self, article_raw_data, mode, account_method, account_id, source_title=None
@@ -117,7 +134,7 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
         await self.save_item_to_database(
             media_type="article", item=new_item, trace_id=self.trace_id
         )
-        await asyncio.sleep(self.STAT_DURATION)
+        await asyncio.sleep(self.SLEEP_SECONDS)
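The old call slept for STAT_DURATION, a constant documented as a number of days and used for the read-average window, not for pacing; the new call uses the dedicated SLEEP_SECONDS constant defined elsewhere in CrawlerGzhConst. A minimal sketch of the pacing pattern, with an assumed value since SLEEP_SECONDS is not shown in this diff:

    import asyncio

    SLEEP_SECONDS = 1  # assumed value; the real constant lives in CrawlerGzhConst

    async def save_articles(items):
        for item in items:
            print("saved", item)
            await asyncio.sleep(SLEEP_SECONDS)  # pause between articles to pace requests

    asyncio.run(save_articles(["a", "b"]))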
 
     async def update_account_read_avg_info(self, gh_id, account_name):
         """update account read avg info"""
@@ -136,7 +153,7 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                 read_avg = sum(read_cnt_list) / n
                 max_publish_dt = fetch_response[0]["publish_dt"]
                 remark = f"从{max_publish_dt}开始计算,往前算{len(fetch_response)}天"
-                insert_query = f"""
+                insert_query = """
                     insert ignore into crawler_meta_article_accounts_read_avg
                     (gh_id, account_name, position, read_avg, dt, status, remark)
                     values
@@ -144,24 +161,16 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                 """
                 insert_rows = await self.pool.async_save(
                     query=insert_query,
-                    params=(
-                        gh_id,
-                        account_name,
-                        position,
-                        read_avg,
-                        today_dt,
-                        1,
-                        remark,
-                    ),
+                    params=(gh_id, account_name, position, read_avg, today_dt, self.USING_STATUS, remark),
                 )
                 if insert_rows:
-                    update_query = f"""
+                    update_query = """
                         update crawler_meta_article_accounts_read_avg
                         set status = %s 
                         where gh_id = %s and position = %s and dt < %s;
                     """
                     await self.pool.async_save(
-                        update_query, (0, gh_id, position, today_dt)
+                        update_query, (self.NOT_USING_STATUS, gh_id, position, today_dt)
                     )
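update_account_read_avg_info follows an insert-then-deactivate pattern: today's read-average row is written with status = USING_STATUS, and only if that insert actually lands (insert_rows is truthy) are older rows for the same gh_id and position flipped to NOT_USING_STATUS. A rough standalone sketch of the same flow against an in-memory SQLite table; columns are trimmed and MySQL's insert ignore becomes INSERT OR IGNORE:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "create table read_avg (gh_id text, position int, read_avg real, dt text, status int, "
        "unique (gh_id, position, dt))"
    )
    conn.execute("insert into read_avg values ('gh_demo', 1, 1200.0, '2025-07-01', 1)")  # older snapshot

    def upsert_daily(gh_id: str, position: int, read_avg: float, today_dt: str) -> None:
        cur = conn.execute(
            "insert or ignore into read_avg values (?, ?, ?, ?, 1)",
            (gh_id, position, read_avg, today_dt),
        )
        if cur.rowcount:  # only deactivate older rows when today's row was actually inserted
            conn.execute(
                "update read_avg set status = 0 where gh_id = ? and position = ? and dt < ?",
                (gh_id, position, today_dt),
            )

    upsert_daily("gh_demo", 1, 1350.5, "2025-07-02")
    print(conn.execute("select dt, status from read_avg order by dt").fetchall())
    # [('2025-07-01', 0), ('2025-07-02', 1)]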
 
     async def get_hot_titles_with_strategy(self, strategy):
@@ -177,6 +186,7 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                 timedelta_days = 5
             case _:
                 raise Exception(f"unknown strategy: {strategy}")
+
         date_string = (datetime.today() - timedelta(days=timedelta_days)).strftime(
             "%Y%m%d"
         )
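get_hot_titles_with_strategy turns the per-strategy timedelta_days into a YYYYMMDD cutoff string. For example, with the timedelta_days = 5 branch shown above:

    from datetime import datetime, timedelta

    timedelta_days = 5  # value chosen by the strategy match above
    date_string = (datetime.today() - timedelta(days=timedelta_days)).strftime("%Y%m%d")
    print(date_string)  # e.g. '20250701' when run on 2025-07-06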
@@ -247,8 +257,8 @@ class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
 
     async def deal(self, method: str, strategy: str = "V1"):
         account_list = await self.get_crawler_accounts(method, strategy)
-        for account in account_list:
-            print(account)
+        for account in tqdm(account_list, desc="抓取单个账号"):
+            print(f"{datetime.now()}: start crawling account: {json.dumps(account, ensure_ascii=False)}")
             try:
                 await self.crawler_single_account(method, account)
                 await self.update_account_read_avg_info(
@@ -267,20 +277,20 @@ class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
                         },
                     }
                 )
+            print(f"{datetime.now()}: finish crawled account: {json.dumps(account, ensure_ascii=False)}")
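The account loop now wraps the account list in a tqdm progress bar and logs each account as JSON with ensure_ascii=False so Chinese account names stay readable. A standalone sketch of that logging with made-up account rows; it uses the synchronous tqdm, while the module imports tqdm.asyncio:

    import json
    from datetime import datetime
    from tqdm import tqdm

    accounts = [  # hypothetical rows mirroring gh_id / account_name from the query
        {"gh_id": "gh_demo_1", "account_name": "示例账号一"},
        {"gh_id": "gh_demo_2", "account_name": "示例账号二"},
    ]
    for account in tqdm(accounts, desc="抓取单个账号"):
        print(f"{datetime.now()}: start crawling account: {json.dumps(account, ensure_ascii=False)}")
        # ... crawler_single_account / update_account_read_avg_info would run here ...
        print(f"{datetime.now()}: finish crawled account: {json.dumps(account, ensure_ascii=False)}")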
 
 
 class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
     def __init__(self, pool, log_client, trace_id):
         super().__init__(pool, log_client, trace_id)
 
-    async def crawl_search_articles_detail(
-        self, article_list: List[Dict], source_title: str
-    ):
+    async def crawl_search_articles_detail(self, article_list: List[Dict], source_title: str):
+        """
+        @description: for each article in the search result list, fetch its detail and store it in the meta table
+        """
         for article in article_list:
             url = article["url"]
-            detail_response = await get_article_detail(
-                url, is_count=True, is_cache=False
-            )
+            detail_response = await get_article_detail(url, is_count=True, is_cache=False)
             if not detail_response:
                 continue
 
@@ -335,13 +345,14 @@ class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
 
     async def deal(self, strategy: str = "V1"):
         hot_titles = await self.get_hot_titles_with_strategy(strategy)
-        for hot_title in hot_titles:
-            print("hot title:", hot_title)
+        for hot_title in tqdm(hot_titles, desc="在微信内搜索文章"):
+            print(f"{datetime.now()}: start searching hot title: {hot_title}")
             try:
                 await self.search_each_title(hot_title)
             except Exception as e:
-                print(f"crawler_gzh_articles error:{e}")
+                print(f"crawler_gzh_articles error:{e}\nexception:{traceback.format_exc()}")
 
+            print(f"{datetime.now()}: finish searched hot title: {hot_title}")
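The search loop's error log now appends traceback.format_exc() so the full stack trace is captured alongside the exception message. A minimal reproduction of that logging pattern with a hypothetical failing call:

    import traceback

    def search_each_title(title: str) -> None:
        raise ValueError(f"no results for {title}")  # hypothetical failure

    try:
        search_each_title("demo title")
    except Exception as e:
        # one log line with both the message and the full traceback, as in the loop above
        print(f"crawler_gzh_articles error:{e}\nexception:{traceback.format_exc()}")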
         await feishu_robot.bot(
             title="公众号搜索任务执行完成",
             detail={