瀏覽代碼

新增文章详情抓取

luojunhui 1 月之前
父節點
當前提交
b019cf57ad

+ 1 - 0
applications/crawler/wechat/__init__.py

@@ -1,2 +1,3 @@
 from .gzh_spider import *
 from .gzh_fans import *
+from .gzh_article_stat import *

+ 21 - 0
applications/crawler/wechat/gzh_article_stat.py

@@ -0,0 +1,21 @@
+from applications.utils import AsyncHttpClient
+
+
+# 抓取公众号粉丝
+async def get_gzh_stat_daily(access_token: str, date_string: str):
+    url = f"https://api.weixin.qq.com/datacube/getarticletotaldetail?access_token={access_token}"
+    data = {
+        "begin_date": date_string,
+        "end_date": date_string
+    }
+    headers = {
+        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
+        "Content-Type": "application/json"
+    }
+    # 发送请求
+    async with AsyncHttpClient(timeout=10) as http_client:
+        response = await http_client.post(url, headers=headers, json=data)
+
+    return response
+
+

+ 2 - 0
applications/tasks/data_recycle_tasks/__init__.py

@@ -1,3 +1,4 @@
+from .article_detail_stat import ArticleDetailStat
 from .recycle_daily_publish_articles import RecycleDailyPublishArticlesTask
 from .recycle_daily_publish_articles import CheckDailyPublishArticlesTask
 from .recycle_daily_publish_articles import UpdateRootSourceIdAndUpdateTimeTask
@@ -15,4 +16,5 @@ __all__ = [
     "RecycleMiniProgramDetailTask",
     "RecycleOutsideAccountArticlesTask",
     "UpdateOutsideRootSourceIdAndUpdateTimeTask",
+    "ArticleDetailStat",
 ]

+ 83 - 0
applications/tasks/data_recycle_tasks/article_detail_stat.py

@@ -0,0 +1,83 @@
+import json
+import time
+
+from applications.crawler.wechat import get_gzh_stat_daily
+from applications.crawler.wechat import get_access_token
+
+
+class ArticleDetailStatConst:
+
+    # Task Status
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAILED_STATUS = 99
+
+    # Account Status
+    ACCOUNT_VALID_STATUS = 1
+    ACCOUNT_INVALID_STATUS = 0
+
+    # Cookie Status
+    COOKIE_VALID_STATUS = 1
+    COOKIE_INVALID_STATUS = 0
+
+
+class ArticleDetailStatMapper(ArticleDetailStatConst):
+
+    def __init__(self, pool, log_client):
+        self.pool = pool
+        self.log_client = log_client
+
+    # 获取账号信息
+    async def fetch_monitor_accounts(self):
+        query = """
+            SELECT gh_id, account_name, app_id, app_secret 
+            FROM gzh_account_info WHERE status = %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(self.ACCOUNT_VALID_STATUS, ))
+
+    # 更新 access_token
+    async def set_access_token_for_each_account(self, gh_id, access_token, expire_timestamp):
+        query = """
+            UPDATE gzh_cookie_info 
+            SET access_token = %s, access_token_status = %s, expire_timestamp = %s
+            WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(access_token, self.COOKIE_VALID_STATUS, expire_timestamp, gh_id)
+        )
+
+    # 从数据库获取 access_token
+    async def get_access_token_from_database(self, gh_id):
+        query = """
+            SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(gh_id, ))
+
+
+class ArticleDetailStat(ArticleDetailStatMapper):
+    def __init__(self, pool, log_client):
+        super().__init__(pool, log_client)
+
+    # 处理单个账号
+    async def process_single_account(self, account: dict):
+        gh_id = account["gh_id"]
+        token_info = await self.get_access_token_from_database(gh_id)
+        if not token_info:
+            return
+
+        expire_timestamp = token_info[0]["expire_timestamp"] or 0
+        if int(time.time()) >= expire_timestamp:
+            print(f"{account['account_name']} access_token expired")
+            new_token_info = await get_access_token(account["app_id"], account["app_secret"])
+            print(json.dumps(new_token_info, indent=4))
+
+
+    # 入口函数
+    async def deal(self):
+        accounts = await self.fetch_monitor_accounts()
+        for account in accounts[:1]:
+            await self.process_single_account(account)
+
+
+