crawler_gzh_fans.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. import asyncio
  2. import json
  3. from applications.crawler.wechat import (
  4. get_gzh_fans,
  5. get_access_token,
  6. get_union_id_batch,
  7. )
  8. from applications.api import feishu_robot
  9. from applications.utils import run_tasks_with_asyncio_task_group
  10. class CrawlerGzhFansConst:
  11. INIT_STATUS = 0
  12. PROCESSING_STATUS = 1
  13. FINISHED_STATUS = 2
  14. FAILED_STATUS = 99
  15. AVAILABLE_STATUS = 1
  16. INVALID_STATUS = 0
  17. MAX_SIZE = 100
  18. MAX_CONCURRENCY = 5
  19. class CrawlerGzhFansBase(CrawlerGzhFansConst):
  20. def __init__(self, pool, log_client):
  21. self.pool = pool
  22. self.log_client = log_client
  23. # 从数据库获取 access_token
  24. async def get_access_token_from_database(self, gh_id):
  25. query = """
  26. SELECT access_token FROM gzh_cookie_info where gh_id = %s and access_token_status = %s;
  27. """
  28. return await self.pool.async_fetch(
  29. query=query, params=(gh_id, self.AVAILABLE_STATUS)
  30. )
  31. # 从数据库获取粉丝 && token
  32. async def get_cookie_token_from_database(self, gh_id):
  33. query = """
  34. SELECT token, cookie FROM gzh_cookie_info WHERE gh_id = %s and token_status = %s;
  35. """
  36. return await self.pool.async_fetch(
  37. query=query, params=(gh_id, self.AVAILABLE_STATUS)
  38. )
  39. # 设置access_token状态为无效
  40. async def set_access_token_as_invalid(self, gh_id):
  41. query = """
  42. UPDATE gzh_cookie_info SET access_token_status = %s WHERE gh_id = %s;
  43. """
  44. return await self.pool.async_save(
  45. query=query, params=(self.INVALID_STATUS, gh_id)
  46. )
  47. # 设置 cookie 状态为无效
  48. async def set_cookie_token_as_invalid(self, gh_id):
  49. query = """
  50. UPDATE gzh_cookie_info SET token_status = %s WHERE gh_id = %s;
  51. """
  52. return await self.pool.async_save(
  53. query=query, params=(self.INVALID_STATUS, gh_id)
  54. )
  55. # 修改抓取账号状态
  56. async def update_account_crawl_history_status(self, gh_id, status):
  57. query = """
  58. UPDATE gzh_account_info SET crawl_history_status = %s WHERE gh_id = %s;
  59. """
  60. return await self.pool.async_save(query=query, params=(status, gh_id))
  61. # 获取账号列表
  62. async def get_account_list_from_database(self):
  63. query = """
  64. SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp, crawl_history_status
  65. FROM gzh_account_info WHERE status = %s;
  66. """
  67. return await self.pool.async_fetch(query=query, params=(self.AVAILABLE_STATUS,))
  68. # 获取 open_id 列表
  69. async def get_open_id_list_from_database(self, gh_id):
  70. query = """
  71. SELECT user_openid as openid, 'zh_CN' as lang FROM gzh_fans_info
  72. WHERE status = %s and gh_id = %s LIMIT %s;
  73. """
  74. return await self.pool.async_fetch(
  75. query=query, params=(self.INIT_STATUS, gh_id, self.MAX_SIZE)
  76. )
  77. # 批量插入粉丝信息
  78. async def insert_gzh_fans_batch(self, account_info, user_list):
  79. for user in user_list:
  80. query = """
  81. INSERT IGNORE INTO gzh_fans_info
  82. (gh_id, account_name, user_openid, user_name, user_create_time, user_head_img, user_remark, identity_type, identity_open_id)
  83. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
  84. """
  85. params = (
  86. account_info["gh_id"],
  87. account_info["account_name"],
  88. user["user_openid"],
  89. user["user_name"],
  90. user["user_create_time"],
  91. user["user_head_img"],
  92. user["user_remark"],
  93. user["identity_type"],
  94. user["identity_open_id"],
  95. )
  96. await self.pool.async_save(query=query, params=params)
  97. # 更新抓取union_id任务的状态码
  98. async def update_task_status(self, gh_id, user_openid, ori_status, new_status):
  99. query = """
  100. UPDATE gzh_fans_info SET status = %s WHERE gh_id = %s and user_openid = %s and status = %s;
  101. """
  102. return await self.pool.async_save(
  103. query=query, params=(new_status, gh_id, user_openid, ori_status)
  104. )
  105. # 更新 union_id
  106. async def save_single_union_user(self, gh_id, user_info, semaphore):
  107. async with semaphore:
  108. openid = user_info.get("openid")
  109. if not openid:
  110. return 0
  111. try:
  112. locked = await self.update_task_status(
  113. gh_id=gh_id,
  114. user_openid=openid,
  115. ori_status=self.INIT_STATUS,
  116. new_status=self.PROCESSING_STATUS,
  117. )
  118. if not locked:
  119. return 0
  120. update_sql = """
  121. UPDATE gzh_fans_info
  122. SET union_id = %s, sex = %s, city = %s, province = %s, subscribe_scene = %s, status = %s
  123. WHERE
  124. gh_id = %s AND user_openid = %s AND status = %s;
  125. """
  126. affected_rows = await self.pool.async_save(
  127. query=update_sql,
  128. params=(
  129. user_info.get("unionid"),
  130. user_info.get("sex"),
  131. user_info.get("city"),
  132. user_info.get("province"),
  133. user_info.get("subscribe_scene"),
  134. self.FINISHED_STATUS,
  135. gh_id,
  136. openid,
  137. self.PROCESSING_STATUS,
  138. ),
  139. )
  140. return affected_rows or 0
  141. except Exception:
  142. # ❗防止任务卡死,可选:回滚状态
  143. try:
  144. await self.update_task_status(
  145. gh_id=gh_id,
  146. user_openid=openid,
  147. ori_status=self.PROCESSING_STATUS,
  148. new_status=self.INIT_STATUS,
  149. )
  150. except Exception:
  151. pass
  152. return 0
  153. # 更新公众号的 cursor 位置
  154. async def update_gzh_cursor_info(self, gh_id, cursor_id, cursor_timestamp):
  155. query = """
  156. UPDATE gzh_account_info SET cursor_openid = %s, cursor_timestamp = %s WHERE gh_id = %s;
  157. """
  158. return await self.pool.async_save(
  159. query=query, params=(cursor_id, cursor_timestamp, gh_id)
  160. )
  161. # 更新公众号的 cookie
  162. async def set_cookie_token_for_each_account(self, gh_id, cookie, token):
  163. query = """
  164. UPDATE gzh_cookie_info SET cookie = %s, token = %s, token_status = %s
  165. WHERE gh_id = %s;
  166. """
  167. return await self.pool.async_save(
  168. query=query, params=(cookie, token, self.AVAILABLE_STATUS, gh_id)
  169. )
  170. async def set_access_token_for_each_account(self, gh_id, access_token):
  171. query = """
  172. UPDATE gzh_cookie_info SET access_token = %s, access_token_status = %s WHERE gh_id = %s;
  173. """
  174. return await self.pool.async_save(
  175. query=query, params=(access_token, self.AVAILABLE_STATUS, gh_id)
  176. )
  177. async def get_max_cursor_id(self, gh_id):
  178. query = """
  179. SELECT user_openid, user_create_time
  180. FROM gzh_fans_info WHERE gh_id = %s
  181. ORDER BY user_create_time DESC LIMIT 1;
  182. """
  183. return await self.pool.async_fetch(query=query, params=(gh_id,))
  184. class CrawlerGzhFans(CrawlerGzhFansBase):
  185. def __init__(self, pool, log_client):
  186. super().__init__(pool, log_client)
  187. # 抓取账号新增粉丝
  188. async def crawl_new_fans_for_each_account(self, account_info: dict):
  189. cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
  190. if not cookie_obj:
  191. return
  192. cursor_openid = ""
  193. cursor_timestamp = ""
  194. newest_fans = await self.get_max_cursor_id(account_info["gh_id"])
  195. newest_timestamp = newest_fans[0]["user_create_time"]
  196. while True:
  197. response = await get_gzh_fans(
  198. token=cookie_obj[0]["token"],
  199. cookie=cookie_obj[0]["cookie"],
  200. cursor_id=cursor_openid,
  201. cursor_timestamp=cursor_timestamp,
  202. )
  203. print(json.dumps(response, ensure_ascii=False, indent=4))
  204. base_resp = response.get("base_resp", {})
  205. code = base_resp.get("ret")
  206. error_msg = base_resp.get("err_msg")
  207. match code:
  208. case 0:
  209. user_list = response.get("user_list", {}).get("user_info_list")
  210. next_cursor_id = user_list[-1].get("user_openid")
  211. next_cursor_timestamp = user_list[-1].get("user_create_time")
  212. await self.insert_gzh_fans_batch(account_info, user_list)
  213. if next_cursor_timestamp <= newest_timestamp:
  214. print("新粉丝更新完成")
  215. break
  216. else:
  217. cursor_openid = next_cursor_id
  218. cursor_timestamp = next_cursor_timestamp
  219. case "00040":
  220. print(f"token 非法: {error_msg}")
  221. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  222. await feishu_robot.bot(
  223. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  224. detail=account_info,
  225. env="cookie_monitor",
  226. mention=False,
  227. )
  228. break
  229. case _:
  230. print("token 异常, 请及时刷新")
  231. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  232. await feishu_robot.bot(
  233. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  234. detail=account_info,
  235. env="cookie_monitor",
  236. mention=False,
  237. )
  238. break
  239. # 抓取单个账号存量的粉丝
  240. async def crawl_history_fans_for_each_account(self, account_info: dict):
  241. cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
  242. if not cookie_obj:
  243. return
  244. if not account_info.get("cursor_openid"):
  245. cursor_openid = ""
  246. else:
  247. cursor_openid = account_info["cursor_openid"]
  248. if not account_info.get("cursor_timestamp"):
  249. cursor_timestamp = ""
  250. else:
  251. cursor_timestamp = account_info["cursor_timestamp"]
  252. response = await get_gzh_fans(
  253. token=cookie_obj[0]["token"],
  254. cookie=cookie_obj[0]["cookie"],
  255. cursor_id=cursor_openid,
  256. cursor_timestamp=cursor_timestamp,
  257. )
  258. base_resp = response.get("base_resp", {})
  259. code = base_resp.get("ret")
  260. error_msg = base_resp.get("err_msg")
  261. match code:
  262. case 0:
  263. user_list = response.get("user_list", {}).get("user_info_list")
  264. if not user_list:
  265. await feishu_robot.bot(
  266. title=f"{account_info['account_name']}的粉丝已经抓取完毕,请检查",
  267. detail=account_info,
  268. env="cookie_monitor",
  269. mention=False,
  270. )
  271. await self.update_account_crawl_history_status(account_info["gh_id"], self.INVALID_STATUS)
  272. next_cursor_id = user_list[-1].get("user_openid")
  273. next_cursor_timestamp = user_list[-1].get("user_create_time")
  274. await self.insert_gzh_fans_batch(account_info, user_list)
  275. await self.update_gzh_cursor_info(
  276. account_info["gh_id"], next_cursor_id, next_cursor_timestamp
  277. )
  278. case "00040":
  279. print(f"token 非法: {error_msg}")
  280. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  281. await feishu_robot.bot(
  282. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  283. detail=account_info,
  284. env="cookie_monitor",
  285. mention=False,
  286. )
  287. case _:
  288. print("token 异常, 请及时刷新")
  289. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  290. await feishu_robot.bot(
  291. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  292. detail=account_info,
  293. env="cookie_monitor",
  294. mention=False,
  295. )
  296. # 通过 access_token && open_id 抓取 union_id
  297. async def get_union_ids_for_each_account(self, account_info: dict):
  298. # 通过 access_token 获取 union_id
  299. user_list = await self.get_open_id_list_from_database(
  300. gh_id=account_info["gh_id"]
  301. )
  302. if not user_list:
  303. return
  304. access_token_info = await self.get_access_token_from_database(
  305. account_info["gh_id"]
  306. )
  307. if not access_token_info:
  308. print(f"{account_info['account_name']}: access_token is not available")
  309. response = await get_access_token(
  310. account_info["app_id"], account_info["app_secret"]
  311. )
  312. access_token = response.get("access_token")
  313. await self.set_access_token_for_each_account(
  314. account_info["gh_id"], access_token
  315. )
  316. return
  317. access_token = access_token_info[0]["access_token"]
  318. union_info = await get_union_id_batch(
  319. access_token=access_token, user_list=user_list
  320. )
  321. if union_info.get("errcode"):
  322. await self.set_access_token_as_invalid(gh_id=account_info["gh_id"])
  323. return
  324. # 将查询到的 union_id存储到数据库中
  325. user_info_list = union_info.get("user_info_list") or []
  326. if not user_info_list:
  327. return
  328. semaphore = asyncio.Semaphore(10)
  329. tasks = [
  330. self.save_single_union_user(account_info["gh_id"], user_info, semaphore)
  331. for user_info in user_info_list
  332. ]
  333. await asyncio.gather(*tasks, return_exceptions=True)
  334. # main function
  335. async def deal(self, task_name):
  336. account_list = await self.get_account_list_from_database()
  337. match task_name:
  338. case "get_history_fans":
  339. crawl_history_accounts = [i for i in account_list if i['crawl_history_status'] == self.AVAILABLE_STATUS]
  340. return await run_tasks_with_asyncio_task_group(
  341. task_list=crawl_history_accounts,
  342. handler=self.crawl_history_fans_for_each_account,
  343. max_concurrency=self.MAX_CONCURRENCY,
  344. fail_fast=False,
  345. description="抓取公众号账号粉丝",
  346. unit="page",
  347. )
  348. case "get_union_ids":
  349. return await run_tasks_with_asyncio_task_group(
  350. task_list=account_list,
  351. handler=self.get_union_ids_for_each_account,
  352. max_concurrency=self.MAX_CONCURRENCY,
  353. fail_fast=False,
  354. description="获取粉丝 union_id",
  355. unit="per100",
  356. )
  357. case "get_new_fans":
  358. for account in account_list:
  359. print(f"处理: {account['account_name']}")
  360. await self.crawl_new_fans_for_each_account(account)
  361. # return await run_tasks_with_asyncio_task_group()
  362. return {}
  363. case _:
  364. return {"err_msg": "invalid task_name"}