
Add WeChat Official Account (gzh) crawling

luojunhui 1 month ago
parent
commit be2531e2c7

+ 7 - 3
applications/crawler/wechat/gzh_spider.py

@@ -8,6 +8,7 @@ from tenacity import retry
 
 from applications.api import log
 from applications.utils import request_retry
+from applications.utils import AsyncHttpClient
 
 retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
 
@@ -124,8 +125,11 @@ def get_source_account_from_article(article_link) -> dict | None:
 
 
 @retry(**retry_desc)
-def search(keyword: str, page="1") -> dict | None:
+async def weixin_search(keyword: str, page="1") -> dict | None:
     url = "{}/keyword".format(base_url)
     payload = json.dumps({"keyword": keyword, "cursor": page})
-    response = requests.request("POST", url, headers=headers, data=payload, timeout=120)
-    return response.json()
+    # response = requests.request("POST", url, headers=headers, data=payload, timeout=120)
+    async with AsyncHttpClient(timeout=120) as http_client:
+        response = await http_client.post(url=url, headers=headers, data=payload)
+
+    return response
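Since weixin_search now returns the awaited post() result directly, AsyncHttpClient's post() is expected to hand back the parsed JSON body rather than a raw response object. A minimal sketch of a client with that contract, assuming it wraps aiohttp; the real AsyncHttpClient in applications/utils may differ:

    import aiohttp

    class AsyncHttpClient:
        """Async context manager whose post() returns the parsed JSON body."""

        def __init__(self, timeout: int = 60):
            self._timeout = aiohttp.ClientTimeout(total=timeout)
            self._session = None

        async def __aenter__(self):
            self._session = aiohttp.ClientSession(timeout=self._timeout)
            return self

        async def __aexit__(self, exc_type, exc, tb):
            await self._session.close()

        async def post(self, url, headers=None, data=None):
            # data is the pre-serialized JSON string built with json.dumps at the call site
            async with self._session.post(url, headers=headers, data=data) as resp:
                return await resp.json()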

+ 26 - 11
applications/tasks/crawler_tasks/crawler_gzh.py

@@ -8,11 +8,12 @@ from datetime import datetime, date
 from typing import List, Dict
 
 from applications.api import feishu_robot
-from applications.crawler.wechat import search
+from applications.crawler.wechat import weixin_search
 from applications.crawler.wechat import get_article_detail
 from applications.crawler.wechat import get_article_list_from_account
 from applications.pipeline import CrawlerPipeline
-from applications.utils import timestamp_to_str, show_desc_to_sta, generate_gzh_id
+from applications.utils import timestamp_to_str, show_desc_to_sta
+from applications.utils import get_hot_titles, generate_gzh_id
 
 
 class CrawlerGzhConst:
@@ -102,14 +103,12 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
         position_list = [i for i in range(1, 9)]
         today_dt = date.today().isoformat()
         for position in position_list:
-            query = """
-                select read_cnt, from_unixtime(publish_time, '%Y-%m_%d') as publish_dt from crawler_meta_article
-                where out_account_id = %s and article_index = %s
-                order by publish_time desc limit %s;
+            query = f"""
+                select read_cnt, from_unixtime(publish_time, "%Y-%m_%d") as publish_dt from crawler_meta_article
+                where out_account_id = '{gh_id}' and article_index = {position}
+                order by publish_time desc limit {self.STAT_DURATION};
             """
-            fetch_response = await self.pool.async_fetch(
-                query=query, params=(gh_id, position, self.STAT_DURATION)
-            )
+            fetch_response = await self.pool.async_fetch(query=query)
             if fetch_response:
                 read_cnt_list = [i["read_cnt"] for i in fetch_response]
                 n = len(read_cnt_list)
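The switch from %s placeholders to an f-string above is most likely a workaround: the % characters in the from_unixtime format string collide with pyformat substitution once params are passed. If parameterized queries are preferred (they sidestep quoting and injection concerns for gh_id), the standard escape is to double the literal percent signs. A runnable illustration of the placeholder mechanics, using Python's % operator as a stand-in for what a pyformat driver such as aiomysql does (real drivers also quote the substituted values):

    # %% collapses to a literal %, so the date format survives while the
    # %s slots are still filled from params. The values below are stand-ins.
    template = (
        "select read_cnt, from_unixtime(publish_time, '%%Y-%%m_%%d') as publish_dt "
        "from crawler_meta_article where out_account_id = %s and article_index = %s "
        "order by publish_time desc limit %s;"
    )
    print(template % ("gh_id_123", 1, 30))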
@@ -228,5 +227,21 @@ class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
     def __init__(self, pool, log_client, trace_id):
         super().__init__(pool, log_client, trace_id)
 
-    async def deal(self):
-        return {"mode": "search", "message": "still developing"}
+    async def search_each_title(self, title: str) -> None:
+        """search in weixin"""
+        search_response = await weixin_search(keyword=title)
+        print(search_response)
+
+    async def deal(self, date_string: str, strategy: str = "V1"):
+        hot_titles = await get_hot_titles(self.pool, date_string=date_string)
+        for hot_title in hot_titles:
+            await self.search_each_title(hot_title)
+#
+#
+# if __name__ == "__main__":
+#     import asyncio
+#     response = asyncio.run(weixin_search(keyword="南京照相馆"))
+#     print(json.dumps(response, ensure_ascii=False, indent=4))
+#
+#
+

+ 3 - 2
applications/utils/async_mysql_utils.py

@@ -10,12 +10,13 @@ async def get_top_article_title_list(pool) -> List[Dict]:
     return await pool.async_fetch(query=query, params=("TOP100",))
 
 
-async def get_hot_titles(pool, date_string) -> List[Dict]:
+async def get_hot_titles(pool, date_string) -> List[str]:
     """get titles of hot articles"""
     query = """
         select distinct title
         from datastat_sort_strategy
         where position < %s and read_rate >= %s and date_str >= %s;
     """
-    return await pool.async_fetch(query=query, params=(3, 1.21, date_string))
+    response = await pool.async_fetch(query=query, params=(3, 1.21, date_string))
+    return [i["title"] for i in response] if response else []
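Because get_hot_titles now flattens the rows into plain strings, the new contract is easy to sanity-check without a database by stubbing the pool. FakePool and the canned rows below are purely illustrative; the import path follows the one used in crawler_gzh.py:

    import asyncio

    from applications.utils import get_hot_titles

    class FakePool:
        # Stands in for the real connection pool; returns canned rows.
        async def async_fetch(self, query, params=None):
            return [{"title": "南京照相馆"}, {"title": "another hot title"}]

    async def demo():
        titles = await get_hot_titles(FakePool(), date_string="2025-08-01")
        assert titles == ["南京照相馆", "another hot title"]

    asyncio.run(demo())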