| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- import json
- from applications.crawler.wechat import (
- get_gzh_fans,
- get_access_token,
- get_union_id_batch,
- )
- from applications.api import feishu_robot
- from applications.utils import run_tasks_with_asyncio_task_group
class CrawlerGzhFansBase:
    """Database helpers for the official-account (gzh) fan crawler.

    Every method goes through ``self.pool.async_fetch`` (reads) or
    ``self.pool.async_save`` (writes) with parameterized queries.

    Args:
        pool: async DB pool exposing ``async_fetch(query=..., params=...)`` and
            ``async_save(query=..., params=...)`` coroutines.
        log_client: stored on the instance; not used by the methods below.
    """

    def __init__(self, pool, log_client):
        self.pool = pool
        self.log_client = log_client

    async def get_access_token_from_database(self, gh_id):
        """Return the account's access_token rows whose access_token_status is 1."""
        query = """
            SELECT access_token FROM gzh_cookie_info where gh_id = %s and access_token_status = %s;
        """
        return await self.pool.async_fetch(query=query, params=(gh_id, 1))

    async def get_cookie_token_from_database(self, gh_id):
        """Return the account's ``token`` / ``cookie`` rows whose token_status is 1."""
        query = """
            SELECT token, cookie FROM gzh_cookie_info WHERE gh_id = %s and token_status = %s;
        """
        return await self.pool.async_fetch(query=query, params=(gh_id, 1))

    async def set_access_token_as_invalid(self, gh_id):
        """Mark the account's access_token as invalid (access_token_status = 0)."""
        query = """
            UPDATE gzh_cookie_info SET access_token_status = %s WHERE gh_id = %s;
        """
        return await self.pool.async_save(query=query, params=(0, gh_id))

    async def set_cookie_token_as_invalid(self, gh_id):
        """Mark the account's token/cookie pair as invalid (token_status = 0)."""
        query = """
            UPDATE gzh_cookie_info SET token_status = %s WHERE gh_id = %s;
        """
        return await self.pool.async_save(query=query, params=(0, gh_id))

    async def get_account_list_from_database(self):
        """Return every account with status = 1, including app credentials and crawl cursor."""
        query = """
            SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp
            FROM gzh_account_info WHERE status = %s;
        """
        return await self.pool.async_fetch(query=query, params=(1,))

    async def get_open_id_list_from_database(self, gh_id, limit=20):
        """Return up to ``limit`` fans of the account whose status is 0.

        Each row is shaped by the SELECT aliases as
        ``{"openid": ..., "lang": "zh_CN"}``.

        Args:
            gh_id: account id to filter on.
            limit: maximum number of rows to return (defaults to the
                previously hard-coded 20).
        """
        query = """
            SELECT user_openid as openid, 'zh_CN' as lang FROM gzh_fans_info
            WHERE status = %s and gh_id = %s LIMIT %s;
        """
        return await self.pool.async_fetch(query=query, params=(0, gh_id, limit))

    async def insert_gzh_fans_batch(self, account_info, user_list):
        """Insert every fan in ``user_list`` into gzh_fans_info for one account.

        Rows are written with one ``async_save`` call per user. With MySQL's
        ``INSERT IGNORE``, rows that would violate a unique key are skipped
        instead of raising. An empty or ``None`` ``user_list`` is a no-op.

        Args:
            account_info: dict providing ``gh_id`` and ``account_name``.
            user_list: iterable of dicts with keys ``user_openid``,
                ``user_name``, ``user_create_time``, ``user_head_img``,
                ``user_remark``, ``identity_type``, ``identity_open_id``.
        """
        if not user_list:
            return

        # Query text and account fields are loop-invariant; build them once.
        query = """
            INSERT IGNORE INTO gzh_fans_info
                (gh_id, account_name, user_openid, user_name, user_create_time, user_head_img, user_remark, identity_type, identity_open_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        gh_id = account_info["gh_id"]
        account_name = account_info["account_name"]
        for user in user_list:
            params = (
                gh_id,
                account_name,
                user["user_openid"],
                user["user_name"],
                user["user_create_time"],
                user["user_head_img"],
                user["user_remark"],
                user["identity_type"],
                user["identity_open_id"],
            )
            await self.pool.async_save(query=query, params=params)

    async def update_gzh_cursor_info(self, gh_id, cursor_id, cursor_timestamp):
        """Persist the account's crawl cursor (last seen openid and its timestamp)."""
        query = """
            UPDATE gzh_account_info SET cursor_openid = %s, cursor_timestamp = %s WHERE gh_id = %s;
        """
        return await self.pool.async_save(
            query=query, params=(cursor_id, cursor_timestamp, gh_id)
        )

    async def set_cookie_for_each_account(self, gh_id, cookie, token):
        """Store a new cookie/token pair for the account and mark it valid (token_status = 1)."""
        query = """
            UPDATE gzh_cookie_info SET cookie = %s, token = %s, token_status = %s
            WHERE gh_id = %s;
        """
        return await self.pool.async_save(query=query, params=(cookie, token, 1, gh_id))

    async def set_access_token_for_each_account(self, gh_id, access_token):
        """Store a new access_token for the account and mark it valid (access_token_status = 1)."""
        query = """
            UPDATE gzh_cookie_info SET access_token = %s, access_token_status = %s WHERE gh_id = %s;
        """
        return await self.pool.async_save(query=query, params=(access_token, 1, gh_id))
- class CrawlerGzhFans(CrawlerGzhFansBase):
- def __init__(self, pool, log_client):
- super().__init__(pool, log_client)
- # 抓取单个账号的粉丝
- async def crawl_fans_for_each_account(self, account_info):
- cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
- if not cookie_obj:
- return
- if not account_info.get("cursor_openid"):
- cursor_openid = ''
- else:
- cursor_openid = account_info["cursor_openid"]
- if not account_info.get("cursor_timestamp"):
- cursor_timestamp = ''
- else:
- cursor_timestamp = account_info["cursor_timestamp"]
- response = await get_gzh_fans(
- token=cookie_obj[0]["token"],
- cookie=cookie_obj[0]["cookie"],
- cursor_id=cursor_openid,
- cursor_timestamp=cursor_timestamp,
- )
- base_resp = response.get("base_resp", {})
- code = base_resp.get("ret")
- error_msg = base_resp.get("err_msg")
- match code:
- case 0:
- user_list = response.get("user_list", {}).get("user_info_list")
- next_cursor_id = user_list[-1].get("user_openid")
- next_cursor_timestamp = user_list[-1].get("user_create_time")
- await self.insert_gzh_fans_batch(account_info, user_list)
- await self.update_gzh_cursor_info(
- account_info["gh_id"], next_cursor_id, next_cursor_timestamp
- )
- case '00040':
- print(f"token 非法: {error_msg}")
- await self.set_cookie_token_as_invalid(account_info["gh_id"])
- await feishu_robot.bot(
- title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
- detail=account_info,
- env="cookie_monitor_bot",
- mention=False,
- )
- case _:
- print("token 异常, 请及时刷新")
- await self.set_cookie_token_as_invalid(account_info["gh_id"])
- await feishu_robot.bot(
- title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
- detail=account_info,
- env="cookie_monitor_bot",
- mention=False,
- )
- # 通过 access_token && open_id 抓取 union_id
- async def get_union_ids_for_each_account(self, account_info: dict):
- access_token = await self.get_access_token_from_database(account_info["gh_id"])
- if not access_token:
- print(f"{account_info['account_name']}: access_token is not available")
- response = await get_access_token(account_info['app_id'], account_info["app_secret"])
- print(json.dumps(response, indent=4, ensure_ascii=False))
- # access_token = response.get("access_token")
- # await self.set_access_token_for_each_account(account_info["gh_id"], access_token)
- #
- # # 通过 access_token 获取 union_id
- # user_list = await self.get_open_id_list_from_database(gh_id=account_info["gh_id"])
- # union_info = await get_union_id_batch(access_token=access_token ,user_list=user_list)
- # print(json.dumps(union_info, indent=4, ensure_ascii=False))
- # main function
- async def deal(self):
- account_list = await self.get_account_list_from_database()
- # for account_info in account_list:
- # # await self.get_union_ids_for_each_account(account_info)
- # await self.crawl_fans_for_each_account(account_info)
- return await run_tasks_with_asyncio_task_group(
- task_list=account_list,
- handler=self.crawl_fans_for_each_account,
- max_concurrency=5,
- fail_fast=False,
- description="抓取公众号账号粉丝",
- unit="page",
- )
|