crawler_gzh_fans.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. import asyncio
  2. import json
  3. from datetime import datetime
  4. from applications.crawler.wechat import (
  5. get_gzh_fans,
  6. get_access_token,
  7. get_union_id_batch,
  8. )
  9. from applications.api import feishu_robot
  10. from applications.utils import run_tasks_with_asyncio_task_group
  class CrawlerGzhFansConst:
      """Constants shared by the gzh (WeChat official account) fan crawler."""

      # Per-fan union_id task states, stored in gzh_fans_info.status.
      # FAILED_STATUS is not referenced by the crawler code in this module.
      INIT_STATUS = 0
      PROCESSING_STATUS = 1
      FINISHED_STATUS = 2
      FAILED_STATUS = 99

      # On/off flags used for account status, access_token_status,
      # token_status and crawl_history_status columns.
      AVAILABLE_STATUS = 1
      INVALID_STATUS = 0

      # LIMIT used when loading open_ids awaiting union_id resolution.
      MAX_SIZE = 100
      # max_concurrency handed to the asyncio task-group runner.
      MAX_CONCURRENCY = 5
  class CrawlerGzhFansBase(CrawlerGzhFansConst):
      """Database helpers shared by the gzh (WeChat official account) fan crawlers.

      Reads go through ``self.pool.async_fetch`` and writes through
      ``self.pool.async_save``. Every statement uses ``%s`` placeholders, so
      values are bound as parameters rather than formatted into SQL.
      """

      def __init__(self, pool, log_client):
          # pool: async DB pool exposing async_fetch / async_save.
          # log_client: kept for subclasses; not used by the helpers below.
          self.pool = pool
          self.log_client = log_client

      async def get_access_token_from_database(self, gh_id):
          """Return rows holding the account's access_token, only if it is marked available."""
          query = """
              SELECT access_token FROM gzh_cookie_info where gh_id = %s and access_token_status = %s;
          """
          return await self.pool.async_fetch(
              query=query, params=(gh_id, self.AVAILABLE_STATUS)
          )

      async def get_cookie_token_from_database(self, gh_id):
          """Return rows holding the account's web token and cookie, only if marked available."""
          query = """
              SELECT token, cookie FROM gzh_cookie_info WHERE gh_id = %s and token_status = %s;
          """
          return await self.pool.async_fetch(
              query=query, params=(gh_id, self.AVAILABLE_STATUS)
          )

      async def set_access_token_as_invalid(self, gh_id):
          """Mark the account's access_token as invalid."""
          query = """
              UPDATE gzh_cookie_info SET access_token_status = %s WHERE gh_id = %s;
          """
          return await self.pool.async_save(
              query=query, params=(self.INVALID_STATUS, gh_id)
          )

      async def set_cookie_token_as_invalid(self, gh_id):
          """Mark the account's web token/cookie pair as invalid."""
          query = """
              UPDATE gzh_cookie_info SET token_status = %s WHERE gh_id = %s;
          """
          return await self.pool.async_save(
              query=query, params=(self.INVALID_STATUS, gh_id)
          )

      async def update_account_crawl_history_status(self, gh_id, status):
          """Set gzh_account_info.crawl_history_status for one account."""
          query = """
              UPDATE gzh_account_info SET crawl_history_status = %s WHERE gh_id = %s;
          """
          return await self.pool.async_save(query=query, params=(status, gh_id))

      async def get_account_list_from_database(self):
          """Return every available account with its app credentials and crawl cursor."""
          query = """
              SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp, crawl_history_status
              FROM gzh_account_info WHERE status = %s;
          """
          return await self.pool.async_fetch(query=query, params=(self.AVAILABLE_STATUS,))

      async def get_open_id_list_from_database(self, gh_id):
          """Return up to MAX_SIZE fans of ``gh_id`` still in INIT_STATUS.

          Rows are shaped as ``{"openid": ..., "lang": "zh_CN"}`` and are passed
          as ``user_list`` to the union_id batch lookup.
          """
          query = """
              SELECT user_openid as openid, 'zh_CN' as lang FROM gzh_fans_info
              WHERE status = %s and gh_id = %s LIMIT %s;
          """
          return await self.pool.async_fetch(
              query=query, params=(self.INIT_STATUS, gh_id, self.MAX_SIZE)
          )

      async def insert_gzh_fans_batch(self, account_info, user_list):
          """Insert one page of fans with a single multi-row INSERT IGNORE.

          Args:
              account_info: account row; its ``gh_id`` and ``account_name`` are
                  stored on every fan.
              user_list: fan dicts from the crawl API. Each must provide
                  user_openid, user_name, user_create_time, user_head_img,
                  user_remark, identity_type and identity_open_id.

          Returns:
              The result of ``pool.async_save``, or 0 when ``user_list`` is empty.
          """
          if not user_list:
              return 0
          row_placeholder = "(%s, %s, %s, %s, %s, %s, %s, %s, %s)"
          # Only placeholders are interpolated; every value is still a bound parameter.
          values_clause = ", ".join([row_placeholder] * len(user_list))
          # INSERT IGNORE skips rows that collide with an existing unique key
          # (presumably the fan's identity per account; confirm in the schema).
          query = f"""
              INSERT IGNORE INTO gzh_fans_info
              (gh_id, account_name, user_openid, user_name, user_create_time, user_head_img, user_remark, identity_type, identity_open_id)
              VALUES {values_clause};
          """
          params = tuple(
              value
              for user in user_list
              for value in (
                  account_info["gh_id"],
                  account_info["account_name"],
                  user["user_openid"],
                  user["user_name"],
                  user["user_create_time"],
                  user["user_head_img"],
                  user["user_remark"],
                  user["identity_type"],
                  user["identity_open_id"],
              )
          )
          return await self.pool.async_save(query=query, params=params)

      async def insert_fail_detail(self, gh_id, fail_info):
          """Store ``fail_info`` as JSON in gzh_account_info.fail_detail for ``gh_id``."""
          query = """
              UPDATE gzh_account_info SET fail_detail = %s WHERE gh_id = %s;
          """
          # Parameter order must follow the placeholders: fail_detail, then gh_id.
          return await self.pool.async_save(
              query=query, params=(json.dumps(fail_info, ensure_ascii=False), gh_id)
          )

      async def fetch_fail_index(self, gh_id):
          """Return rows holding the raw fail_detail JSON text for ``gh_id``."""
          query = """
              SELECT fail_detail FROM gzh_account_info WHERE gh_id = %s;
          """
          return await self.pool.async_fetch(query=query, params=(gh_id,))

      async def update_task_status(self, gh_id, user_openid, ori_status, new_status):
          """Move one fan's union_id task from ``ori_status`` to ``new_status``.

          The ``status = ori_status`` condition makes this a compare-and-set; the
          return value of ``async_save`` is used by callers as the number of
          rows changed.
          """
          query = """
              UPDATE gzh_fans_info SET status = %s WHERE gh_id = %s and user_openid = %s and status = %s;
          """
          return await self.pool.async_save(
              query=query, params=(new_status, gh_id, user_openid, ori_status)
          )

      async def save_single_union_user(self, gh_id, user_info, semaphore):
          """Persist union_id and profile fields for one fan.

          Claims the row (INIT -> PROCESSING) first, so a row claimed elsewhere
          is skipped, then writes the fields and moves it to FINISHED.

          Returns:
              The number of rows updated by the final write; 0 when skipped or
              on error.
          """
          async with semaphore:
              openid = user_info.get("openid")
              if not openid:
                  return 0
              try:
                  locked = await self.update_task_status(
                      gh_id=gh_id,
                      user_openid=openid,
                      ori_status=self.INIT_STATUS,
                      new_status=self.PROCESSING_STATUS,
                  )
                  if not locked:
                      return 0
                  update_sql = """
                      UPDATE gzh_fans_info
                      SET union_id = %s, sex = %s, city = %s, province = %s, subscribe_scene = %s, status = %s
                      WHERE
                      gh_id = %s AND user_openid = %s AND status = %s;
                  """
                  affected_rows = await self.pool.async_save(
                      query=update_sql,
                      params=(
                          user_info.get("unionid"),
                          user_info.get("sex"),
                          user_info.get("city"),
                          user_info.get("province"),
                          user_info.get("subscribe_scene"),
                          self.FINISHED_STATUS,
                          gh_id,
                          openid,
                          self.PROCESSING_STATUS,
                      ),
                  )
                  return affected_rows or 0
              except Exception:
                  # Roll the claim back so the fan is not stuck in PROCESSING and
                  # gets retried on a later run. The rollback is best-effort: if
                  # it fails too there is nothing more to do here.
                  try:
                      await self.update_task_status(
                          gh_id=gh_id,
                          user_openid=openid,
                          ori_status=self.PROCESSING_STATUS,
                          new_status=self.INIT_STATUS,
                      )
                  except Exception:
                      pass
                  return 0

      async def update_gzh_cursor_info(self, gh_id, cursor_id, cursor_timestamp):
          """Persist the history-crawl cursor (openid + timestamp) for ``gh_id``."""
          query = """
              UPDATE gzh_account_info SET cursor_openid = %s, cursor_timestamp = %s WHERE gh_id = %s;
          """
          return await self.pool.async_save(
              query=query, params=(cursor_id, cursor_timestamp, gh_id)
          )

      async def set_cookie_token_for_each_account(self, gh_id, cookie, token):
          """Store a new web cookie/token pair for ``gh_id`` and mark it available."""
          query = """
              UPDATE gzh_cookie_info SET cookie = %s, token = %s, token_status = %s
              WHERE gh_id = %s;
          """
          return await self.pool.async_save(
              query=query, params=(cookie, token, self.AVAILABLE_STATUS, gh_id)
          )

      async def set_access_token_for_each_account(self, gh_id, access_token):
          """Store a new access_token for ``gh_id`` and mark it available."""
          query = """
              UPDATE gzh_cookie_info SET access_token = %s, access_token_status = %s WHERE gh_id = %s;
          """
          return await self.pool.async_save(
              query=query, params=(access_token, self.AVAILABLE_STATUS, gh_id)
          )

      async def get_max_cursor_id(self, gh_id):
          """Return at most one row: the stored fan of ``gh_id`` with the latest user_create_time."""
          query = """
              SELECT user_openid, user_create_time
              FROM gzh_fans_info WHERE gh_id = %s
              ORDER BY user_create_time DESC LIMIT 1;
          """
          return await self.pool.async_fetch(query=query, params=(gh_id,))
  199. class CrawlerGzhFans(CrawlerGzhFansBase):
  200. def __init__(self, pool, log_client):
  201. super().__init__(pool, log_client)
  202. # 抓取账号新增粉丝
  203. async def crawl_new_fans_for_each_account(self, account_info: dict):
  204. cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
  205. if not cookie_obj:
  206. return
  207. # 获取失败详情
  208. fetch_response = await self.fetch_fail_index(account_info["gh_id"])
  209. fail_detail = json.loads(fetch_response[0]["fail_detail"] or "{}")
  210. cursor_openid = fail_detail.get("cursor_openid") or ""
  211. cursor_timestamp = fail_detail.get("cursor_timestamp") or ""
  212. newest_timestamp = fail_detail.get("newest_timestamp") or None
  213. if not newest_timestamp:
  214. newest_fans = await self.get_max_cursor_id(account_info["gh_id"])
  215. newest_timestamp = newest_fans[0]["user_create_time"]
  216. while True:
  217. try:
  218. response = await get_gzh_fans(
  219. token=cookie_obj[0]["token"],
  220. cookie=cookie_obj[0]["cookie"],
  221. cursor_id=cursor_openid,
  222. cursor_timestamp=cursor_timestamp,
  223. )
  224. print(response)
  225. base_resp = response.get("base_resp", {})
  226. code = base_resp.get("ret")
  227. error_msg = base_resp.get("err_msg")
  228. match code:
  229. case 0:
  230. user_list = response.get("user_list", {}).get("user_info_list")
  231. next_cursor_id = user_list[-1].get("user_openid")
  232. next_cursor_timestamp = user_list[-1].get("user_create_time")
  233. await self.insert_gzh_fans_batch(account_info, user_list)
  234. if next_cursor_timestamp <= newest_timestamp:
  235. await feishu_robot.bot(
  236. title=f"{account_info['account_name']}本月新增粉丝抓取完毕",
  237. detail=account_info,
  238. env="cookie_monitor",
  239. mention=False,
  240. )
  241. break
  242. else:
  243. cursor_openid = next_cursor_id
  244. cursor_timestamp = next_cursor_timestamp
  245. print(datetime.fromtimestamp(cursor_timestamp).strftime("%Y-%m-%d %H:%M:%S"))
  246. case "00040":
  247. print(f"token 非法: {error_msg}")
  248. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  249. await feishu_robot.bot(
  250. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  251. detail=account_info,
  252. env="cookie_monitor",
  253. mention=False,
  254. )
  255. break
  256. case _:
  257. print("token 异常, 请及时刷新")
  258. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  259. await feishu_robot.bot(
  260. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  261. detail=account_info,
  262. env="cookie_monitor",
  263. mention=False,
  264. )
  265. break
  266. except Exception as e:
  267. fail_obj = {
  268. "cursor_openid": cursor_openid,
  269. "cursor_timestamp": cursor_timestamp,
  270. "newest_timestamp": newest_timestamp,
  271. "fail_reason": str(e),
  272. }
  273. await self.insert_fail_detail(account_info["gh_id"], fail_obj)
  274. await feishu_robot.bot(
  275. title=f"{account_info['account_name']}本月新增粉丝抓取异常,请查看",
  276. detail=fail_obj,
  277. env="cookie_monitor",
  278. mention=False,
  279. )
  280. break
  281. # 抓取单个账号存量的粉丝
  282. async def crawl_history_fans_for_each_account(self, account_info: dict):
  283. cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
  284. if not cookie_obj:
  285. return
  286. if not account_info.get("cursor_openid"):
  287. cursor_openid = ""
  288. else:
  289. cursor_openid = account_info["cursor_openid"]
  290. if not account_info.get("cursor_timestamp"):
  291. cursor_timestamp = ""
  292. else:
  293. cursor_timestamp = account_info["cursor_timestamp"]
  294. response = await get_gzh_fans(
  295. token=cookie_obj[0]["token"],
  296. cookie=cookie_obj[0]["cookie"],
  297. cursor_id=cursor_openid,
  298. cursor_timestamp=cursor_timestamp,
  299. )
  300. base_resp = response.get("base_resp", {})
  301. code = base_resp.get("ret")
  302. error_msg = base_resp.get("err_msg")
  303. match code:
  304. case 0:
  305. user_list = response.get("user_list", {}).get("user_info_list")
  306. if not user_list:
  307. await feishu_robot.bot(
  308. title=f"{account_info['account_name']}的粉丝已经抓取完毕,请检查",
  309. detail=account_info,
  310. env="cookie_monitor",
  311. mention=False,
  312. )
  313. await self.update_account_crawl_history_status(account_info["gh_id"], self.INVALID_STATUS)
  314. next_cursor_id = user_list[-1].get("user_openid")
  315. next_cursor_timestamp = user_list[-1].get("user_create_time")
  316. await self.insert_gzh_fans_batch(account_info, user_list)
  317. await self.update_gzh_cursor_info(
  318. account_info["gh_id"], next_cursor_id, next_cursor_timestamp
  319. )
  320. case "00040":
  321. print(f"token 非法: {error_msg}")
  322. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  323. await feishu_robot.bot(
  324. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  325. detail=account_info,
  326. env="cookie_monitor",
  327. mention=False,
  328. )
  329. case _:
  330. print("token 异常, 请及时刷新")
  331. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  332. await feishu_robot.bot(
  333. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  334. detail=account_info,
  335. env="cookie_monitor",
  336. mention=False,
  337. )
  338. # 通过 access_token && open_id 抓取 union_id
  339. async def get_union_ids_for_each_account(self, account_info: dict):
  340. # 通过 access_token 获取 union_id
  341. user_list = await self.get_open_id_list_from_database(
  342. gh_id=account_info["gh_id"]
  343. )
  344. if not user_list:
  345. return
  346. access_token_info = await self.get_access_token_from_database(
  347. account_info["gh_id"]
  348. )
  349. if not access_token_info:
  350. print(f"{account_info['account_name']}: access_token is not available")
  351. response = await get_access_token(
  352. account_info["app_id"], account_info["app_secret"]
  353. )
  354. access_token = response.get("access_token")
  355. await self.set_access_token_for_each_account(
  356. account_info["gh_id"], access_token
  357. )
  358. return
  359. access_token = access_token_info[0]["access_token"]
  360. union_info = await get_union_id_batch(
  361. access_token=access_token, user_list=user_list
  362. )
  363. if union_info.get("errcode"):
  364. await self.set_access_token_as_invalid(gh_id=account_info["gh_id"])
  365. return
  366. # 将查询到的 union_id存储到数据库中
  367. user_info_list = union_info.get("user_info_list") or []
  368. if not user_info_list:
  369. return
  370. semaphore = asyncio.Semaphore(10)
  371. tasks = [
  372. self.save_single_union_user(account_info["gh_id"], user_info, semaphore)
  373. for user_info in user_info_list
  374. ]
  375. await asyncio.gather(*tasks, return_exceptions=True)
  376. # main function
  377. async def deal(self, task_name):
  378. account_list = await self.get_account_list_from_database()
  379. match task_name:
  380. case "get_history_fans":
  381. crawl_history_accounts = [i for i in account_list if i['crawl_history_status'] == self.AVAILABLE_STATUS]
  382. return await run_tasks_with_asyncio_task_group(
  383. task_list=crawl_history_accounts,
  384. handler=self.crawl_history_fans_for_each_account,
  385. max_concurrency=self.MAX_CONCURRENCY,
  386. fail_fast=False,
  387. description="抓取公众号账号粉丝",
  388. unit="page",
  389. )
  390. case "get_union_ids":
  391. return await run_tasks_with_asyncio_task_group(
  392. task_list=account_list,
  393. handler=self.get_union_ids_for_each_account,
  394. max_concurrency=self.MAX_CONCURRENCY,
  395. fail_fast=False,
  396. description="获取粉丝 union_id",
  397. unit="per100",
  398. )
  399. case "get_new_fans":
  400. for account in account_list:
  401. print(f"处理: {account['account_name']}")
  402. await self.crawl_new_fans_for_each_account(account)
  403. # return await run_tasks_with_asyncio_task_group()
  404. return {}
  405. case _:
  406. return {"err_msg": "invalid task_name"}