浏览代码

获取详情优化

luojunhui 1 月之前
父节点
当前提交
b9b7e13c5f
共有 1 个文件被更改,包括 85 次插入47 次删除
  1. 85 47
      applications/tasks/crawler_tasks/crawler_gzh_fans.py

+ 85 - 47
applications/tasks/crawler_tasks/crawler_gzh_fans.py

@@ -1,5 +1,6 @@
 import asyncio
 import json
+from datetime import datetime
 
 from applications.crawler.wechat import (
     get_gzh_fans,
@@ -111,6 +112,22 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
             )
             await self.pool.async_save(query=query, params=params)
 
+    # 插入失败详情
+    async def insert_fail_detail(self, gh_id, fail_info):
+        query = """
+            UPDATE gzh_account_info SET fail_detail = %s WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(gh_id, json.dumps(fail_info, ensure_ascii=False))
+        )
+
+    # 获取失败详情
+    async def fetch_fail_index(self, gh_id):
+        query = """
+            SELECT fail_detail FROM gzh_account_info WHERE gh_id = %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(gh_id,))
+
     # 更新抓取union_id任务的状态码
     async def update_task_status(self, gh_id, user_openid, ori_status, new_status):
         query = """
@@ -218,59 +235,77 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
         if not cookie_obj:
             return
 
-        cursor_openid = ""
-        cursor_timestamp = ""
+        # 获取失败详情
+        fetch_response = await self.fetch_fail_index(account_info["gh_id"])
+        fail_detail = json.loads(fetch_response[0]["fail_detail"] or "{}")
 
-        newest_fans = await self.get_max_cursor_id(account_info["gh_id"])
-        newest_timestamp = newest_fans[0]["user_create_time"]
+        cursor_openid = fail_detail.get("cursor_openid") or "oz7jf60KEF18NYdvGIf_e__M8dg4"
+        cursor_timestamp = fail_detail.get("cursor_timestamp") or 1768126043
+        newest_timestamp = fail_detail.get("newest_timestamp") or 1767616023
+
+        if not newest_timestamp:
+            newest_fans = await self.get_max_cursor_id(account_info["gh_id"])
+            newest_timestamp = newest_fans[0]["user_create_time"]
 
         while True:
-            response = await get_gzh_fans(
-                token=cookie_obj[0]["token"],
-                cookie=cookie_obj[0]["cookie"],
-                cursor_id=cursor_openid,
-                cursor_timestamp=cursor_timestamp,
-            )
-            print(json.dumps(response, ensure_ascii=False, indent=4))
-            base_resp = response.get("base_resp", {})
-            code = base_resp.get("ret")
-            error_msg = base_resp.get("err_msg")
-
-            match code:
-                case 0:
-                    user_list = response.get("user_list", {}).get("user_info_list")
-                    next_cursor_id = user_list[-1].get("user_openid")
-                    next_cursor_timestamp = user_list[-1].get("user_create_time")
-                    await self.insert_gzh_fans_batch(account_info, user_list)
-                    if next_cursor_timestamp <= newest_timestamp:
-                        print("新粉丝更新完成")
+            try:
+                response = await get_gzh_fans(
+                    token=cookie_obj[0]["token"],
+                    cookie=cookie_obj[0]["cookie"],
+                    cursor_id=cursor_openid,
+                    cursor_timestamp=cursor_timestamp,
+                )
+                base_resp = response.get("base_resp", {})
+                code = base_resp.get("ret")
+                error_msg = base_resp.get("err_msg")
+
+                match code:
+                    case 0:
+                        user_list = response.get("user_list", {}).get("user_info_list")
+                        next_cursor_id = user_list[-1].get("user_openid")
+                        next_cursor_timestamp = user_list[-1].get("user_create_time")
+                        await self.insert_gzh_fans_batch(account_info, user_list)
+                        if next_cursor_timestamp <= newest_timestamp:
+                            print("新粉丝更新完成")
+                            break
+
+                        else:
+                            cursor_openid = next_cursor_id
+                            cursor_timestamp = next_cursor_timestamp
+                            print(datetime.fromtimestamp(cursor_timestamp).strftime("%Y-%m-%d %H:%M:%S"))
+
+                    case "00040":
+                        print(f"token 非法: {error_msg}")
+                        await self.set_cookie_token_as_invalid(account_info["gh_id"])
+                        await feishu_robot.bot(
+                            title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
+                            detail=account_info,
+                            env="cookie_monitor",
+                            mention=False,
+                        )
                         break
 
-                    else:
-                        cursor_openid = next_cursor_id
-                        cursor_timestamp = next_cursor_timestamp
-
-                case "00040":
-                    print(f"token 非法: {error_msg}")
-                    await self.set_cookie_token_as_invalid(account_info["gh_id"])
-                    await feishu_robot.bot(
-                        title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
-                        detail=account_info,
-                        env="cookie_monitor",
-                        mention=False,
-                    )
-                    break
+                    case _:
+                        print("token 异常, 请及时刷新")
+                        await self.set_cookie_token_as_invalid(account_info["gh_id"])
+                        await feishu_robot.bot(
+                            title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
+                            detail=account_info,
+                            env="cookie_monitor",
+                            mention=False,
+                        )
+                        break
 
-                case _:
-                    print("token 异常, 请及时刷新")
-                    await self.set_cookie_token_as_invalid(account_info["gh_id"])
-                    await feishu_robot.bot(
-                        title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
-                        detail=account_info,
-                        env="cookie_monitor",
-                        mention=False,
-                    )
-                    break
+            except Exception as e:
+                fail_obj = {
+                    "cursor_openid": cursor_openid,
+                    "cursor_timestamp": cursor_timestamp,
+                    "newest_timestamp": newest_timestamp,
+                    "fail_reason": str(e),
+                }
+                await self.insert_fail_detail(account_info["gh_id"], fail_obj)
+                await feishu_robot.bot(
+                )
 
     # 抓取单个账号存量的粉丝
     async def crawl_history_fans_for_each_account(self, account_info: dict):
@@ -409,6 +444,9 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
 
             case "get_new_fans":
                 for account in account_list:
+                    if account["gh_id"] != 'gh_971e23b9ecc4':
+                        continue
+
                     print(f"处理: {account['account_name']}")
                     await self.crawl_new_fans_for_each_account(account)
                 # return await run_tasks_with_asyncio_task_group()