luojunhui 2 месяцев назад
Родитель
Сommit
9f43d941c3

+ 1 - 0
applications/crawler/wechat/__init__.py

@@ -1 +1,2 @@
 from .gzh_spider import *
+from .gzh_fans import *

+ 165 - 0
applications/tasks/crawler_tasks/crawler_gzh_fans.py

@@ -0,0 +1,165 @@
+import json
+
+from applications.crawler.wechat import (
+    get_gzh_fans,
+    get_access_token,
+    get_union_id_batch,
+)
+
+
+class CrawlerGzhFansBase:
+    def __init__(self, pool, log_client):
+        self.pool = pool
+        self.log_client = log_client
+
+    # 从数据库获取 access_token
+    async def get_access_token_from_database(self, gh_id):
+        query = """
+            SELECT access_token FROM gzh_cookie_info where gh_id = %s and access_token_status = %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(gh_id, 1))
+
+    # 从数据库获取粉丝 && token
+    async def get_cookie_token_from_database(self, gh_id):
+        query = """
+            SELECT token, cookie FROM gzh_cookie_info WHERE gh_id = %s and token_status = %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(gh_id, 1))
+
+    # 设置access_token状态为无效
+    async def set_access_token_as_invalid(self, gh_id):
+        query = """
+            UPDATE gzh_cookie_info SET access_token_status = %s WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(query=query, params=(0, gh_id))
+
+    # 设置 cookie 状态为无效
+    async def set_cookie_token_as_invalid(self, gh_id):
+        query = """
+            UPDATE gzh_cookie_info SET token_status = %s WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(query=query, params=(0, gh_id))
+
+    # 获取账号列表
+    async def get_account_list_from_database(self):
+        query = """
+            SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp 
+            FROM gzh_account_info WHERE status = %s; 
+        """
+        return await self.pool.async_fetch(query=query, params=(1,))
+
+    # 获取 open_id 列表
+    async def get_open_id_list_from_database(self, gh_id):
+        query = """
+            SELECT user_openid as openid, 'zh_CN' as lang FROM gzh_fans_info
+            WHERE status = %s and gh_id = %s LIMIT %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(0, gh_id, 20))
+
+    # 批量插入粉丝信息
+    async def insert_gzh_fans_batch(self, account_info, user_list):
+        for user in user_list:
+            query = """
+                INSERT IGNORE INTO gzh_fans_info
+                    (gh_id, account_name, user_openid, user_name, user_create_time, user_head_img, user_remark, identity_type, identity_open_id)
+                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+            """
+            params = (
+                account_info["gh_id"],
+                account_info["account_name"],
+                user["user_openid"],
+                user["user_name"],
+                user["user_create_time"],
+                user["user_head_img"],
+                user["user_remark"],
+                user["identity_type"],
+                user["identity_open_id"],
+            )
+            await self.pool.async_save(query=query, params=params)
+
+    # 更新公众号的 cursor 位置
+    async def update_gzh_cursor_info(self, gh_id, cursor_id, cursor_timestamp):
+        query = """
+            UPDATE gzh_account_info SET cursor_openid = %s, cursor_timestamp = %s WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(cursor_id, cursor_timestamp, gh_id)
+        )
+
+    # 更新公众号的 cookie
+    async def set_cookie_for_each_account(self, gh_id, cookie, token):
+        query = """
+            UPDATE gzh_cookie_info SET cookie = %s, token = %s, token_status = %s
+            WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(query=query, params=(cookie, token, 1, gh_id))
+
+    async def set_access_token_for_each_account(self, gh_id, access_token):
+        query = """
+            UPDATE gzh_cookie_info SET access_token = %s, access_token_status = %s WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(query=query, params=(access_token, 1, gh_id))
+
+
+class CrawlerGzhFans(CrawlerGzhFansBase):
+    def __init__(self, pool, log_client):
+        super().__init__(pool, log_client)
+
+    # 抓取单个账号的粉丝
+    async def crawl_fans_for_each_account(self, account_info):
+        cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
+        if not account_info.get("cursor_openid"):
+            cursor_openid = ''
+        else:
+            cursor_openid = account_info["cursor_openid"]
+
+        if not account_info.get("cursor_timestamp"):
+            cursor_timestamp = ''
+        else:
+            cursor_timestamp = account_info["cursor_timestamp"]
+
+        response = await get_gzh_fans(
+            token=cookie_obj[0]["token"],
+            cookie=cookie_obj[0]["cookie"],
+            cursor_id=cursor_openid,
+            cursor_timestamp=cursor_timestamp,
+        )
+        base_resp = response.get("base_resp", {})
+        print(json.dumps(base_resp, indent=4, ensure_ascii=False))
+        code = base_resp.get("ret")
+        error_msg = base_resp.get("err_msg")
+
+        match code:
+            case 0:
+                user_list = response.get("user_list", {}).get("user_info_list")
+                next_cursor_id = user_list[-1].get("user_openid")
+                next_cursor_timestamp = user_list[-1].get("user_create_time")
+                print(json.dumps(user_list, ensure_ascii=False, indent=4))
+                await self.insert_gzh_fans_batch(account_info, user_list)
+                await self.update_gzh_cursor_info(
+                    account_info["gh_id"], next_cursor_id, next_cursor_timestamp
+                )
+
+            case _:
+                print(code, error_msg)
+
+    # 通过 access_token && open_id 抓取 union_id
+    async def get_union_ids_for_each_account(self, account_info: dict):
+        access_token = await self.get_access_token_from_database(account_info["gh_id"])
+        if not access_token:
+            print(f"{account_info['account_name']}: access_token is not available")
+            response = await get_access_token(account_info['app_id'], account_info["app_secret"])
+            print(json.dumps(response, indent=4, ensure_ascii=False))
+        #     access_token = response.get("access_token")
+        #     await self.set_access_token_for_each_account(account_info["gh_id"], access_token)
+        #
+        # # 通过 access_token 获取 union_id
+        # user_list = await self.get_open_id_list_from_database(gh_id=account_info["gh_id"])
+        # union_info = await get_union_id_batch(access_token=access_token ,user_list=user_list)
+        # print(json.dumps(union_info, indent=4, ensure_ascii=False))
+
+    # main function
+    async def deal(self):
+        account_list = await self.get_account_list_from_database()
+        for account_info in account_list:
+            await self.get_union_ids_for_each_account(account_info)