Переглянути джерело

优化 access_token获取逻辑

luojunhui 1 місяць тому
батько
коміт
b02ee58f9a
1 змінених файлів з 33 додано та 16 видалено
  1. 33 16
      applications/tasks/crawler_tasks/crawler_gzh_fans.py

+ 33 - 16
applications/tasks/crawler_tasks/crawler_gzh_fans.py

@@ -1,5 +1,6 @@
 import asyncio
 import json
+import time
 from datetime import datetime
 
 from applications.crawler.wechat import (
@@ -24,6 +25,8 @@ class CrawlerGzhFansConst:
 
     MAX_CONCURRENCY = 5
 
+    GAP_DURATION = 300
+
 
 class CrawlerGzhFansBase(CrawlerGzhFansConst):
     def __init__(self, pool, log_client):
@@ -33,11 +36,9 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
     # 从数据库获取 access_token
     async def get_access_token_from_database(self, gh_id):
         query = """
-            SELECT access_token FROM gzh_cookie_info where gh_id = %s and access_token_status = %s;
+            SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
         """
-        return await self.pool.async_fetch(
-            query=query, params=(gh_id, self.AVAILABLE_STATUS)
-        )
+        return await self.pool.async_fetch(query=query, params=(gh_id, ))
 
     # 从数据库获取粉丝 && token
     async def get_cookie_token_from_database(self, gh_id):
@@ -208,12 +209,15 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
             query=query, params=(cookie, token, self.AVAILABLE_STATUS, gh_id)
         )
 
-    async def set_access_token_for_each_account(self, gh_id, access_token):
+    async def set_access_token_for_each_account(self, gh_id, access_token, expire_timestamp):
         query = """
-            UPDATE gzh_cookie_info SET access_token = %s, access_token_status = %s WHERE gh_id = %s;
+            UPDATE gzh_cookie_info 
+            SET access_token = %s, access_token_status = %s, expire_timestamp = %s 
+            WHERE gh_id = %s;
         """
         return await self.pool.async_save(
-            query=query, params=(access_token, self.AVAILABLE_STATUS, gh_id)
+            query=query,
+            params=(access_token, self.AVAILABLE_STATUS, expire_timestamp, gh_id)
         )
 
     async def get_max_cursor_id(self, gh_id):
@@ -395,24 +399,37 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
         access_token_info = await self.get_access_token_from_database(
             account_info["gh_id"]
         )
+
         if not access_token_info:
-            print(f"{account_info['account_name']}: access_token is not available")
-            response = await get_access_token(
-                account_info["app_id"], account_info["app_secret"]
-            )
-            access_token = response.get("access_token")
+            return
+
+        # 更新 token
+        async def update_token(_new_token_info):
+            _access_token = _new_token_info["access_token"]
+            _expires_in = _new_token_info["expires_in"]
+
             await self.set_access_token_for_each_account(
-                account_info["gh_id"], access_token
+                gh_id=account_info["gh_id"],
+                access_token=_access_token,
+                expire_timestamp=_expires_in + int(time.time()) - self.GAP_DURATION,
             )
-            return
+            print(f"{account_info['account_name']} access_token updated to database")
+
+        expire_timestamp = access_token_info[0]["expire_timestamp"] or 0
+        if int(time.time()) >= expire_timestamp:
+            new_token_info = await get_access_token(
+                account_info["app_id"], account_info["app_secret"]
+            )
+            access_token = new_token_info["access_token"]
+            await update_token(_new_token_info=new_token_info)
+        else:
+            access_token = access_token_info[0]["access_token"]
 
-        access_token = access_token_info[0]["access_token"]
         union_info = await get_union_id_batch(
             access_token=access_token, user_list=user_list
         )
         if union_info.get("errcode"):
             await self.set_access_token_as_invalid(gh_id=account_info["gh_id"])
-
             return
 
         # 将查询到的 union_id存储到数据库中