Forráskód Böngészése

新增抓公众号粉丝

luojunhui 2 hónapja
szülő
commit
aa5745804e
1 módosított fájl, 158 hozzáadás és 30 törlés
  1. 158 30
      applications/tasks/crawler_tasks/crawler_gzh_fans.py

+ 158 - 30
applications/tasks/crawler_tasks/crawler_gzh_fans.py

@@ -1,4 +1,4 @@
-import json
+import asyncio
 
 from applications.crawler.wechat import (
     get_gzh_fans,
@@ -9,7 +9,21 @@ from applications.api import feishu_robot
 from applications.utils import run_tasks_with_asyncio_task_group
 
 
-class CrawlerGzhFansBase:
+class CrawlerGzhFansConst:
+    # Per-fan status codes used by the union_id task (gzh_fans_info.status).
+    INIT_STATUS = 0  # rows in this state are selected and claimed for processing
+    PROCESSING_STATUS = 1  # row has been claimed by a save in progress
+    FINISHED_STATUS = 2  # union_id and profile fields have been written
+    FAILED_STATUS = 99  # NOTE(review): not referenced by the code visible here
+
+    # Flags used for token columns in gzh_cookie_info and for account status.
+    AVAILABLE_STATUS = 1
+    INVALID_STATUS = 0
+
+    # LIMIT applied when selecting pending openids for one account.
+    MAX_SIZE = 100
+
+    # Passed as max_concurrency when tasks are fanned out over accounts.
+    MAX_CONCURRENCY = 5
+
+
+class CrawlerGzhFansBase(CrawlerGzhFansConst):
     def __init__(self, pool, log_client):
         self.pool = pool
         self.log_client = log_client
@@ -19,36 +33,44 @@ class CrawlerGzhFansBase:
         query = """
             SELECT access_token FROM gzh_cookie_info where gh_id = %s and access_token_status = %s;
         """
-        return await self.pool.async_fetch(query=query, params=(gh_id, 1))
+        return await self.pool.async_fetch(
+            query=query, params=(gh_id, self.AVAILABLE_STATUS)
+        )
 
     # 从数据库获取粉丝 && token
     async def get_cookie_token_from_database(self, gh_id):
+        """Fetch the token/cookie rows of ``gh_id`` whose token_status is AVAILABLE_STATUS."""
         query = """
             SELECT token, cookie FROM gzh_cookie_info WHERE gh_id = %s and token_status = %s;
         """
-        return await self.pool.async_fetch(query=query, params=(gh_id, 1))
+        return await self.pool.async_fetch(
+            query=query, params=(gh_id, self.AVAILABLE_STATUS)
+        )
 
     # 设置access_token状态为无效
     async def set_access_token_as_invalid(self, gh_id):
+        """Set access_token_status to INVALID_STATUS on the gzh_cookie_info rows of ``gh_id``."""
         query = """
             UPDATE gzh_cookie_info SET access_token_status = %s WHERE gh_id = %s;
         """
-        return await self.pool.async_save(query=query, params=(0, gh_id))
+        return await self.pool.async_save(
+            query=query, params=(self.INVALID_STATUS, gh_id)
+        )
 
     # 设置 cookie 状态为无效
     async def set_cookie_token_as_invalid(self, gh_id):
+        """Set token_status to INVALID_STATUS on the gzh_cookie_info rows of ``gh_id``."""
         query = """
             UPDATE gzh_cookie_info SET token_status = %s WHERE gh_id = %s;
         """
-        return await self.pool.async_save(query=query, params=(0, gh_id))
+        return await self.pool.async_save(
+            query=query, params=(self.INVALID_STATUS, gh_id)
+        )
 
     # 获取账号列表
     async def get_account_list_from_database(self):
+        """Fetch credentials and cursor fields of accounts whose status is AVAILABLE_STATUS."""
         query = """
             SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp 
-            FROM gzh_account_info WHERE status = %s; 
+            FROM gzh_account_info WHERE status = %s and gh_id != 'gh_77f36c109fb1'; 
         """
-        return await self.pool.async_fetch(query=query, params=(1,))
+        # NOTE(review): the query hard-codes an exclusion of gh_77f36c109fb1; the
+        # reason is not visible in this file -- confirm it is intentional.
+        return await self.pool.async_fetch(query=query, params=(self.AVAILABLE_STATUS,))
 
     # 获取 open_id 列表
     async def get_open_id_list_from_database(self, gh_id):
@@ -56,7 +78,9 @@ class CrawlerGzhFansBase:
             SELECT user_openid as openid, 'zh_CN' as lang FROM gzh_fans_info
             WHERE status = %s and gh_id = %s LIMIT %s;
         """
-        return await self.pool.async_fetch(query=query, params=(0, gh_id, 20))
+        return await self.pool.async_fetch(
+            query=query, params=(self.INIT_STATUS, gh_id, self.MAX_SIZE)
+        )
 
     # 批量插入粉丝信息
     async def insert_gzh_fans_batch(self, account_info, user_list):
@@ -79,6 +103,67 @@ class CrawlerGzhFansBase:
             )
             await self.pool.async_save(query=query, params=params)
 
+    # 更新抓取union_id任务的状态码
+    async def update_task_status(self, gh_id, user_openid, ori_status, new_status):
+        """Move one fan row of ``gh_id`` from ``ori_status`` to ``new_status``.
+
+        The UPDATE only matches a row that is still in ``ori_status``. The
+        result of ``pool.async_save`` is returned unchanged (presumably the
+        affected-row count -- callers test it for truthiness).
+        """
+        transition_sql = """
+            UPDATE gzh_fans_info SET status = %s WHERE gh_id = %s and user_openid = %s and status = %s;
+        """
+        transition_args = (new_status, gh_id, user_openid, ori_status)
+        return await self.pool.async_save(query=transition_sql, params=transition_args)
+
+    # 更新 union_id
+    async def save_single_union_user(self, gh_id, user_info, semaphore):
+        """Persist the union_id and profile fields of a single fan.
+
+        The row is first claimed by moving it from INIT_STATUS to
+        PROCESSING_STATUS, then updated and moved to FINISHED_STATUS.
+
+        Args:
+            gh_id: account the fan belongs to.
+            user_info: mapping read with ``.get`` for ``openid``, ``unionid``,
+                ``sex``, ``city``, ``province`` and ``subscribe_scene``.
+            semaphore: async context manager that bounds concurrent saves.
+
+        Returns:
+            The result of ``pool.async_save`` for the final UPDATE (presumably
+            the affected-row count), or 0 when the entry has no openid, the
+            row could not be claimed, or any step raised.
+        """
+        async with semaphore:
+            openid = user_info.get("openid")
+            if not openid:
+                return 0
+
+            # Tracks whether THIS call moved the row to PROCESSING_STATUS. The
+            # rollback below must only run in that case; otherwise a failure of
+            # the claim itself would reset a row another worker is processing.
+            locked = False
+            try:
+                locked = await self.update_task_status(
+                    gh_id=gh_id,
+                    user_openid=openid,
+                    ori_status=self.INIT_STATUS,
+                    new_status=self.PROCESSING_STATUS,
+                )
+                if not locked:
+                    return 0
+
+                update_sql = """
+                    UPDATE gzh_fans_info
+                    SET union_id = %s, sex = %s, city = %s, province = %s, subscribe_scene = %s, status = %s
+                    WHERE
+                        gh_id = %s AND user_openid = %s AND status = %s;
+                """
+                affected_rows = await self.pool.async_save(
+                    query=update_sql,
+                    params=(
+                        user_info.get("unionid"),
+                        user_info.get("sex"),
+                        user_info.get("city"),
+                        user_info.get("province"),
+                        user_info.get("subscribe_scene"),
+                        self.FINISHED_STATUS,
+                        gh_id,
+                        openid,
+                        self.PROCESSING_STATUS,
+                    ),
+                )
+
+                return affected_rows or 0
+
+            except Exception:
+                # Best effort: never raise to the caller. Release the claim so
+                # the row does not stay stuck in PROCESSING_STATUS.
+                if locked:
+                    try:
+                        await self.update_task_status(
+                            gh_id=gh_id,
+                            user_openid=openid,
+                            ori_status=self.PROCESSING_STATUS,
+                            new_status=self.INIT_STATUS,
+                        )
+                    except Exception:
+                        # Rollback failed as well; nothing more can be done
+                        # here, the row is left in PROCESSING_STATUS.
+                        pass
+                return 0
+
     # 更新公众号的 cursor 位置
     async def update_gzh_cursor_info(self, gh_id, cursor_id, cursor_timestamp):
         query = """
@@ -94,13 +179,17 @@ class CrawlerGzhFansBase:
             UPDATE gzh_cookie_info SET cookie = %s, token = %s, token_status = %s
             WHERE gh_id = %s;
         """
-        return await self.pool.async_save(query=query, params=(cookie, token, 1, gh_id))
+        return await self.pool.async_save(
+            query=query, params=(cookie, token, self.AVAILABLE_STATUS, gh_id)
+        )
 
     async def set_access_token_for_each_account(self, gh_id, access_token):
+        """Store ``access_token`` for ``gh_id`` and set access_token_status to AVAILABLE_STATUS."""
         query = """
             UPDATE gzh_cookie_info SET access_token = %s, access_token_status = %s WHERE gh_id = %s;
         """
-        return await self.pool.async_save(query=query, params=(access_token, 1, gh_id))
+        return await self.pool.async_save(
+            query=query, params=(access_token, self.AVAILABLE_STATUS, gh_id)
+        )
 
 
 class CrawlerGzhFans(CrawlerGzhFansBase):
@@ -174,29 +263,68 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
 
     # 通过 access_token && open_id 抓取 union_id
     async def get_union_ids_for_each_account(self, account_info: dict):
-        access_token = await self.get_access_token_from_database(account_info["gh_id"])
-        if not access_token:
+        """Fetch union_ids for one account's pending fans and store them.
+
+        When no available access_token is stored, a new one is requested and
+        saved, and the call returns without fetching union_ids. Otherwise the
+        pending openids are sent to ``get_union_id_batch``; a response carrying
+        a truthy ``errcode`` marks the stored access_token invalid.
+
+        Args:
+            account_info: mapping with ``gh_id``, ``account_name``, ``app_id``
+                and ``app_secret``.
+
+        Returns:
+            None.
+        """
+        access_token_info = await self.get_access_token_from_database(
+            account_info["gh_id"]
+        )
+        if not access_token_info:
             print(f"{account_info['account_name']}: access_token is not available")
             response = await get_access_token(
                 account_info["app_id"], account_info["app_secret"]
             )
-            print(json.dumps(response, indent=4, ensure_ascii=False))
-        #     access_token = response.get("access_token")
-        #     await self.set_access_token_for_each_account(account_info["gh_id"], access_token)
-        #
-        # # 通过 access_token 获取 union_id
-        # user_list = await self.get_open_id_list_from_database(gh_id=account_info["gh_id"])
-        # union_info = await get_union_id_batch(access_token=access_token ,user_list=user_list)
-        # print(json.dumps(union_info, indent=4, ensure_ascii=False))
+            access_token = (response or {}).get("access_token")
+            # Only persist a token that was actually issued; storing None would
+            # flag an unusable token as AVAILABLE_STATUS.
+            if access_token:
+                await self.set_access_token_for_each_account(
+                    account_info["gh_id"], access_token
+                )
+            return
+
+        # Load the pending openids; with nothing pending there is no reason to
+        # call the API, and an error reply to an empty request would otherwise
+        # invalidate a working access_token below.
+        user_list = await self.get_open_id_list_from_database(
+            gh_id=account_info["gh_id"]
+        )
+        if not user_list:
+            return
+
+        # Resolve union_ids through the access_token.
+        access_token = access_token_info[0]["access_token"]
+        union_info = await get_union_id_batch(
+            access_token=access_token, user_list=user_list
+        )
+        if union_info.get("errcode"):
+            await self.set_access_token_as_invalid(gh_id=account_info["gh_id"])
+            return
+
+        # Store the returned union_ids in the database.
+        user_info_list = union_info.get("user_info_list") or []
+        if not user_info_list:
+            return
+
+        semaphore = asyncio.Semaphore(10)
+        tasks = [
+            self.save_single_union_user(account_info["gh_id"], user_info, semaphore)
+            for user_info in user_info_list
+        ]
+        await asyncio.gather(*tasks, return_exceptions=True)
 
     # main function
-    async def deal(self):
+    async def deal(self, task_name):
+        """Run the task selected by ``task_name`` over every available account.
+
+        ``"get_fans"`` crawls fans per account and ``"get_union_ids"`` fetches
+        union_ids per account; both return the result of
+        ``run_tasks_with_asyncio_task_group``. Any other name returns
+        ``{"err_msg": "invalid task_name"}``.
+        """
         account_list = await self.get_account_list_from_database()
-        return await run_tasks_with_asyncio_task_group(
-            task_list=account_list,
-            handler=self.crawl_history_fans_for_each_account,
-            max_concurrency=5,
-            fail_fast=False,
-            description="抓取公众号账号粉丝",
-            unit="page",
-        )
+
+        # Pick the per-account handler and its progress labels.
+        if task_name == "get_fans":
+            handler = self.crawl_history_fans_for_each_account
+            description, unit = "抓取公众号账号粉丝", "page"
+        elif task_name == "get_union_ids":
+            handler = self.get_union_ids_for_each_account
+            description, unit = "获取粉丝 union_id", "per100"
+        else:
+            return {"err_msg": "invalid task_name"}
+
+        return await run_tasks_with_asyncio_task_group(
+            task_list=account_list,
+            handler=handler,
+            max_concurrency=self.MAX_CONCURRENCY,
+            fail_fast=False,
+            description=description,
+            unit=unit,
+        )