# crawler_gzh_fans.py (7.1 KB)

  1. import json
  2. from applications.crawler.wechat import (
  3. get_gzh_fans,
  4. get_access_token,
  5. get_union_id_batch,
  6. )
  7. from applications.utils import run_tasks_with_asyncio_task_group
  8. class CrawlerGzhFansBase:
  9. def __init__(self, pool, log_client):
  10. self.pool = pool
  11. self.log_client = log_client
  12. # 从数据库获取 access_token
  13. async def get_access_token_from_database(self, gh_id):
  14. query = """
  15. SELECT access_token FROM gzh_cookie_info where gh_id = %s and access_token_status = %s;
  16. """
  17. return await self.pool.async_fetch(query=query, params=(gh_id, 1))
  18. # 从数据库获取粉丝 && token
  19. async def get_cookie_token_from_database(self, gh_id):
  20. query = """
  21. SELECT token, cookie FROM gzh_cookie_info WHERE gh_id = %s and token_status = %s;
  22. """
  23. return await self.pool.async_fetch(query=query, params=(gh_id, 1))
  24. # 设置access_token状态为无效
  25. async def set_access_token_as_invalid(self, gh_id):
  26. query = """
  27. UPDATE gzh_cookie_info SET access_token_status = %s WHERE gh_id = %s;
  28. """
  29. return await self.pool.async_save(query=query, params=(0, gh_id))
  30. # 设置 cookie 状态为无效
  31. async def set_cookie_token_as_invalid(self, gh_id):
  32. query = """
  33. UPDATE gzh_cookie_info SET token_status = %s WHERE gh_id = %s;
  34. """
  35. return await self.pool.async_save(query=query, params=(0, gh_id))
  36. # 获取账号列表
  37. async def get_account_list_from_database(self):
  38. query = """
  39. SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp
  40. FROM gzh_account_info WHERE status = %s;
  41. """
  42. return await self.pool.async_fetch(query=query, params=(1,))
  43. # 获取 open_id 列表
  44. async def get_open_id_list_from_database(self, gh_id):
  45. query = """
  46. SELECT user_openid as openid, 'zh_CN' as lang FROM gzh_fans_info
  47. WHERE status = %s and gh_id = %s LIMIT %s;
  48. """
  49. return await self.pool.async_fetch(query=query, params=(0, gh_id, 20))
  50. # 批量插入粉丝信息
  51. async def insert_gzh_fans_batch(self, account_info, user_list):
  52. for user in user_list:
  53. query = """
  54. INSERT IGNORE INTO gzh_fans_info
  55. (gh_id, account_name, user_openid, user_name, user_create_time, user_head_img, user_remark, identity_type, identity_open_id)
  56. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
  57. """
  58. params = (
  59. account_info["gh_id"],
  60. account_info["account_name"],
  61. user["user_openid"],
  62. user["user_name"],
  63. user["user_create_time"],
  64. user["user_head_img"],
  65. user["user_remark"],
  66. user["identity_type"],
  67. user["identity_open_id"],
  68. )
  69. await self.pool.async_save(query=query, params=params)
  70. # 更新公众号的 cursor 位置
  71. async def update_gzh_cursor_info(self, gh_id, cursor_id, cursor_timestamp):
  72. query = """
  73. UPDATE gzh_account_info SET cursor_openid = %s, cursor_timestamp = %s WHERE gh_id = %s;
  74. """
  75. return await self.pool.async_save(
  76. query=query, params=(cursor_id, cursor_timestamp, gh_id)
  77. )
  78. # 更新公众号的 cookie
  79. async def set_cookie_for_each_account(self, gh_id, cookie, token):
  80. query = """
  81. UPDATE gzh_cookie_info SET cookie = %s, token = %s, token_status = %s
  82. WHERE gh_id = %s;
  83. """
  84. return await self.pool.async_save(query=query, params=(cookie, token, 1, gh_id))
  85. async def set_access_token_for_each_account(self, gh_id, access_token):
  86. query = """
  87. UPDATE gzh_cookie_info SET access_token = %s, access_token_status = %s WHERE gh_id = %s;
  88. """
  89. return await self.pool.async_save(query=query, params=(access_token, 1, gh_id))
  90. class CrawlerGzhFans(CrawlerGzhFansBase):
  91. def __init__(self, pool, log_client):
  92. super().__init__(pool, log_client)
  93. # 抓取单个账号的粉丝
  94. async def crawl_fans_for_each_account(self, account_info):
  95. cookie_obj = await self.get_cookie_token_from_database(account_info["gh_id"])
  96. if not account_info.get("cursor_openid"):
  97. cursor_openid = ''
  98. else:
  99. cursor_openid = account_info["cursor_openid"]
  100. if not account_info.get("cursor_timestamp"):
  101. cursor_timestamp = ''
  102. else:
  103. cursor_timestamp = account_info["cursor_timestamp"]
  104. response = await get_gzh_fans(
  105. token=cookie_obj[0]["token"],
  106. cookie=cookie_obj[0]["cookie"],
  107. cursor_id=cursor_openid,
  108. cursor_timestamp=cursor_timestamp,
  109. )
  110. base_resp = response.get("base_resp", {})
  111. print(json.dumps(base_resp, indent=4, ensure_ascii=False))
  112. code = base_resp.get("ret")
  113. error_msg = base_resp.get("err_msg")
  114. match code:
  115. case 0:
  116. user_list = response.get("user_list", {}).get("user_info_list")
  117. next_cursor_id = user_list[-1].get("user_openid")
  118. next_cursor_timestamp = user_list[-1].get("user_create_time")
  119. print(json.dumps(user_list, ensure_ascii=False, indent=4))
  120. await self.insert_gzh_fans_batch(account_info, user_list)
  121. await self.update_gzh_cursor_info(
  122. account_info["gh_id"], next_cursor_id, next_cursor_timestamp
  123. )
  124. case _:
  125. print(code, error_msg)
  126. # 通过 access_token && open_id 抓取 union_id
  127. async def get_union_ids_for_each_account(self, account_info: dict):
  128. access_token = await self.get_access_token_from_database(account_info["gh_id"])
  129. if not access_token:
  130. print(f"{account_info['account_name']}: access_token is not available")
  131. response = await get_access_token(account_info['app_id'], account_info["app_secret"])
  132. print(json.dumps(response, indent=4, ensure_ascii=False))
  133. # access_token = response.get("access_token")
  134. # await self.set_access_token_for_each_account(account_info["gh_id"], access_token)
  135. #
  136. # # 通过 access_token 获取 union_id
  137. # user_list = await self.get_open_id_list_from_database(gh_id=account_info["gh_id"])
  138. # union_info = await get_union_id_batch(access_token=access_token ,user_list=user_list)
  139. # print(json.dumps(union_info, indent=4, ensure_ascii=False))
  140. # main function
  141. async def deal(self):
  142. account_list = await self.get_account_list_from_database()
  143. # for account_info in account_list:
  144. # # await self.get_union_ids_for_each_account(account_info)
  145. # await self.crawl_fans_for_each_account(account_info)
  146. return await run_tasks_with_asyncio_task_group(
  147. task_list=account_list,
  148. handler=self.crawl_fans_for_each_account,
  149. max_concurrency=5,
  150. fail_fast=False,
  151. description="抓取公众号账号粉丝",
  152. unit="page",
  153. )