Переглянути джерело

Merge branch 'feature/luojunhui/20260105-get-gzh-fans-detail' of Server/LongArticleTaskServer into master

luojunhui 2 місяців тому
батько
коміт
d76f027a8b

+ 5 - 0
applications/api/async_feishu_api.py

@@ -21,6 +21,9 @@ class Feishu:
     # 长文任务报警群
     long_articles_task_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/223b3d72-f2e8-40e0-9b53-6956e0ae7158"
 
+    # cookie 监测机器人
+    cookie_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/51b9c83a-f50d-44dd-939f-bcd10ac6c55a"
+
     def __init__(self):
         self.token = None
         self.headers = {"Content-Type": "application/json"}
@@ -210,6 +213,8 @@ class FeishuBotApi(Feishu):
                 url = self.server_account_publish_monitor_bot
             case "long_articles_task":
                 url = self.long_articles_task_bot
+            case "cookie_monitor":
+                url = self.cookie_monitor_bot
             case _:
                 url = self.long_articles_bot_dev
 

+ 12 - 0
applications/config/mysql_config.py

@@ -45,3 +45,15 @@ piaoquan_crawler_db_config = {
     "minsize": 5,
     "maxsize": 20,
 }
+
+# growth 数据库配置
+growth_db_config = {
+    "host": "rm-bp17q95335a99272b.mysql.rds.aliyuncs.com",
+    "port": 3306,
+    "user": "crawler",
+    "password": "crawler123456@",
+    "db": "growth",
+    "charset": "utf8mb4",
+    "minsize": 5,
+    "maxsize": 20,
+}

+ 1 - 0
applications/crawler/wechat/__init__.py

@@ -1 +1,2 @@
 from .gzh_spider import *
+from .gzh_fans import *

+ 66 - 0
applications/crawler/wechat/gzh_fans.py

@@ -0,0 +1,66 @@
+import random
+
+from applications.utils import AsyncHttpClient
+
+
+# 抓取公众号粉丝
+async def get_gzh_fans(token, cookie, cursor_id, cursor_timestamp):
+    url = "https://mp.weixin.qq.com/cgi-bin/user_tag"
+    params = {
+        "action": "get_user_list",
+        "groupid": "-2",
+        "begin_openid": cursor_id,
+        "begin_create_time": cursor_timestamp,
+        "limit": "20",
+        "offset": "0",
+        "backfoward": "1",
+        "token": token,
+        "lang": "zh_CN",
+        "f": "json",
+        "ajax": "1",
+        "fingerprint": "42694935a543ed714f89d4b05ef3a4e6",
+        "random": random.random(),
+    }
+    headers = {
+        "accept": "application/json, text/javascript, */*; q=0.01",
+        "accept-language": "zh-CN,zh;q=0.9",
+        "cache-control": "no-cache",
+        "pragma": "no-cache",
+        "priority": "u=1, i",
+        "referer": f"https://mp.weixin.qq.com/cgi-bin/user_tag?action=get_all_data&lang=zh_CN&token={token}",
+        "sec-ch-ua": '"Google Chrome";v="143", "Chromium";v="143", "Not A(Brand";v="24"',
+        "sec-ch-ua-mobile": "?0",
+        "sec-ch-ua-platform": '"macOS"',
+        "sec-fetch-dest": "empty",
+        "sec-fetch-mode": "cors",
+        "sec-fetch-site": "same-origin",
+        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
+        "x-requested-with": "XMLHttpRequest",
+        "Cookie": cookie,
+    }
+    # 发送请求
+    async with AsyncHttpClient(timeout=10) as http_client:
+        response = await http_client.get(url, headers=headers, params=params)
+
+    return response
+
+
+# 获取 access_token
+async def get_access_token(app_id, app_secret):
+    url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={app_id}&secret={app_secret}"
+    async with AsyncHttpClient(timeout=100) as http_client:
+        response = await http_client.get(url)
+
+    return response
+
+
+# 批量获取 union_id
+async def get_union_id_batch(access_token, user_list):
+    url = f"https://api.weixin.qq.com/cgi-bin/user/info/batchget?access_token={access_token}"
+    payload = {
+        "user_list": user_list,
+    }
+    async with AsyncHttpClient(timeout=100) as http_client:
+        response = await http_client.post(url, json=payload)
+
+    return response

+ 348 - 0
applications/tasks/crawler_tasks/crawler_gzh_fans.py

@@ -0,0 +1,348 @@
+import asyncio
+
+from applications.crawler.wechat import (
+    get_gzh_fans,
+    get_access_token,
+    get_union_id_batch,
+)
+from applications.api import feishu_robot
+from applications.utils import run_tasks_with_asyncio_task_group
+
+
+class CrawlerGzhFansConst:
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    FINISHED_STATUS = 2
+    FAILED_STATUS = 99
+
+    AVAILABLE_STATUS = 1
+    INVALID_STATUS = 0
+
+    MAX_SIZE = 100
+
+    MAX_CONCURRENCY = 5
+
+
+class CrawlerGzhFansBase(CrawlerGzhFansConst):
+    def __init__(self, pool, log_client):
+        self.pool = pool
+        self.log_client = log_client
+
+    # 从数据库获取 access_token
+    async def get_access_token_from_database(self, gh_id):
+        query = """
+            SELECT access_token FROM gzh_cookie_info where gh_id = %s and access_token_status = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(gh_id, self.AVAILABLE_STATUS)
+        )
+
+    # 从数据库获取粉丝 && token
+    async def get_cookie_token_from_database(self, gh_id):
+        query = """
+            SELECT token, cookie FROM gzh_cookie_info WHERE gh_id = %s and token_status = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(gh_id, self.AVAILABLE_STATUS)
+        )
+
+    # 设置access_token状态为无效
+    async def set_access_token_as_invalid(self, gh_id):
+        query = """
+            UPDATE gzh_cookie_info SET access_token_status = %s WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(self.INVALID_STATUS, gh_id)
+        )
+
+    # 设置 cookie 状态为无效
+    async def set_cookie_token_as_invalid(self, gh_id):
+        query = """
+            UPDATE gzh_cookie_info SET token_status = %s WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(self.INVALID_STATUS, gh_id)
+        )
+
+    # 修改抓取账号状态
+    async def update_account_crawl_history_status(self, gh_id, status):
+        query = """
+            UPDATE gzh_account_info SET crawl_history_status = %s WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(query=query, params=(status, gh_id))
+
+    # 获取账号列表
+    async def get_account_list_from_database(self):
+        query = """
+            SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp, crawl_history_status
+            FROM gzh_account_info WHERE status = %s; 
+        """
+        return await self.pool.async_fetch(query=query, params=(self.AVAILABLE_STATUS,))
+
+    # 获取 open_id 列表
+    async def get_open_id_list_from_database(self, gh_id):
+        query = """
+            SELECT user_openid as openid, 'zh_CN' as lang FROM gzh_fans_info
+            WHERE status = %s and gh_id = %s LIMIT %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(self.INIT_STATUS, gh_id, self.MAX_SIZE)
+        )
+
+    # 批量插入粉丝信息
+    async def insert_gzh_fans_batch(self, account_info, user_list):
+        for user in user_list:
+            query = """
+                INSERT IGNORE INTO gzh_fans_info
+                    (gh_id, account_name, user_openid, user_name, user_create_time, user_head_img, user_remark, identity_type, identity_open_id)
+                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+            """
+            params = (
+                account_info["gh_id"],
+                account_info["account_name"],
+                user["user_openid"],
+                user["user_name"],
+                user["user_create_time"],
+                user["user_head_img"],
+                user["user_remark"],
+                user["identity_type"],
+                user["identity_open_id"],
+            )
+            await self.pool.async_save(query=query, params=params)
+
+    # 更新抓取union_id任务的状态码
+    async def update_task_status(self, gh_id, user_openid, ori_status, new_status):
+        query = """
+            UPDATE gzh_fans_info SET status = %s WHERE gh_id = %s and user_openid = %s and status = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, gh_id, user_openid, ori_status)
+        )
+
+    # 更新 union_id
+    async def save_single_union_user(self, gh_id, user_info, semaphore):
+        async with semaphore:
+            openid = user_info.get("openid")
+            if not openid:
+                return 0
+            try:
+                locked = await self.update_task_status(
+                    gh_id=gh_id,
+                    user_openid=openid,
+                    ori_status=self.INIT_STATUS,
+                    new_status=self.PROCESSING_STATUS,
+                )
+                if not locked:
+                    return 0
+
+                update_sql = """
+                    UPDATE gzh_fans_info
+                    SET union_id = %s, sex = %s, city = %s, province = %s, subscribe_scene = %s, status = %s
+                    WHERE
+                        gh_id = %s AND user_openid = %s AND status = %s;
+                """
+                affected_rows = await self.pool.async_save(
+                    query=update_sql,
+                    params=(
+                        user_info.get("unionid"),
+                        user_info.get("sex"),
+                        user_info.get("city"),
+                        user_info.get("province"),
+                        user_info.get("subscribe_scene"),
+                        self.FINISHED_STATUS,
+                        gh_id,
+                        openid,
+                        self.PROCESSING_STATUS,
+                    ),
+                )
+
+                return affected_rows or 0
+
+            except Exception:
+                # ❗防止任务卡死,可选:回滚状态
+                try:
+                    await self.update_task_status(
+                        gh_id=gh_id,
+                        user_openid=openid,
+                        ori_status=self.PROCESSING_STATUS,
+                        new_status=self.INIT_STATUS,
+                    )
+                except Exception:
+                    pass
+                return 0
+
+    # 更新公众号的 cursor 位置
+    async def update_gzh_cursor_info(self, gh_id, cursor_id, cursor_timestamp):
+        query = """
+            UPDATE gzh_account_info SET cursor_openid = %s, cursor_timestamp = %s WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(cursor_id, cursor_timestamp, gh_id)
+        )
+
+    # 更新公众号的 cookie
+    async def set_cookie_token_for_each_account(self, gh_id, cookie, token):
+        query = """
+            UPDATE gzh_cookie_info SET cookie = %s, token = %s, token_status = %s
+            WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(cookie, token, self.AVAILABLE_STATUS, gh_id)
+        )
+
+    async def set_access_token_for_each_account(self, gh_id, access_token):
+        query = """
+            UPDATE gzh_cookie_info SET access_token = %s, access_token_status = %s WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(access_token, self.AVAILABLE_STATUS, gh_id)
+        )
+
+
+class CrawlerGzhFans(CrawlerGzhFansBase):
+    def __init__(self, pool, log_client):
+        super().__init__(pool, log_client)
+
+    # 抓取单个账号存量的粉丝
+    async def crawl_history_fans_for_each_account(self, account_info):
+        cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
+        if not cookie_obj:
+            return
+
+        if not account_info.get("cursor_openid"):
+            cursor_openid = ""
+        else:
+            cursor_openid = account_info["cursor_openid"]
+
+        if not account_info.get("cursor_timestamp"):
+            cursor_timestamp = ""
+        else:
+            cursor_timestamp = account_info["cursor_timestamp"]
+
+        response = await get_gzh_fans(
+            token=cookie_obj[0]["token"],
+            cookie=cookie_obj[0]["cookie"],
+            cursor_id=cursor_openid,
+            cursor_timestamp=cursor_timestamp,
+        )
+        base_resp = response.get("base_resp", {})
+        code = base_resp.get("ret")
+        error_msg = base_resp.get("err_msg")
+
+        match code:
+            case 0:
+                user_list = response.get("user_list", {}).get("user_info_list")
+                if not user_list:
+                    await feishu_robot.bot(
+                        title=f"{account_info['account_name']}的粉丝已经抓取完毕,请检查",
+                        detail=account_info,
+                        env="cookie_monitor",
+                        mention=False,
+                    )
+                    await self.update_account_crawl_history_status(account_info["gh_id"], self.INVALID_STATUS)
+
+                next_cursor_id = user_list[-1].get("user_openid")
+                next_cursor_timestamp = user_list[-1].get("user_create_time")
+                await self.insert_gzh_fans_batch(account_info, user_list)
+                await self.update_gzh_cursor_info(
+                    account_info["gh_id"], next_cursor_id, next_cursor_timestamp
+                )
+
+            case "00040":
+                print(f"token 非法: {error_msg}")
+                await self.set_cookie_token_as_invalid(account_info["gh_id"])
+                await feishu_robot.bot(
+                    title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
+                    detail=account_info,
+                    env="cookie_monitor",
+                    mention=False,
+                )
+
+            case _:
+                print("token 异常, 请及时刷新")
+                await self.set_cookie_token_as_invalid(account_info["gh_id"])
+                await feishu_robot.bot(
+                    title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
+                    detail=account_info,
+                    env="cookie_monitor",
+                    mention=False,
+                )
+
+    # 通过 access_token && open_id 抓取 union_id
+    async def get_union_ids_for_each_account(self, account_info: dict):
+        # 通过 access_token 获取 union_id
+        user_list = await self.get_open_id_list_from_database(
+            gh_id=account_info["gh_id"]
+        )
+        if not user_list:
+            return
+
+        access_token_info = await self.get_access_token_from_database(
+            account_info["gh_id"]
+        )
+        if not access_token_info:
+            print(f"{account_info['account_name']}: access_token is not available")
+            response = await get_access_token(
+                account_info["app_id"], account_info["app_secret"]
+            )
+            access_token = response.get("access_token")
+            await self.set_access_token_for_each_account(
+                account_info["gh_id"], access_token
+            )
+            return
+
+        access_token = access_token_info[0]["access_token"]
+        union_info = await get_union_id_batch(
+            access_token=access_token, user_list=user_list
+        )
+        if union_info.get("errcode"):
+            await self.set_access_token_as_invalid(gh_id=account_info["gh_id"])
+
+            return
+
+        # 将查询到的 union_id存储到数据库中
+        user_info_list = union_info.get("user_info_list") or []
+        if not user_info_list:
+            return
+
+        semaphore = asyncio.Semaphore(10)
+        tasks = [
+            self.save_single_union_user(account_info["gh_id"], user_info, semaphore)
+            for user_info in user_info_list
+        ]
+        await asyncio.gather(*tasks, return_exceptions=True)
+
+    # main function
+    async def deal(self, task_name):
+        account_list = await self.get_account_list_from_database()
+
+        match task_name:
+            case "get_history_fans":
+                crawl_history_accounts = [i for i in account_list if i['crawl_history_status'] == self.AVAILABLE_STATUS]
+                return await run_tasks_with_asyncio_task_group(
+                    task_list=crawl_history_accounts,
+                    handler=self.crawl_history_fans_for_each_account,
+                    max_concurrency=self.MAX_CONCURRENCY,
+                    fail_fast=False,
+                    description="抓取公众号账号粉丝",
+                    unit="page",
+                )
+
+            case "get_union_ids":
+                return await run_tasks_with_asyncio_task_group(
+                    task_list=account_list,
+                    handler=self.get_union_ids_for_each_account,
+                    max_concurrency=self.MAX_CONCURRENCY,
+                    fail_fast=False,
+                    description="获取粉丝 union_id",
+                    unit="per100",
+                )
+
+            case "get_new_fans":
+                for account in account_list:
+                    print(account)
+                # return await run_tasks_with_asyncio_task_group()
+                return 0
+
+            case _:
+                return {"err_msg": "invalid task_name"}