|
|
@@ -1,4 +1,6 @@
|
|
|
import asyncio
|
|
|
+import json
|
|
|
+from datetime import datetime
|
|
|
|
|
|
from applications.crawler.wechat import (
|
|
|
get_gzh_fans,
|
|
|
@@ -110,6 +112,22 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
|
|
|
)
|
|
|
await self.pool.async_save(query=query, params=params)
|
|
|
|
|
|
+ # 插入失败详情
|
|
|
+ async def insert_fail_detail(self, gh_id, fail_info):
|
|
|
+ query = """
|
|
|
+ UPDATE gzh_account_info SET fail_detail = %s WHERE gh_id = %s;
|
|
|
+ """
|
|
|
+ return await self.pool.async_save(
|
|
|
+ query=query, params=(gh_id, json.dumps(fail_info, ensure_ascii=False))
|
|
|
+ )
|
|
|
+
|
|
|
+ # 获取失败详情
|
|
|
+ async def fetch_fail_index(self, gh_id):
|
|
|
+ query = """
|
|
|
+ SELECT fail_detail FROM gzh_account_info WHERE gh_id = %s;
|
|
|
+ """
|
|
|
+ return await self.pool.async_fetch(query=query, params=(gh_id,))
|
|
|
+
|
|
|
# 更新抓取union_id任务的状态码
|
|
|
async def update_task_status(self, gh_id, user_openid, ori_status, new_status):
|
|
|
query = """
|
|
|
@@ -198,13 +216,110 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
|
|
|
query=query, params=(access_token, self.AVAILABLE_STATUS, gh_id)
|
|
|
)
|
|
|
|
|
|
+ async def get_max_cursor_id(self, gh_id):
|
|
|
+ query = """
|
|
|
+ SELECT user_openid, user_create_time
|
|
|
+ FROM gzh_fans_info WHERE gh_id = %s
|
|
|
+ ORDER BY user_create_time DESC LIMIT 1;
|
|
|
+ """
|
|
|
+ return await self.pool.async_fetch(query=query, params=(gh_id,))
|
|
|
+
|
|
|
|
|
|
class CrawlerGzhFans(CrawlerGzhFansBase):
|
|
|
def __init__(self, pool, log_client):
|
|
|
super().__init__(pool, log_client)
|
|
|
|
|
|
+ # 抓取账号新增粉丝
|
|
|
+ async def crawl_new_fans_for_each_account(self, account_info: dict):
|
|
|
+ cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
|
|
|
+ if not cookie_obj:
|
|
|
+ return
|
|
|
+
|
|
|
+ # 获取失败详情
|
|
|
+ fetch_response = await self.fetch_fail_index(account_info["gh_id"])
|
|
|
+ fail_detail = json.loads(fetch_response[0]["fail_detail"] or "{}")
|
|
|
+
|
|
|
+ cursor_openid = fail_detail.get("cursor_openid") or ""
|
|
|
+ cursor_timestamp = fail_detail.get("cursor_timestamp") or ""
|
|
|
+ newest_timestamp = fail_detail.get("newest_timestamp") or None
|
|
|
+
|
|
|
+ if not newest_timestamp:
|
|
|
+ newest_fans = await self.get_max_cursor_id(account_info["gh_id"])
|
|
|
+ newest_timestamp = newest_fans[0]["user_create_time"]
|
|
|
+
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ response = await get_gzh_fans(
|
|
|
+ token=cookie_obj[0]["token"],
|
|
|
+ cookie=cookie_obj[0]["cookie"],
|
|
|
+ cursor_id=cursor_openid,
|
|
|
+ cursor_timestamp=cursor_timestamp,
|
|
|
+ )
|
|
|
+ print(response)
|
|
|
+ base_resp = response.get("base_resp", {})
|
|
|
+ code = base_resp.get("ret")
|
|
|
+ error_msg = base_resp.get("err_msg")
|
|
|
+
|
|
|
+ match code:
|
|
|
+ case 0:
|
|
|
+ user_list = response.get("user_list", {}).get("user_info_list")
|
|
|
+ next_cursor_id = user_list[-1].get("user_openid")
|
|
|
+ next_cursor_timestamp = user_list[-1].get("user_create_time")
|
|
|
+ await self.insert_gzh_fans_batch(account_info, user_list)
|
|
|
+ if next_cursor_timestamp <= newest_timestamp:
|
|
|
+ await feishu_robot.bot(
|
|
|
+ title=f"{account_info['account_name']}本月新增粉丝抓取完毕",
|
|
|
+ detail=account_info,
|
|
|
+ env="cookie_monitor",
|
|
|
+ mention=False,
|
|
|
+ )
|
|
|
+ break
|
|
|
+
|
|
|
+ else:
|
|
|
+ cursor_openid = next_cursor_id
|
|
|
+ cursor_timestamp = next_cursor_timestamp
|
|
|
+ print(datetime.fromtimestamp(cursor_timestamp).strftime("%Y-%m-%d %H:%M:%S"))
|
|
|
+
|
|
|
+ case "00040":
|
|
|
+ print(f"token 非法: {error_msg}")
|
|
|
+ await self.set_cookie_token_as_invalid(account_info["gh_id"])
|
|
|
+ await feishu_robot.bot(
|
|
|
+ title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
|
|
|
+ detail=account_info,
|
|
|
+ env="cookie_monitor",
|
|
|
+ mention=False,
|
|
|
+ )
|
|
|
+ break
|
|
|
+
|
|
|
+ case _:
|
|
|
+ print("token 异常, 请及时刷新")
|
|
|
+ await self.set_cookie_token_as_invalid(account_info["gh_id"])
|
|
|
+ await feishu_robot.bot(
|
|
|
+ title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
|
|
|
+ detail=account_info,
|
|
|
+ env="cookie_monitor",
|
|
|
+ mention=False,
|
|
|
+ )
|
|
|
+ break
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ fail_obj = {
|
|
|
+ "cursor_openid": cursor_openid,
|
|
|
+ "cursor_timestamp": cursor_timestamp,
|
|
|
+ "newest_timestamp": newest_timestamp,
|
|
|
+ "fail_reason": str(e),
|
|
|
+ }
|
|
|
+ await self.insert_fail_detail(account_info["gh_id"], fail_obj)
|
|
|
+ await feishu_robot.bot(
|
|
|
+ title=f"{account_info['account_name']}本月新增粉丝抓取异常,请查看",
|
|
|
+ detail=fail_obj,
|
|
|
+ env="cookie_monitor",
|
|
|
+ mention=False,
|
|
|
+ )
|
|
|
+ break
|
|
|
+
|
|
|
# 抓取单个账号存量的粉丝
|
|
|
- async def crawl_history_fans_for_each_account(self, account_info):
|
|
|
+ async def crawl_history_fans_for_each_account(self, account_info: dict):
|
|
|
cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
|
|
|
if not cookie_obj:
|
|
|
return
|
|
|
@@ -340,9 +455,10 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
|
|
|
|
|
|
case "get_new_fans":
|
|
|
for account in account_list:
|
|
|
- print(account)
|
|
|
+ print(f"处理: {account['account_name']}")
|
|
|
+ await self.crawl_new_fans_for_each_account(account)
|
|
|
# return await run_tasks_with_asyncio_task_group()
|
|
|
- return 0
|
|
|
+ return {}
|
|
|
|
|
|
case _:
|
|
|
return {"err_msg": "invalid task_name"}
|