crawler_gzh_fans.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
import asyncio
import logging

from applications.api import feishu_robot
from applications.crawler.wechat import (
    get_access_token,
    get_gzh_fans,
    get_union_id_batch,
)
from applications.utils import run_tasks_with_asyncio_task_group
  9. class CrawlerGzhFansConst:
  10. INIT_STATUS = 0
  11. PROCESSING_STATUS = 1
  12. FINISHED_STATUS = 2
  13. FAILED_STATUS = 99
  14. AVAILABLE_STATUS = 1
  15. INVALID_STATUS = 0
  16. MAX_SIZE = 100
  17. MAX_CONCURRENCY = 5
  18. class CrawlerGzhFansBase(CrawlerGzhFansConst):
  19. def __init__(self, pool, log_client):
  20. self.pool = pool
  21. self.log_client = log_client
  22. # 从数据库获取 access_token
  23. async def get_access_token_from_database(self, gh_id):
  24. query = """
  25. SELECT access_token FROM gzh_cookie_info where gh_id = %s and access_token_status = %s;
  26. """
  27. return await self.pool.async_fetch(
  28. query=query, params=(gh_id, self.AVAILABLE_STATUS)
  29. )
  30. # 从数据库获取粉丝 && token
  31. async def get_cookie_token_from_database(self, gh_id):
  32. query = """
  33. SELECT token, cookie FROM gzh_cookie_info WHERE gh_id = %s and token_status = %s;
  34. """
  35. return await self.pool.async_fetch(
  36. query=query, params=(gh_id, self.AVAILABLE_STATUS)
  37. )
  38. # 设置access_token状态为无效
  39. async def set_access_token_as_invalid(self, gh_id):
  40. query = """
  41. UPDATE gzh_cookie_info SET access_token_status = %s WHERE gh_id = %s;
  42. """
  43. return await self.pool.async_save(
  44. query=query, params=(self.INVALID_STATUS, gh_id)
  45. )
  46. # 设置 cookie 状态为无效
  47. async def set_cookie_token_as_invalid(self, gh_id):
  48. query = """
  49. UPDATE gzh_cookie_info SET token_status = %s WHERE gh_id = %s;
  50. """
  51. return await self.pool.async_save(
  52. query=query, params=(self.INVALID_STATUS, gh_id)
  53. )
  54. # 修改抓取账号状态
  55. async def update_account_crawl_history_status(self, gh_id, status):
  56. query = """
  57. UPDATE gzh_account_info SET crawl_history_status = %s WHERE gh_id = %s;
  58. """
  59. return await self.pool.async_save(query=query, params=(status, gh_id))
  60. # 获取账号列表
  61. async def get_account_list_from_database(self):
  62. query = """
  63. SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp, crawl_history_status
  64. FROM gzh_account_info WHERE status = %s;
  65. """
  66. return await self.pool.async_fetch(query=query, params=(self.AVAILABLE_STATUS,))
  67. # 获取 open_id 列表
  68. async def get_open_id_list_from_database(self, gh_id):
  69. query = """
  70. SELECT user_openid as openid, 'zh_CN' as lang FROM gzh_fans_info
  71. WHERE status = %s and gh_id = %s LIMIT %s;
  72. """
  73. return await self.pool.async_fetch(
  74. query=query, params=(self.INIT_STATUS, gh_id, self.MAX_SIZE)
  75. )
  76. # 批量插入粉丝信息
  77. async def insert_gzh_fans_batch(self, account_info, user_list):
  78. for user in user_list:
  79. query = """
  80. INSERT IGNORE INTO gzh_fans_info
  81. (gh_id, account_name, user_openid, user_name, user_create_time, user_head_img, user_remark, identity_type, identity_open_id)
  82. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
  83. """
  84. params = (
  85. account_info["gh_id"],
  86. account_info["account_name"],
  87. user["user_openid"],
  88. user["user_name"],
  89. user["user_create_time"],
  90. user["user_head_img"],
  91. user["user_remark"],
  92. user["identity_type"],
  93. user["identity_open_id"],
  94. )
  95. await self.pool.async_save(query=query, params=params)
  96. # 更新抓取union_id任务的状态码
  97. async def update_task_status(self, gh_id, user_openid, ori_status, new_status):
  98. query = """
  99. UPDATE gzh_fans_info SET status = %s WHERE gh_id = %s and user_openid = %s and status = %s;
  100. """
  101. return await self.pool.async_save(
  102. query=query, params=(new_status, gh_id, user_openid, ori_status)
  103. )
  104. # 更新 union_id
  105. async def save_single_union_user(self, gh_id, user_info, semaphore):
  106. async with semaphore:
  107. openid = user_info.get("openid")
  108. if not openid:
  109. return 0
  110. try:
  111. locked = await self.update_task_status(
  112. gh_id=gh_id,
  113. user_openid=openid,
  114. ori_status=self.INIT_STATUS,
  115. new_status=self.PROCESSING_STATUS,
  116. )
  117. if not locked:
  118. return 0
  119. update_sql = """
  120. UPDATE gzh_fans_info
  121. SET union_id = %s, sex = %s, city = %s, province = %s, subscribe_scene = %s, status = %s
  122. WHERE
  123. gh_id = %s AND user_openid = %s AND status = %s;
  124. """
  125. affected_rows = await self.pool.async_save(
  126. query=update_sql,
  127. params=(
  128. user_info.get("unionid"),
  129. user_info.get("sex"),
  130. user_info.get("city"),
  131. user_info.get("province"),
  132. user_info.get("subscribe_scene"),
  133. self.FINISHED_STATUS,
  134. gh_id,
  135. openid,
  136. self.PROCESSING_STATUS,
  137. ),
  138. )
  139. return affected_rows or 0
  140. except Exception:
  141. # ❗防止任务卡死,可选:回滚状态
  142. try:
  143. await self.update_task_status(
  144. gh_id=gh_id,
  145. user_openid=openid,
  146. ori_status=self.PROCESSING_STATUS,
  147. new_status=self.INIT_STATUS,
  148. )
  149. except Exception:
  150. pass
  151. return 0
  152. # 更新公众号的 cursor 位置
  153. async def update_gzh_cursor_info(self, gh_id, cursor_id, cursor_timestamp):
  154. query = """
  155. UPDATE gzh_account_info SET cursor_openid = %s, cursor_timestamp = %s WHERE gh_id = %s;
  156. """
  157. return await self.pool.async_save(
  158. query=query, params=(cursor_id, cursor_timestamp, gh_id)
  159. )
  160. # 更新公众号的 cookie
  161. async def set_cookie_token_for_each_account(self, gh_id, cookie, token):
  162. query = """
  163. UPDATE gzh_cookie_info SET cookie = %s, token = %s, token_status = %s
  164. WHERE gh_id = %s;
  165. """
  166. return await self.pool.async_save(
  167. query=query, params=(cookie, token, self.AVAILABLE_STATUS, gh_id)
  168. )
  169. async def set_access_token_for_each_account(self, gh_id, access_token):
  170. query = """
  171. UPDATE gzh_cookie_info SET access_token = %s, access_token_status = %s WHERE gh_id = %s;
  172. """
  173. return await self.pool.async_save(
  174. query=query, params=(access_token, self.AVAILABLE_STATUS, gh_id)
  175. )
  176. class CrawlerGzhFans(CrawlerGzhFansBase):
  177. def __init__(self, pool, log_client):
  178. super().__init__(pool, log_client)
  179. # 抓取单个账号存量的粉丝
  180. async def crawl_history_fans_for_each_account(self, account_info):
  181. cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
  182. if not cookie_obj:
  183. return
  184. if not account_info.get("cursor_openid"):
  185. cursor_openid = ""
  186. else:
  187. cursor_openid = account_info["cursor_openid"]
  188. if not account_info.get("cursor_timestamp"):
  189. cursor_timestamp = ""
  190. else:
  191. cursor_timestamp = account_info["cursor_timestamp"]
  192. response = await get_gzh_fans(
  193. token=cookie_obj[0]["token"],
  194. cookie=cookie_obj[0]["cookie"],
  195. cursor_id=cursor_openid,
  196. cursor_timestamp=cursor_timestamp,
  197. )
  198. base_resp = response.get("base_resp", {})
  199. code = base_resp.get("ret")
  200. error_msg = base_resp.get("err_msg")
  201. match code:
  202. case 0:
  203. user_list = response.get("user_list", {}).get("user_info_list")
  204. if not user_list:
  205. await feishu_robot.bot(
  206. title=f"{account_info['account_name']}的粉丝已经抓取完毕,请检查",
  207. detail=account_info,
  208. env="cookie_monitor",
  209. mention=False,
  210. )
  211. await self.update_account_crawl_history_status(account_info["gh_id"], self.INVALID_STATUS)
  212. next_cursor_id = user_list[-1].get("user_openid")
  213. next_cursor_timestamp = user_list[-1].get("user_create_time")
  214. await self.insert_gzh_fans_batch(account_info, user_list)
  215. await self.update_gzh_cursor_info(
  216. account_info["gh_id"], next_cursor_id, next_cursor_timestamp
  217. )
  218. case "00040":
  219. print(f"token 非法: {error_msg}")
  220. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  221. await feishu_robot.bot(
  222. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  223. detail=account_info,
  224. env="cookie_monitor",
  225. mention=False,
  226. )
  227. case _:
  228. print("token 异常, 请及时刷新")
  229. await self.set_cookie_token_as_invalid(account_info["gh_id"])
  230. await feishu_robot.bot(
  231. title=f"{account_info['account_name']}的 token && cookie 失效,请及时更新",
  232. detail=account_info,
  233. env="cookie_monitor",
  234. mention=False,
  235. )
  236. # 通过 access_token && open_id 抓取 union_id
  237. async def get_union_ids_for_each_account(self, account_info: dict):
  238. # 通过 access_token 获取 union_id
  239. user_list = await self.get_open_id_list_from_database(
  240. gh_id=account_info["gh_id"]
  241. )
  242. if not user_list:
  243. return
  244. access_token_info = await self.get_access_token_from_database(
  245. account_info["gh_id"]
  246. )
  247. if not access_token_info:
  248. print(f"{account_info['account_name']}: access_token is not available")
  249. response = await get_access_token(
  250. account_info["app_id"], account_info["app_secret"]
  251. )
  252. access_token = response.get("access_token")
  253. await self.set_access_token_for_each_account(
  254. account_info["gh_id"], access_token
  255. )
  256. return
  257. access_token = access_token_info[0]["access_token"]
  258. union_info = await get_union_id_batch(
  259. access_token=access_token, user_list=user_list
  260. )
  261. if union_info.get("errcode"):
  262. await self.set_access_token_as_invalid(gh_id=account_info["gh_id"])
  263. return
  264. # 将查询到的 union_id存储到数据库中
  265. user_info_list = union_info.get("user_info_list") or []
  266. if not user_info_list:
  267. return
  268. semaphore = asyncio.Semaphore(10)
  269. tasks = [
  270. self.save_single_union_user(account_info["gh_id"], user_info, semaphore)
  271. for user_info in user_info_list
  272. ]
  273. await asyncio.gather(*tasks, return_exceptions=True)
  274. # main function
  275. async def deal(self, task_name):
  276. account_list = await self.get_account_list_from_database()
  277. match task_name:
  278. case "get_history_fans":
  279. crawl_history_accounts = [i for i in account_list if i['crawl_history_status'] == self.AVAILABLE_STATUS]
  280. return await run_tasks_with_asyncio_task_group(
  281. task_list=crawl_history_accounts,
  282. handler=self.crawl_history_fans_for_each_account,
  283. max_concurrency=self.MAX_CONCURRENCY,
  284. fail_fast=False,
  285. description="抓取公众号账号粉丝",
  286. unit="page",
  287. )
  288. case "get_union_ids":
  289. return await run_tasks_with_asyncio_task_group(
  290. task_list=account_list,
  291. handler=self.get_union_ids_for_each_account,
  292. max_concurrency=self.MAX_CONCURRENCY,
  293. fail_fast=False,
  294. description="获取粉丝 union_id",
  295. unit="per100",
  296. )
  297. case "get_new_fans":
  298. for account in account_list:
  299. print(account)
  300. # return await run_tasks_with_asyncio_task_group()
  301. return 0
  302. case _:
  303. return {"err_msg": "invalid task_name"}