# crawler_gzh_fans.py (19 KB)
  1. import asyncio
  2. import json
  3. import time
  4. from datetime import datetime
  5. from app.infra.crawler.wechat import (
  6. get_gzh_fans,
  7. get_access_token,
  8. get_union_id_batch,
  9. )
  10. from app.infra.external import feishu_robot
  11. from app.infra.shared import run_tasks_with_asyncio_task_group
  12. class CrawlerGzhFansConst:
  13. INIT_STATUS = 0
  14. PROCESSING_STATUS = 1
  15. FINISHED_STATUS = 2
  16. FAILED_STATUS = 99
  17. AVAILABLE_STATUS = 1
  18. INVALID_STATUS = 0
  19. MAX_SIZE = 100
  20. MAX_CONCURRENCY = 5
  21. GAP_DURATION = 300
  22. class CrawlerGzhFansBase(CrawlerGzhFansConst):
  23. def __init__(self, pool, log_client):
  24. self.pool = pool
  25. self.log_client = log_client
  26. # 从数据库获取 access_token
  27. async def get_access_token_from_database(self, gh_id):
  28. query = """
  29. SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
  30. """
  31. return await self.pool.async_fetch(query=query, params=(gh_id,))
  32. # 从数据库获取粉丝 && token
  33. async def get_cookie_token_from_database(self, gh_id):
  34. query = """
  35. SELECT token, cookie FROM gzh_cookie_info WHERE gh_id = %s and token_status = %s;
  36. """
  37. return await self.pool.async_fetch(
  38. query=query, params=(gh_id, self.AVAILABLE_STATUS)
  39. )
  40. # 设置access_token状态为无效
  41. async def set_access_token_as_invalid(self, gh_id):
  42. query = """
  43. UPDATE gzh_cookie_info SET access_token_status = %s WHERE gh_id = %s;
  44. """
  45. return await self.pool.async_save(
  46. query=query, params=(self.INVALID_STATUS, gh_id)
  47. )
  48. # 设置 cookie 状态为无效
  49. async def set_cookie_token_as_invalid(self, gh_id):
  50. query = """
  51. UPDATE gzh_cookie_info SET token_status = %s WHERE gh_id = %s;
  52. """
  53. return await self.pool.async_save(
  54. query=query, params=(self.INVALID_STATUS, gh_id)
  55. )
  56. # 修改抓取账号状态
  57. async def update_account_crawl_history_status(self, gh_id, status):
  58. query = """
  59. UPDATE gzh_account_info SET crawl_history_status = %s WHERE gh_id = %s;
  60. """
  61. return await self.pool.async_save(query=query, params=(status, gh_id))
  62. # 获取账号列表
  63. async def get_account_list_from_database(self):
  64. query = """
  65. SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp,
  66. crawl_history_status, consist_crawl_status, binding_status
  67. FROM gzh_account_info WHERE status = %s;
  68. """
  69. return await self.pool.async_fetch(query=query, params=(self.AVAILABLE_STATUS,))
  70. # 获取 open_id 列表
  71. async def get_open_id_list_from_database(self, gh_id):
  72. query = """
  73. SELECT user_openid as openid, 'zh_CN' as lang FROM gzh_fans_info
  74. WHERE status = %s and gh_id = %s LIMIT %s;
  75. """
  76. return await self.pool.async_fetch(
  77. query=query, params=(self.INIT_STATUS, gh_id, self.MAX_SIZE)
  78. )
  79. # 批量插入粉丝信息
  80. async def insert_gzh_fans_batch(self, account_info, user_list):
  81. param_list = [
  82. (
  83. account_info["gh_id"],
  84. account_info["account_name"],
  85. user["user_openid"],
  86. user["user_name"],
  87. user["user_create_time"],
  88. user["user_head_img"],
  89. user["user_remark"],
  90. user["identity_type"],
  91. user["identity_open_id"],
  92. )
  93. for user in user_list
  94. ]
  95. query = """
  96. INSERT IGNORE INTO gzh_fans_info
  97. (gh_id, account_name, user_openid, user_name, user_create_time, user_head_img, user_remark, identity_type, identity_open_id)
  98. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
  99. """
  100. await self.pool.async_save(query=query, params=param_list, batch=True)
  101. # 插入失败详情
  102. async def insert_fail_detail(self, gh_id, fail_info):
  103. query = """
  104. UPDATE gzh_account_info SET fail_detail = %s WHERE gh_id = %s;
  105. """
  106. return await self.pool.async_save(
  107. query=query, params=(gh_id, json.dumps(fail_info, ensure_ascii=False))
  108. )
  109. # 获取失败详情
  110. async def fetch_fail_index(self, gh_id):
  111. query = """
  112. SELECT fail_detail FROM gzh_account_info WHERE gh_id = %s;
  113. """
  114. return await self.pool.async_fetch(query=query, params=(gh_id,))
  115. # 更新抓取union_id任务的状态码
  116. async def update_task_status(self, gh_id, user_openid, ori_status, new_status):
  117. query = """
  118. UPDATE gzh_fans_info SET status = %s WHERE gh_id = %s and user_openid = %s and status = %s;
  119. """
  120. return await self.pool.async_save(
  121. query=query, params=(new_status, gh_id, user_openid, ori_status)
  122. )
  123. # 更新 union_id
  124. async def save_single_union_user(self, gh_id, user_info, semaphore):
  125. async with semaphore:
  126. openid = user_info.get("openid")
  127. if not openid:
  128. return 0
  129. try:
  130. locked = await self.update_task_status(
  131. gh_id=gh_id,
  132. user_openid=openid,
  133. ori_status=self.INIT_STATUS,
  134. new_status=self.PROCESSING_STATUS,
  135. )
  136. if not locked:
  137. return 0
  138. update_sql = """
  139. UPDATE gzh_fans_info
  140. SET union_id = %s, sex = %s, city = %s, province = %s, subscribe_scene = %s, status = %s
  141. WHERE
  142. gh_id = %s AND user_openid = %s AND status = %s;
  143. """
  144. affected_rows = await self.pool.async_save(
  145. query=update_sql,
  146. params=(
  147. user_info.get("unionid"),
  148. user_info.get("sex"),
  149. user_info.get("city"),
  150. user_info.get("province"),
  151. user_info.get("subscribe_scene"),
  152. self.FINISHED_STATUS,
  153. gh_id,
  154. openid,
  155. self.PROCESSING_STATUS,
  156. ),
  157. )
  158. return affected_rows or 0
  159. except Exception:
  160. # ❗防止任务卡死,可选:回滚状态
  161. try:
  162. await self.update_task_status(
  163. gh_id=gh_id,
  164. user_openid=openid,
  165. ori_status=self.PROCESSING_STATUS,
  166. new_status=self.INIT_STATUS,
  167. )
  168. except Exception:
  169. pass
  170. return 0
  171. # 更新公众号的 cursor 位置
  172. async def update_gzh_cursor_info(self, gh_id, cursor_id, cursor_timestamp):
  173. query = """
  174. UPDATE gzh_account_info SET cursor_openid = %s, cursor_timestamp = %s WHERE gh_id = %s;
  175. """
  176. return await self.pool.async_save(
  177. query=query, params=(cursor_id, cursor_timestamp, gh_id)
  178. )
  179. # 更新公众号的 cookie
  180. async def set_cookie_token_for_each_account(self, gh_id, cookie, token):
  181. query = """
  182. UPDATE gzh_cookie_info SET cookie = %s, token = %s, token_status = %s
  183. WHERE gh_id = %s;
  184. """
  185. return await self.pool.async_save(
  186. query=query, params=(cookie, token, self.AVAILABLE_STATUS, gh_id)
  187. )
  188. async def set_access_token_for_each_account(
  189. self, gh_id, access_token, expire_timestamp
  190. ):
  191. query = """
  192. UPDATE gzh_cookie_info
  193. SET access_token = %s, access_token_status = %s, expire_timestamp = %s
  194. WHERE gh_id = %s;
  195. """
  196. return await self.pool.async_save(
  197. query=query,
  198. params=(access_token, self.AVAILABLE_STATUS, expire_timestamp, gh_id),
  199. )
  200. async def get_max_cursor_id(self, gh_id):
  201. query = """
  202. SELECT user_openid, user_create_time
  203. FROM gzh_fans_info WHERE gh_id = %s
  204. ORDER BY user_create_time DESC LIMIT 1;
  205. """
  206. return await self.pool.async_fetch(query=query, params=(gh_id,))
  207. class CrawlerGzhFans(CrawlerGzhFansBase):
  208. def __init__(self, pool, log_client):
  209. super().__init__(pool, log_client)
  210. # 抓取账号新增粉丝
  211. async def crawl_new_fans_for_each_account(self, account_info: dict):
  212. cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
  213. if not cookie_obj:
  214. return
  215. # 获取失败详情
  216. fetch_response = await self.fetch_fail_index(account_info["gh_id"])
  217. fail_detail = json.loads(fetch_response[0]["fail_detail"] or "{}")
  218. cursor_openid = fail_detail.get("cursor_openid") or ""
  219. cursor_timestamp = fail_detail.get("cursor_timestamp") or ""
  220. newest_timestamp = fail_detail.get("newest_timestamp") or None
  221. if not newest_timestamp:
  222. newest_fans = await self.get_max_cursor_id(account_info["gh_id"])
  223. newest_timestamp = newest_fans[0]["user_create_time"]
  224. while True:
  225. try:
  226. response = await get_gzh_fans(
  227. token=cookie_obj[0]["token"],
  228. cookie=cookie_obj[0]["cookie"],
  229. cursor_id=cursor_openid,
  230. cursor_timestamp=cursor_timestamp,
  231. )
  232. base_resp = response.get("base_resp", {})
  233. code = base_resp.get("ret")
  234. error_msg = base_resp.get("err_msg")
  235. match code:
  236. case 0:
  237. user_list = response.get("user_list", {}).get("user_info_list")
  238. next_cursor_id = user_list[-1].get("user_openid")
  239. next_cursor_timestamp = user_list[-1].get("user_create_time")
  240. await self.insert_gzh_fans_batch(account_info, user_list)
  241. if next_cursor_timestamp <= newest_timestamp:
  242. await feishu_robot.bot(
  243. title=f"{account_info['account_name']}本月新增粉丝抓取完毕",
  244. detail=account_info,
  245. env="cookie_monitor",
  246. mention=False,
  247. )
  248. break
  249. else:
  250. cursor_openid = next_cursor_id
  251. cursor_timestamp = next_cursor_timestamp
  252. print(
  253. datetime.fromtimestamp(cursor_timestamp).strftime(
  254. "%Y-%m-%d %H:%M:%S"
  255. )
  256. )
  257. case "00040":
  258. print(f"token 非法: {error_msg}")
  259. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  260. await feishu_robot.bot(
  261. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  262. detail=account_info,
  263. env="cookie_monitor",
  264. mention=False,
  265. )
  266. break
  267. case _:
  268. print("token 异常, 请及时刷新")
  269. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  270. await feishu_robot.bot(
  271. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  272. detail=account_info,
  273. env="cookie_monitor",
  274. mention=False,
  275. )
  276. break
  277. except Exception as e:
  278. fail_obj = {
  279. "cursor_openid": cursor_openid,
  280. "cursor_timestamp": cursor_timestamp,
  281. "newest_timestamp": newest_timestamp,
  282. "fail_reason": str(e),
  283. }
  284. await self.insert_fail_detail(account_info["gh_id"], fail_obj)
  285. await feishu_robot.bot(
  286. title=f"{account_info['account_name']}本月新增粉丝抓取异常,请查看",
  287. detail=fail_obj,
  288. env="cookie_monitor",
  289. mention=False,
  290. )
  291. break
  292. # 抓取单个账号存量的粉丝
  293. async def crawl_history_fans_for_each_account(self, account_info: dict):
  294. cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
  295. if not cookie_obj:
  296. return
  297. if not account_info.get("cursor_openid"):
  298. cursor_openid = ""
  299. else:
  300. cursor_openid = account_info["cursor_openid"]
  301. if not account_info.get("cursor_timestamp"):
  302. cursor_timestamp = ""
  303. else:
  304. cursor_timestamp = account_info["cursor_timestamp"]
  305. response = await get_gzh_fans(
  306. token=cookie_obj[0]["token"],
  307. cookie=cookie_obj[0]["cookie"],
  308. cursor_id=cursor_openid,
  309. cursor_timestamp=cursor_timestamp,
  310. )
  311. base_resp = response.get("base_resp", {})
  312. code = base_resp.get("ret")
  313. error_msg = base_resp.get("err_msg")
  314. match code:
  315. case 0:
  316. user_list = response.get("user_list", {}).get("user_info_list")
  317. if not user_list:
  318. await feishu_robot.bot(
  319. title=f"{account_info['account_name']}的粉丝已经抓取完毕,请检查",
  320. detail=account_info,
  321. env="cookie_monitor",
  322. mention=False,
  323. )
  324. await self.update_account_crawl_history_status(
  325. account_info["gh_id"], self.INVALID_STATUS
  326. )
  327. next_cursor_id = user_list[-1].get("user_openid")
  328. next_cursor_timestamp = user_list[-1].get("user_create_time")
  329. await self.insert_gzh_fans_batch(account_info, user_list)
  330. await self.update_gzh_cursor_info(
  331. account_info["gh_id"], next_cursor_id, next_cursor_timestamp
  332. )
  333. case "00040":
  334. print(f"token 非法: {error_msg}")
  335. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  336. await feishu_robot.bot(
  337. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  338. detail=account_info,
  339. env="cookie_monitor",
  340. mention=False,
  341. )
  342. case _:
  343. print("token 异常, 请及时刷新")
  344. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  345. await feishu_robot.bot(
  346. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  347. detail=account_info,
  348. env="cookie_monitor",
  349. mention=False,
  350. )
  351. # 通过 access_token && open_id 抓取 union_id
  352. async def get_union_ids_for_each_account(self, account_info: dict):
  353. # 通过 access_token 获取 union_id
  354. user_list = await self.get_open_id_list_from_database(
  355. gh_id=account_info["gh_id"]
  356. )
  357. if not user_list:
  358. return
  359. access_token_info = await self.get_access_token_from_database(
  360. account_info["gh_id"]
  361. )
  362. if not access_token_info:
  363. return
  364. # 更新 token
  365. async def update_token(_new_token_info):
  366. _access_token = _new_token_info["access_token"]
  367. _expires_in = _new_token_info["expires_in"]
  368. await self.set_access_token_for_each_account(
  369. gh_id=account_info["gh_id"],
  370. access_token=_access_token,
  371. expire_timestamp=_expires_in + int(time.time()) - self.GAP_DURATION,
  372. )
  373. print(f"{account_info['account_name']} access_token updated to database")
  374. expire_timestamp = access_token_info[0]["expire_timestamp"] or 0
  375. if int(time.time()) >= expire_timestamp:
  376. new_token_info = await get_access_token(
  377. account_info["app_id"], account_info["app_secret"]
  378. )
  379. access_token = new_token_info["access_token"]
  380. await update_token(_new_token_info=new_token_info)
  381. else:
  382. access_token = access_token_info[0]["access_token"]
  383. union_info = await get_union_id_batch(
  384. access_token=access_token, user_list=user_list
  385. )
  386. if union_info.get("errcode"):
  387. await self.set_access_token_as_invalid(gh_id=account_info["gh_id"])
  388. return
  389. # 将查询到的 union_id存储到数据库中
  390. user_info_list = union_info.get("user_info_list") or []
  391. if not user_info_list:
  392. return
  393. semaphore = asyncio.Semaphore(10)
  394. tasks = [
  395. self.save_single_union_user(account_info["gh_id"], user_info, semaphore)
  396. for user_info in user_info_list
  397. ]
  398. await asyncio.gather(*tasks, return_exceptions=True)
  399. # main function
  400. async def deal(self, task_name):
  401. account_list = await self.get_account_list_from_database()
  402. match task_name:
  403. case "get_history_fans":
  404. crawl_history_accounts = [
  405. i
  406. for i in account_list
  407. if i["crawl_history_status"] == self.AVAILABLE_STATUS
  408. ]
  409. return await run_tasks_with_asyncio_task_group(
  410. task_list=crawl_history_accounts,
  411. handler=self.crawl_history_fans_for_each_account,
  412. max_concurrency=self.MAX_CONCURRENCY,
  413. fail_fast=False,
  414. description="抓取公众号账号粉丝",
  415. unit="page",
  416. )
  417. case "get_union_ids":
  418. binding_accounts = [
  419. i
  420. for i in account_list
  421. if i["binding_status"] == self.AVAILABLE_STATUS
  422. ]
  423. return await run_tasks_with_asyncio_task_group(
  424. task_list=binding_accounts,
  425. handler=self.get_union_ids_for_each_account,
  426. max_concurrency=self.MAX_CONCURRENCY,
  427. fail_fast=False,
  428. description="获取粉丝 union_id",
  429. unit="per100",
  430. )
  431. case "get_new_fans":
  432. consist_crawl_accounts = [
  433. i
  434. for i in account_list
  435. if i["consist_crawl_status"] == self.AVAILABLE_STATUS
  436. ]
  437. for account in consist_crawl_accounts:
  438. print(
  439. f"处理: {account['account_name']}: gh_id: {account['gh_id']}"
  440. )
  441. await self.crawl_new_fans_for_each_account(account)
  442. # return await run_tasks_with_asyncio_task_group()
  443. return {}
  444. case _:
  445. return {"err_msg": "invalid task_name"}