import asyncio import json import time from datetime import datetime from app.infra.crawler.wechat import ( get_gzh_fans, get_access_token, get_union_id_batch, ) from app.infra.external import feishu_robot from app.infra.shared import run_tasks_with_asyncio_task_group class CrawlerGzhFansConst: INIT_STATUS = 0 PROCESSING_STATUS = 1 FINISHED_STATUS = 2 FAILED_STATUS = 99 AVAILABLE_STATUS = 1 INVALID_STATUS = 0 MAX_SIZE = 100 MAX_CONCURRENCY = 5 GAP_DURATION = 300 class CrawlerGzhFansBase(CrawlerGzhFansConst): def __init__(self, pool, log_client): self.pool = pool self.log_client = log_client # 从数据库获取 access_token async def get_access_token_from_database(self, gh_id): query = """ SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s; """ return await self.pool.async_fetch(query=query, params=(gh_id,)) # 从数据库获取粉丝 && token async def get_cookie_token_from_database(self, gh_id): query = """ SELECT token, cookie FROM gzh_cookie_info WHERE gh_id = %s and token_status = %s; """ return await self.pool.async_fetch( query=query, params=(gh_id, self.AVAILABLE_STATUS) ) # 设置access_token状态为无效 async def set_access_token_as_invalid(self, gh_id): query = """ UPDATE gzh_cookie_info SET access_token_status = %s WHERE gh_id = %s; """ return await self.pool.async_save( query=query, params=(self.INVALID_STATUS, gh_id) ) # 设置 cookie 状态为无效 async def set_cookie_token_as_invalid(self, gh_id): query = """ UPDATE gzh_cookie_info SET token_status = %s WHERE gh_id = %s; """ return await self.pool.async_save( query=query, params=(self.INVALID_STATUS, gh_id) ) # 修改抓取账号状态 async def update_account_crawl_history_status(self, gh_id, status): query = """ UPDATE gzh_account_info SET crawl_history_status = %s WHERE gh_id = %s; """ return await self.pool.async_save(query=query, params=(status, gh_id)) # 获取账号列表 async def get_account_list_from_database(self): query = """ SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp, crawl_history_status, consist_crawl_status, binding_status FROM gzh_account_info WHERE status = %s; """ return await self.pool.async_fetch(query=query, params=(self.AVAILABLE_STATUS,)) # 获取 open_id 列表 async def get_open_id_list_from_database(self, gh_id): query = """ SELECT user_openid as openid, 'zh_CN' as lang FROM gzh_fans_info WHERE status = %s and gh_id = %s LIMIT %s; """ return await self.pool.async_fetch( query=query, params=(self.INIT_STATUS, gh_id, self.MAX_SIZE) ) # 批量插入粉丝信息 async def insert_gzh_fans_batch(self, account_info, user_list): param_list = [ ( account_info["gh_id"], account_info["account_name"], user["user_openid"], user["user_name"], user["user_create_time"], user["user_head_img"], user["user_remark"], user["identity_type"], user["identity_open_id"], ) for user in user_list ] query = """ INSERT IGNORE INTO gzh_fans_info (gh_id, account_name, user_openid, user_name, user_create_time, user_head_img, user_remark, identity_type, identity_open_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s); """ await self.pool.async_save(query=query, params=param_list, batch=True) # 插入失败详情 async def insert_fail_detail(self, gh_id, fail_info): query = """ UPDATE gzh_account_info SET fail_detail = %s WHERE gh_id = %s; """ return await self.pool.async_save( query=query, params=(gh_id, json.dumps(fail_info, ensure_ascii=False)) ) # 获取失败详情 async def fetch_fail_index(self, gh_id): query = """ SELECT fail_detail FROM gzh_account_info WHERE gh_id = %s; """ return await self.pool.async_fetch(query=query, params=(gh_id,)) # 更新抓取union_id任务的状态码 async def update_task_status(self, gh_id, user_openid, ori_status, new_status): query = """ UPDATE gzh_fans_info SET status = %s WHERE gh_id = %s and user_openid = %s and status = %s; """ return await self.pool.async_save( query=query, params=(new_status, gh_id, user_openid, ori_status) ) # 更新 union_id async def save_single_union_user(self, gh_id, user_info, semaphore): async with semaphore: openid = user_info.get("openid") if not openid: return 0 try: locked = await self.update_task_status( gh_id=gh_id, user_openid=openid, ori_status=self.INIT_STATUS, new_status=self.PROCESSING_STATUS, ) if not locked: return 0 update_sql = """ UPDATE gzh_fans_info SET union_id = %s, sex = %s, city = %s, province = %s, subscribe_scene = %s, status = %s WHERE gh_id = %s AND user_openid = %s AND status = %s; """ affected_rows = await self.pool.async_save( query=update_sql, params=( user_info.get("unionid"), user_info.get("sex"), user_info.get("city"), user_info.get("province"), user_info.get("subscribe_scene"), self.FINISHED_STATUS, gh_id, openid, self.PROCESSING_STATUS, ), ) return affected_rows or 0 except Exception: # ❗防止任务卡死,可选:回滚状态 try: await self.update_task_status( gh_id=gh_id, user_openid=openid, ori_status=self.PROCESSING_STATUS, new_status=self.INIT_STATUS, ) except Exception: pass return 0 # 更新公众号的 cursor 位置 async def update_gzh_cursor_info(self, gh_id, cursor_id, cursor_timestamp): query = """ UPDATE gzh_account_info SET cursor_openid = %s, cursor_timestamp = %s WHERE gh_id = %s; """ return await self.pool.async_save( query=query, params=(cursor_id, cursor_timestamp, gh_id) ) # 更新公众号的 cookie async def set_cookie_token_for_each_account(self, gh_id, cookie, token): query = """ UPDATE gzh_cookie_info SET cookie = %s, token = %s, token_status = %s WHERE gh_id = %s; """ return await self.pool.async_save( query=query, params=(cookie, token, self.AVAILABLE_STATUS, gh_id) ) async def set_access_token_for_each_account( self, gh_id, access_token, expire_timestamp ): query = """ UPDATE gzh_cookie_info SET access_token = %s, access_token_status = %s, expire_timestamp = %s WHERE gh_id = %s; """ return await self.pool.async_save( query=query, params=(access_token, self.AVAILABLE_STATUS, expire_timestamp, gh_id), ) async def get_max_cursor_id(self, gh_id): query = """ SELECT user_openid, user_create_time FROM gzh_fans_info WHERE gh_id = %s ORDER BY user_create_time DESC LIMIT 1; """ return await self.pool.async_fetch(query=query, params=(gh_id,)) class CrawlerGzhFans(CrawlerGzhFansBase): def __init__(self, pool, log_client): super().__init__(pool, log_client) # 抓取账号新增粉丝 async def crawl_new_fans_for_each_account(self, account_info: dict): cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"]) if not cookie_obj: return # 获取失败详情 fetch_response = await self.fetch_fail_index(account_info["gh_id"]) fail_detail = json.loads(fetch_response[0]["fail_detail"] or "{}") cursor_openid = fail_detail.get("cursor_openid") or "" cursor_timestamp = fail_detail.get("cursor_timestamp") or "" newest_timestamp = fail_detail.get("newest_timestamp") or None if not newest_timestamp: newest_fans = await self.get_max_cursor_id(account_info["gh_id"]) newest_timestamp = newest_fans[0]["user_create_time"] while True: try: response = await get_gzh_fans( token=cookie_obj[0]["token"], cookie=cookie_obj[0]["cookie"], cursor_id=cursor_openid, cursor_timestamp=cursor_timestamp, ) base_resp = response.get("base_resp", {}) code = base_resp.get("ret") error_msg = base_resp.get("err_msg") match code: case 0: user_list = response.get("user_list", {}).get("user_info_list") next_cursor_id = user_list[-1].get("user_openid") next_cursor_timestamp = user_list[-1].get("user_create_time") await self.insert_gzh_fans_batch(account_info, user_list) if next_cursor_timestamp <= newest_timestamp: await feishu_robot.bot( title=f"{account_info['account_name']}本月新增粉丝抓取完毕", detail=account_info, env="cookie_monitor", mention=False, ) break else: cursor_openid = next_cursor_id cursor_timestamp = next_cursor_timestamp print( datetime.fromtimestamp(cursor_timestamp).strftime( "%Y-%m-%d %H:%M:%S" ) ) case "00040": print(f"token 非法: {error_msg}") await self.set_cookie_token_as_invalid(account_info["gh_id"]) await feishu_robot.bot( title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新", detail=account_info, env="cookie_monitor", mention=False, ) break case _: print("token 异常, 请及时刷新") await self.set_cookie_token_as_invalid(account_info["gh_id"]) await feishu_robot.bot( title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新", detail=account_info, env="cookie_monitor", mention=False, ) break except Exception as e: fail_obj = { "cursor_openid": cursor_openid, "cursor_timestamp": cursor_timestamp, "newest_timestamp": newest_timestamp, "fail_reason": str(e), } await self.insert_fail_detail(account_info["gh_id"], fail_obj) await feishu_robot.bot( title=f"{account_info['account_name']}本月新增粉丝抓取异常,请查看", detail=fail_obj, env="cookie_monitor", mention=False, ) break # 抓取单个账号存量的粉丝 async def crawl_history_fans_for_each_account(self, account_info: dict): cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"]) if not cookie_obj: return if not account_info.get("cursor_openid"): cursor_openid = "" else: cursor_openid = account_info["cursor_openid"] if not account_info.get("cursor_timestamp"): cursor_timestamp = "" else: cursor_timestamp = account_info["cursor_timestamp"] response = await get_gzh_fans( token=cookie_obj[0]["token"], cookie=cookie_obj[0]["cookie"], cursor_id=cursor_openid, cursor_timestamp=cursor_timestamp, ) base_resp = response.get("base_resp", {}) code = base_resp.get("ret") error_msg = base_resp.get("err_msg") match code: case 0: user_list = response.get("user_list", {}).get("user_info_list") if not user_list: await feishu_robot.bot( title=f"{account_info['account_name']}的粉丝已经抓取完毕,请检查", detail=account_info, env="cookie_monitor", mention=False, ) await self.update_account_crawl_history_status( account_info["gh_id"], self.INVALID_STATUS ) next_cursor_id = user_list[-1].get("user_openid") next_cursor_timestamp = user_list[-1].get("user_create_time") await self.insert_gzh_fans_batch(account_info, user_list) await self.update_gzh_cursor_info( account_info["gh_id"], next_cursor_id, next_cursor_timestamp ) case "00040": print(f"token 非法: {error_msg}") await self.set_cookie_token_as_invalid(account_info["gh_id"]) await feishu_robot.bot( title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新", detail=account_info, env="cookie_monitor", mention=False, ) case _: print("token 异常, 请及时刷新") await self.set_cookie_token_as_invalid(account_info["gh_id"]) await feishu_robot.bot( title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新", detail=account_info, env="cookie_monitor", mention=False, ) # 通过 access_token && open_id 抓取 union_id async def get_union_ids_for_each_account(self, account_info: dict): # 通过 access_token 获取 union_id user_list = await self.get_open_id_list_from_database( gh_id=account_info["gh_id"] ) if not user_list: return access_token_info = await self.get_access_token_from_database( account_info["gh_id"] ) if not access_token_info: return # 更新 token async def update_token(_new_token_info): _access_token = _new_token_info["access_token"] _expires_in = _new_token_info["expires_in"] await self.set_access_token_for_each_account( gh_id=account_info["gh_id"], access_token=_access_token, expire_timestamp=_expires_in + int(time.time()) - self.GAP_DURATION, ) print(f"{account_info['account_name']} access_token updated to database") expire_timestamp = access_token_info[0]["expire_timestamp"] or 0 if int(time.time()) >= expire_timestamp: new_token_info = await get_access_token( account_info["app_id"], account_info["app_secret"] ) access_token = new_token_info["access_token"] await update_token(_new_token_info=new_token_info) else: access_token = access_token_info[0]["access_token"] union_info = await get_union_id_batch( access_token=access_token, user_list=user_list ) if union_info.get("errcode"): await self.set_access_token_as_invalid(gh_id=account_info["gh_id"]) return # 将查询到的 union_id存储到数据库中 user_info_list = union_info.get("user_info_list") or [] if not user_info_list: return semaphore = asyncio.Semaphore(10) tasks = [ self.save_single_union_user(account_info["gh_id"], user_info, semaphore) for user_info in user_info_list ] await asyncio.gather(*tasks, return_exceptions=True) # main function async def deal(self, task_name): account_list = await self.get_account_list_from_database() match task_name: case "get_history_fans": crawl_history_accounts = [ i for i in account_list if i["crawl_history_status"] == self.AVAILABLE_STATUS ] return await run_tasks_with_asyncio_task_group( task_list=crawl_history_accounts, handler=self.crawl_history_fans_for_each_account, max_concurrency=self.MAX_CONCURRENCY, fail_fast=False, description="抓取公众号账号粉丝", unit="page", ) case "get_union_ids": binding_accounts = [ i for i in account_list if i["binding_status"] == self.AVAILABLE_STATUS ] return await run_tasks_with_asyncio_task_group( task_list=binding_accounts, handler=self.get_union_ids_for_each_account, max_concurrency=self.MAX_CONCURRENCY, fail_fast=False, description="获取粉丝 union_id", unit="per100", ) case "get_new_fans": consist_crawl_accounts = [ i for i in account_list if i["consist_crawl_status"] == self.AVAILABLE_STATUS ] for account in consist_crawl_accounts: print( f"处理: {account['account_name']}: gh_id: {account['gh_id']}" ) await self.crawl_new_fans_for_each_account(account) # return await run_tasks_with_asyncio_task_group() return {} case _: return {"err_msg": "invalid task_name"}