| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507 |
- import asyncio
- import json
- import time
- from datetime import datetime
- from app.infra.crawler.wechat import (
- get_gzh_fans,
- get_access_token,
- get_union_id_batch,
- )
- from app.infra.external import feishu_robot
- from app.infra.shared import run_tasks_with_asyncio_task_group
class CrawlerGzhFansConst:
    """Shared constants for the WeChat official-account (gzh) fan crawler."""

    # Task states stored in gzh_fans_info.status while union ids are resolved:
    # a row is claimed INIT -> PROCESSING and completed PROCESSING -> FINISHED.
    INIT_STATUS: int = 0
    PROCESSING_STATUS: int = 1
    FINISHED_STATUS: int = 2
    # Not referenced by the classes in this file.
    FAILED_STATUS: int = 99

    # Generic on/off flag used for token validity and per-account switches.
    AVAILABLE_STATUS: int = 1
    INVALID_STATUS: int = 0

    # Upper bound on the number of open ids fetched from the database per run.
    MAX_SIZE: int = 100
    # Concurrency limit handed to the task-group runner.
    MAX_CONCURRENCY: int = 5
    # Safety margin subtracted from an access token's expiry timestamp
    # (seconds, since it is combined with time.time()).
    GAP_DURATION: int = 300
class CrawlerGzhFansBase(CrawlerGzhFansConst):
    """Database access layer for the WeChat official-account fan crawler.

    Each method issues one parameterised statement through ``self.pool``
    (``async_fetch`` for reads, ``async_save`` for writes) against the
    ``gzh_cookie_info``, ``gzh_account_info`` and ``gzh_fans_info`` tables.
    Read methods return whatever ``async_fetch`` returns; callers in this file
    index the result as a sequence of dict-like rows.
    """

    def __init__(self, pool, log_client):
        """Keep references to the database pool and the log client.

        Args:
            pool: Object exposing awaitable ``async_fetch`` / ``async_save``.
            log_client: Logging client; only stored by this class.
        """
        self.pool = pool
        self.log_client = log_client

    async def get_access_token_from_database(self, gh_id):
        """Fetch the stored access token and its expiry for one account.

        NOTE(review): the query does not filter on ``access_token_status``, so
        a token flagged invalid is still returned — confirm this is intended.
        """
        sql = """
            SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
        """
        return await self.pool.async_fetch(query=sql, params=(gh_id,))

    async def get_cookie_token_from_database(self, gh_id):
        """Fetch the web ``token`` and ``cookie`` of an account, only if still valid."""
        sql = """
            SELECT token, cookie FROM gzh_cookie_info WHERE gh_id = %s and token_status = %s;
        """
        return await self.pool.async_fetch(
            query=sql, params=(gh_id, self.AVAILABLE_STATUS)
        )

    async def set_access_token_as_invalid(self, gh_id):
        """Flag the account's access token as invalid."""
        sql = """
            UPDATE gzh_cookie_info SET access_token_status = %s WHERE gh_id = %s;
        """
        return await self.pool.async_save(
            query=sql, params=(self.INVALID_STATUS, gh_id)
        )

    async def set_cookie_token_as_invalid(self, gh_id):
        """Flag the account's web token / cookie pair as invalid."""
        sql = """
            UPDATE gzh_cookie_info SET token_status = %s WHERE gh_id = %s;
        """
        return await self.pool.async_save(
            query=sql, params=(self.INVALID_STATUS, gh_id)
        )

    async def update_account_crawl_history_status(self, gh_id, status):
        """Set the ``crawl_history_status`` switch of an account."""
        sql = """
            UPDATE gzh_account_info SET crawl_history_status = %s WHERE gh_id = %s;
        """
        return await self.pool.async_save(query=sql, params=(status, gh_id))

    async def get_account_list_from_database(self):
        """Return every account whose ``status`` is ``AVAILABLE_STATUS``."""
        sql = """
            SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp,
                   crawl_history_status, consist_crawl_status, binding_status
            FROM gzh_account_info WHERE status = %s;
        """
        return await self.pool.async_fetch(
            query=sql, params=(self.AVAILABLE_STATUS,)
        )

    async def get_open_id_list_from_database(self, gh_id):
        """Return up to ``MAX_SIZE`` fans of an account still in ``INIT_STATUS``.

        Each row carries ``openid`` and a constant ``lang`` of ``'zh_CN'``; the
        rows are passed unchanged to the union-id batch lookup.
        """
        sql = """
            SELECT user_openid as openid, 'zh_CN' as lang FROM gzh_fans_info
            WHERE status = %s and gh_id = %s LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=sql, params=(self.INIT_STATUS, gh_id, self.MAX_SIZE)
        )

    async def insert_gzh_fans_batch(self, account_info, user_list):
        """Bulk-insert one page of fans, ignoring rows that already exist.

        Args:
            account_info: Mapping with at least ``gh_id`` and ``account_name``.
            user_list: Iterable of fan mappings as returned by the fan-list API.
        """
        if not user_list:
            # Nothing to write; avoid handing an empty batch to the pool.
            return

        rows = [
            (
                account_info["gh_id"],
                account_info["account_name"],
                fan["user_openid"],
                fan["user_name"],
                fan["user_create_time"],
                fan["user_head_img"],
                fan["user_remark"],
                fan["identity_type"],
                fan["identity_open_id"],
            )
            for fan in user_list
        ]
        sql = """
            INSERT IGNORE INTO gzh_fans_info
                (gh_id, account_name, user_openid, user_name, user_create_time, user_head_img, user_remark, identity_type, identity_open_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        await self.pool.async_save(query=sql, params=rows, batch=True)

    async def insert_fail_detail(self, gh_id, fail_info):
        """Persist the failure checkpoint of an account as a JSON string.

        Args:
            gh_id: Account identifier.
            fail_info: JSON-serialisable mapping describing the failure.
        """
        sql = """
            UPDATE gzh_account_info SET fail_detail = %s WHERE gh_id = %s;
        """
        # Parameter order must follow the placeholders: the JSON payload fills
        # ``fail_detail`` and ``gh_id`` fills the WHERE clause. The previous
        # order was reversed, so the statement never matched a row.
        return await self.pool.async_save(
            query=sql, params=(json.dumps(fail_info, ensure_ascii=False), gh_id)
        )

    async def fetch_fail_index(self, gh_id):
        """Read back the stored failure checkpoint (``fail_detail``) of an account."""
        sql = """
            SELECT fail_detail FROM gzh_account_info WHERE gh_id = %s;
        """
        return await self.pool.async_fetch(query=sql, params=(gh_id,))

    async def update_task_status(self, gh_id, user_openid, ori_status, new_status):
        """Move one fan row from ``ori_status`` to ``new_status``.

        The current status is part of the WHERE clause, so the update is a
        compare-and-set: it only applies when the row is still in
        ``ori_status``. Returns the result of ``async_save``; callers treat a
        falsy value as "row not claimed".
        """
        sql = """
            UPDATE gzh_fans_info SET status = %s WHERE gh_id = %s and user_openid = %s and status = %s;
        """
        return await self.pool.async_save(
            query=sql, params=(new_status, gh_id, user_openid, ori_status)
        )

    async def save_single_union_user(self, gh_id, user_info, semaphore):
        """Store the union id and profile fields of a single fan.

        The row is first claimed (INIT -> PROCESSING) and then completed
        (PROCESSING -> FINISHED) together with the profile fields.

        Args:
            gh_id: Account identifier.
            user_info: Mapping from the union-id API (``openid``, ``unionid``,
                ``sex``, ``city``, ``province``, ``subscribe_scene``).
            semaphore: Async context manager limiting concurrent writers.

        Returns:
            The result of the final ``async_save`` (or 0 when it is falsy);
            0 when the fan has no open id, could not be claimed, or any step
            raised.
        """
        async with semaphore:
            openid = user_info.get("openid")
            if not openid:
                return 0

            try:
                claimed = await self.update_task_status(
                    gh_id=gh_id,
                    user_openid=openid,
                    ori_status=self.INIT_STATUS,
                    new_status=self.PROCESSING_STATUS,
                )
                if not claimed:
                    return 0

                sql = """
                    UPDATE gzh_fans_info
                    SET union_id = %s, sex = %s, city = %s, province = %s, subscribe_scene = %s, status = %s
                    WHERE
                        gh_id = %s AND user_openid = %s AND status = %s;
                """
                affected_rows = await self.pool.async_save(
                    query=sql,
                    params=(
                        user_info.get("unionid"),
                        user_info.get("sex"),
                        user_info.get("city"),
                        user_info.get("province"),
                        user_info.get("subscribe_scene"),
                        self.FINISHED_STATUS,
                        gh_id,
                        openid,
                        self.PROCESSING_STATUS,
                    ),
                )
                return affected_rows or 0
            except Exception:
                # Best effort: release the claim so the row is not stuck in
                # PROCESSING. A failure of the rollback itself is deliberately
                # ignored; the caller only needs to know nothing was saved.
                try:
                    await self.update_task_status(
                        gh_id=gh_id,
                        user_openid=openid,
                        ori_status=self.PROCESSING_STATUS,
                        new_status=self.INIT_STATUS,
                    )
                except Exception:
                    pass
                return 0

    async def update_gzh_cursor_info(self, gh_id, cursor_id, cursor_timestamp):
        """Store the pagination cursor reached by the history crawl."""
        sql = """
            UPDATE gzh_account_info SET cursor_openid = %s, cursor_timestamp = %s WHERE gh_id = %s;
        """
        return await self.pool.async_save(
            query=sql, params=(cursor_id, cursor_timestamp, gh_id)
        )

    async def set_cookie_token_for_each_account(self, gh_id, cookie, token):
        """Store a fresh cookie / token pair and mark it as available."""
        sql = """
            UPDATE gzh_cookie_info SET cookie = %s, token = %s, token_status = %s
            WHERE gh_id = %s;
        """
        return await self.pool.async_save(
            query=sql, params=(cookie, token, self.AVAILABLE_STATUS, gh_id)
        )

    async def set_access_token_for_each_account(
        self, gh_id, access_token, expire_timestamp
    ):
        """Store a fresh access token with its expiry and mark it as available."""
        sql = """
            UPDATE gzh_cookie_info
            SET access_token = %s, access_token_status = %s, expire_timestamp = %s
            WHERE gh_id = %s;
        """
        return await self.pool.async_save(
            query=sql,
            params=(access_token, self.AVAILABLE_STATUS, expire_timestamp, gh_id),
        )

    async def get_max_cursor_id(self, gh_id):
        """Return the stored fan with the largest ``user_create_time`` for an account."""
        sql = """
            SELECT user_openid, user_create_time
            FROM gzh_fans_info WHERE gh_id = %s
            ORDER BY user_create_time DESC LIMIT 1;
        """
        return await self.pool.async_fetch(query=sql, params=(gh_id,))
- class CrawlerGzhFans(CrawlerGzhFansBase):
- def __init__(self, pool, log_client):
- super().__init__(pool, log_client)
- # 抓取账号新增粉丝
- async def crawl_new_fans_for_each_account(self, account_info: dict):
- cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
- if not cookie_obj:
- return
- # 获取失败详情
- fetch_response = await self.fetch_fail_index(account_info["gh_id"])
- fail_detail = json.loads(fetch_response[0]["fail_detail"] or "{}")
- cursor_openid = fail_detail.get("cursor_openid") or ""
- cursor_timestamp = fail_detail.get("cursor_timestamp") or ""
- newest_timestamp = fail_detail.get("newest_timestamp") or None
- if not newest_timestamp:
- newest_fans = await self.get_max_cursor_id(account_info["gh_id"])
- newest_timestamp = newest_fans[0]["user_create_time"]
- while True:
- try:
- response = await get_gzh_fans(
- token=cookie_obj[0]["token"],
- cookie=cookie_obj[0]["cookie"],
- cursor_id=cursor_openid,
- cursor_timestamp=cursor_timestamp,
- )
- base_resp = response.get("base_resp", {})
- code = base_resp.get("ret")
- error_msg = base_resp.get("err_msg")
- match code:
- case 0:
- user_list = response.get("user_list", {}).get("user_info_list")
- next_cursor_id = user_list[-1].get("user_openid")
- next_cursor_timestamp = user_list[-1].get("user_create_time")
- await self.insert_gzh_fans_batch(account_info, user_list)
- if next_cursor_timestamp <= newest_timestamp:
- await feishu_robot.bot(
- title=f"{account_info['account_name']}本月新增粉丝抓取完毕",
- detail=account_info,
- env="cookie_monitor",
- mention=False,
- )
- break
- else:
- cursor_openid = next_cursor_id
- cursor_timestamp = next_cursor_timestamp
- print(
- datetime.fromtimestamp(cursor_timestamp).strftime(
- "%Y-%m-%d %H:%M:%S"
- )
- )
- case "00040":
- print(f"token 非法: {error_msg}")
- await self.set_cookie_token_as_invalid(account_info["gh_id"])
- await feishu_robot.bot(
- title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
- detail=account_info,
- env="cookie_monitor",
- mention=False,
- )
- break
- case _:
- print("token 异常, 请及时刷新")
- await self.set_cookie_token_as_invalid(account_info["gh_id"])
- await feishu_robot.bot(
- title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
- detail=account_info,
- env="cookie_monitor",
- mention=False,
- )
- break
- except Exception as e:
- fail_obj = {
- "cursor_openid": cursor_openid,
- "cursor_timestamp": cursor_timestamp,
- "newest_timestamp": newest_timestamp,
- "fail_reason": str(e),
- }
- await self.insert_fail_detail(account_info["gh_id"], fail_obj)
- await feishu_robot.bot(
- title=f"{account_info['account_name']}本月新增粉丝抓取异常,请查看",
- detail=fail_obj,
- env="cookie_monitor",
- mention=False,
- )
- break
- # 抓取单个账号存量的粉丝
- async def crawl_history_fans_for_each_account(self, account_info: dict):
- cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
- if not cookie_obj:
- return
- if not account_info.get("cursor_openid"):
- cursor_openid = ""
- else:
- cursor_openid = account_info["cursor_openid"]
- if not account_info.get("cursor_timestamp"):
- cursor_timestamp = ""
- else:
- cursor_timestamp = account_info["cursor_timestamp"]
- response = await get_gzh_fans(
- token=cookie_obj[0]["token"],
- cookie=cookie_obj[0]["cookie"],
- cursor_id=cursor_openid,
- cursor_timestamp=cursor_timestamp,
- )
- base_resp = response.get("base_resp", {})
- code = base_resp.get("ret")
- error_msg = base_resp.get("err_msg")
- match code:
- case 0:
- user_list = response.get("user_list", {}).get("user_info_list")
- if not user_list:
- await feishu_robot.bot(
- title=f"{account_info['account_name']}的粉丝已经抓取完毕,请检查",
- detail=account_info,
- env="cookie_monitor",
- mention=False,
- )
- await self.update_account_crawl_history_status(
- account_info["gh_id"], self.INVALID_STATUS
- )
- next_cursor_id = user_list[-1].get("user_openid")
- next_cursor_timestamp = user_list[-1].get("user_create_time")
- await self.insert_gzh_fans_batch(account_info, user_list)
- await self.update_gzh_cursor_info(
- account_info["gh_id"], next_cursor_id, next_cursor_timestamp
- )
- case "00040":
- print(f"token 非法: {error_msg}")
- await self.set_cookie_token_as_invalid(account_info["gh_id"])
- await feishu_robot.bot(
- title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
- detail=account_info,
- env="cookie_monitor",
- mention=False,
- )
- case _:
- print("token 异常, 请及时刷新")
- await self.set_cookie_token_as_invalid(account_info["gh_id"])
- await feishu_robot.bot(
- title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
- detail=account_info,
- env="cookie_monitor",
- mention=False,
- )
- # 通过 access_token && open_id 抓取 union_id
- async def get_union_ids_for_each_account(self, account_info: dict):
- # 通过 access_token 获取 union_id
- user_list = await self.get_open_id_list_from_database(
- gh_id=account_info["gh_id"]
- )
- if not user_list:
- return
- access_token_info = await self.get_access_token_from_database(
- account_info["gh_id"]
- )
- if not access_token_info:
- return
- # 更新 token
- async def update_token(_new_token_info):
- _access_token = _new_token_info["access_token"]
- _expires_in = _new_token_info["expires_in"]
- await self.set_access_token_for_each_account(
- gh_id=account_info["gh_id"],
- access_token=_access_token,
- expire_timestamp=_expires_in + int(time.time()) - self.GAP_DURATION,
- )
- print(f"{account_info['account_name']} access_token updated to database")
- expire_timestamp = access_token_info[0]["expire_timestamp"] or 0
- if int(time.time()) >= expire_timestamp:
- new_token_info = await get_access_token(
- account_info["app_id"], account_info["app_secret"]
- )
- access_token = new_token_info["access_token"]
- await update_token(_new_token_info=new_token_info)
- else:
- access_token = access_token_info[0]["access_token"]
- union_info = await get_union_id_batch(
- access_token=access_token, user_list=user_list
- )
- if union_info.get("errcode"):
- await self.set_access_token_as_invalid(gh_id=account_info["gh_id"])
- return
- # 将查询到的 union_id存储到数据库中
- user_info_list = union_info.get("user_info_list") or []
- if not user_info_list:
- return
- semaphore = asyncio.Semaphore(10)
- tasks = [
- self.save_single_union_user(account_info["gh_id"], user_info, semaphore)
- for user_info in user_info_list
- ]
- await asyncio.gather(*tasks, return_exceptions=True)
- # main function
- async def deal(self, task_name):
- account_list = await self.get_account_list_from_database()
- match task_name:
- case "get_history_fans":
- crawl_history_accounts = [
- i
- for i in account_list
- if i["crawl_history_status"] == self.AVAILABLE_STATUS
- ]
- return await run_tasks_with_asyncio_task_group(
- task_list=crawl_history_accounts,
- handler=self.crawl_history_fans_for_each_account,
- max_concurrency=self.MAX_CONCURRENCY,
- fail_fast=False,
- description="抓取公众号账号粉丝",
- unit="page",
- )
- case "get_union_ids":
- binding_accounts = [
- i
- for i in account_list
- if i["binding_status"] == self.AVAILABLE_STATUS
- ]
- return await run_tasks_with_asyncio_task_group(
- task_list=binding_accounts,
- handler=self.get_union_ids_for_each_account,
- max_concurrency=self.MAX_CONCURRENCY,
- fail_fast=False,
- description="获取粉丝 union_id",
- unit="per100",
- )
- case "get_new_fans":
- consist_crawl_accounts = [
- i
- for i in account_list
- if i["consist_crawl_status"] == self.AVAILABLE_STATUS
- ]
- for account in consist_crawl_accounts:
- print(
- f"处理: {account['account_name']}: gh_id: {account['gh_id']}"
- )
- await self.crawl_new_fans_for_each_account(account)
- # return await run_tasks_with_asyncio_task_group()
- return {}
- case _:
- return {"err_msg": "invalid task_name"}
|