# shipinhao_author_test.py
import json
import os
import sys
import time
import uuid

import requests

# The current working directory is appended to sys.path before the
# project-local `common.*` imports below, so this line must stay ahead of them.
sys.path.append(os.getcwd())

from common.pipeline import PiaoQuanPipelineTest
from common.mq import MQ
from common.db import MysqlHelper
  11. def find_target_user(name, user_list):
  12. for obj in user_list:
  13. if obj["nickname"] == name:
  14. return obj
  15. else:
  16. continue
  17. return False
  18. class ShiPinHaoAccount:
  19. def __init__(self, platform, mode, rule_dict, user_dict, env):
  20. self.cookie = None
  21. self.token = None
  22. self.account_name = user_dict["link"]
  23. self.platform = platform
  24. self.mode = mode
  25. self.rule_dict = rule_dict
  26. self.user_dict = user_dict
  27. self.env = env
  28. self.download_cnt = 0
  29. self.token_count = 0
  30. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  31. def get_token_from_mysql(self):
  32. select_sql = f"""SELECT config from crawler_config where source = '{ self.platform }'; """
  33. # print(select_sql)
  34. configs = MysqlHelper.get_values(
  35. log_type=self.mode,
  36. crawler=self.platform,
  37. sql=select_sql,
  38. env=self.env,
  39. machine="",
  40. )
  41. token_config = configs[0][0]
  42. token_info = json.loads(token_config)
  43. self.token = token_info["token"]
  44. self.cookie = token_info["cookie"]
  45. def get_history_id(self):
  46. """
  47. 从数据库表中读取 id
  48. """
  49. select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
  50. name_id = MysqlHelper.get_values(
  51. log_type=self.mode,
  52. crawler=self.platform,
  53. sql=select_user_sql,
  54. env=self.env,
  55. machine="",
  56. )
  57. if name_id:
  58. # return False
  59. return name_id[0][0]
  60. else:
  61. return False
  62. def get_account_id(self):
  63. # 读历史数据,如果存在 id,则直接返回 id
  64. history_id = self.get_history_id()
  65. if history_id:
  66. return history_id
  67. else:
  68. self.get_token_from_mysql()
  69. print(self.token)
  70. print(self.cookie)
  71. url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
  72. params = {
  73. "action": "search",
  74. "scene": "1",
  75. "buffer": "",
  76. "query": self.account_name,
  77. "count": "21",
  78. "token": self.token,
  79. "lang": "zh_CN",
  80. "f": "json",
  81. "ajax": "1",
  82. }
  83. headers = {
  84. "authority": "mp.weixin.qq.com",
  85. "accept": "*/*",
  86. "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
  87. "cookie": self.cookie,
  88. "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
  89. self.token
  90. ),
  91. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
  92. "x-requested-with": "XMLHttpRequest",
  93. }
  94. response = requests.request("GET", url, headers=headers, params=params)
  95. self.token_count += 1
  96. user_list = response.json()
  97. # print(user_list)
  98. user_list = user_list["acct_list"]
  99. target_user = find_target_user(name=self.account_name, user_list=user_list)
  100. # 写入 MySql 数据库
  101. if target_user:
  102. update_sql = f"""INSERT INTO accounts (name, name_id, platform) values ("{self.account_name}", "{target_user['username']}", "{self.platform}")"""
  103. # print(update_sql)
  104. MysqlHelper.update_values(
  105. log_type=self.mode,
  106. crawler=self.platform,
  107. sql=update_sql,
  108. env=self.env,
  109. machine="",
  110. )
  111. return target_user["username"]
  112. else:
  113. return False
  114. def get_account_videos(self):
  115. # 一个账号最多抓 30 条数据
  116. user_id = self.get_account_id()
  117. if user_id:
  118. print(user_id)
  119. url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
  120. buffer = "" # 翻页指示器
  121. while True:
  122. if self.download_cnt >= 30:
  123. return
  124. self.get_token_from_mysql()
  125. params = {
  126. "action": "get_feed_list",
  127. "username": user_id,
  128. "buffer": buffer,
  129. "count": "15",
  130. "scene": "1",
  131. "token": self.token,
  132. "lang": "zh_CN",
  133. "f": "json",
  134. "ajax": "1",
  135. }
  136. headers = {
  137. "authority": "mp.weixin.qq.com",
  138. "accept": "*/*",
  139. "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
  140. "cookie": self.cookie,
  141. "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
  142. self.token
  143. ),
  144. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
  145. "x-requested-with": "XMLHttpRequest",
  146. }
  147. response = requests.request("GET", url, headers=headers, params=params)
  148. self.token_count += 1
  149. res_json = response.json()
  150. # 开始判断视频是否有信息,是否频控
  151. if res_json["base_resp"]["err_msg"] == "invalid session":
  152. print(
  153. f"status_code:{response.status_code}, get_videoList:{response.text}\n"
  154. )
  155. time.sleep(60 * 15)
  156. continue
  157. if res_json["base_resp"]["err_msg"] == "freq control":
  158. print(
  159. f"status_code:{response.status_code}, get_videoList:{response.text}\n"
  160. )
  161. time.sleep(60 * 15)
  162. continue
  163. if not res_json.get("list"):
  164. print("没有更多视频了")
  165. return
  166. else:
  167. buffer = res_json["last_buff"]
  168. for obj in res_json["list"]:
  169. print("扫描到一条视频", self.token_count)
  170. try:
  171. print("扫描到一条视频")
  172. repeat_flag = self.process_video_obj(obj)
  173. if not repeat_flag:
  174. return
  175. except Exception as e:
  176. print(f"抓取单条视频异常:{e}\n")
  177. else:
  178. print("{}\t获取 id 失败".format(self.account_name))
  179. def process_video_obj(self, video_obj):
  180. trace_id = self.platform + str(uuid.uuid1())
  181. # print(json.dumps(video_obj, ensure_ascii=False, indent=4))
  182. video_dict = {
  183. "video_id": video_obj["nonce_id"],
  184. "out_video_id": video_obj["nonce_id"],
  185. "video_title": video_obj["desc"],
  186. "publish_time_stamp": int(time.time()),
  187. "publish_time_str": time.strftime(
  188. "%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))
  189. ),
  190. "play_cnt": 0,
  191. "comment_cnt": 0,
  192. "like_cnt": 0,
  193. "share_cnt": 0,
  194. "user_id": self.user_dict["user_id"],
  195. "cover_url": video_obj["media"][0]["cover_url"],
  196. "video_url": video_obj["media"][0]["url"],
  197. "avatar_url": video_obj["head_url"],
  198. "width": video_obj["media"][0]["width"],
  199. "height": video_obj["media"][0]["height"],
  200. "duration": video_obj["media"][0]["video_play_len_s"],
  201. "platform": self.platform,
  202. "strategy": self.mode,
  203. "crawler_rule": self.rule_dict,
  204. "session": f"shipinhao-author-{int(time.time())}",
  205. }
  206. # 无更新时间,去重即可
  207. pipeline = PiaoQuanPipelineTest(
  208. platform=self.platform,
  209. mode=self.mode,
  210. item=video_dict,
  211. rule_dict=self.rule_dict,
  212. env=self.env,
  213. trace_id=trace_id,
  214. )
  215. if not pipeline.repeat_video():
  216. return False
  217. else:
  218. video_dict["out_user_id"] = video_dict["user_id"]
  219. video_dict["user_id"] = self.user_dict["uid"]
  220. video_dict["publish_time"] = video_dict["publish_time_str"]
  221. print(video_dict)
  222. print("成功发送 MQ 至 ETL")
  223. # self.mq.send_msg(video_dict)
  224. self.download_cnt += 1
  225. return True
  226. if __name__ == "__main__":
  227. SP = ShiPinHaoAccount(
  228. platform="shipinhao",
  229. mode="author",
  230. user_dict={"uid": "123456", "link": "树树读书1014", "user_id": "1234565"},
  231. rule_dict={},
  232. env="prod",
  233. )
  234. SP.get_account_videos()