shipinhao_author_test.py 11 KB


  1. import json
  2. import os
  3. import sys
  4. import time
  5. import uuid
  6. import requests
  7. sys.path.append(os.getcwd())
  8. from common.pipeline import PiaoQuanPipelineTest
  9. from common.mq import MQ
  10. from common.db import MysqlHelper
  11. def find_target_user(name, user_list):
  12. for obj in user_list:
  13. if obj["nickname"] == name:
  14. return obj
  15. else:
  16. continue
  17. return False
  18. class ShiPinHaoAccount:
  19. def __init__(self, platform, mode, rule_dict, user_dict, env):
  20. self.cookie = None
  21. self.token = None
  22. self.account_name = user_dict["link"]
  23. self.platform = platform
  24. self.mode = mode
  25. self.rule_dict = rule_dict
  26. self.user_dict = user_dict
  27. self.env = env
  28. self.download_cnt = 0
  29. self.token_count = 0
  30. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  31. def get_token_from_mysql(self):
  32. # select_sql = f"""SELECT config from crawler_config where source = '{ self.platform }'; """
  33. # # print(select_sql)
  34. # configs = MysqlHelper.get_values(
  35. # log_type=self.mode,
  36. # crawler=self.platform,
  37. # sql=select_sql,
  38. # env=self.env,
  39. # machine="",
  40. # )
  41. # print(configs)
  42. # token_config = configs[0][0]
  43. # token_info = json.loads(token_config)
  44. # self.token = token_info["token"]
  45. # self.cookie = token_info["cookie"]
  46. self.token = "766484754"
  47. self.cookie = "ua_id=bw4VuFJr6fAuSkwdAAAAAClaW0m9Aua-6IfHaXU_zpo=; wxuin=95302180931488; mm_lang=zh_CN; RK=kreEMgtMMJ; ptcz=8fd1b267c98a1185bbe6455a081f1264048ee388363ca305d9ef4812892c7900; qq_domain_video_guid_verify=2ba78a5010233582; poc_sid=HOinP2Wj322Ex737kV651Zqy6y8fSprOUUvaegBg; _qimei_q36=; _qimei_h38=9eea33ea92afe8a922333fce03000001317916; pgv_pvid=9056371236; _clck=3524986952|1|fgp|0; uuid=a76c16bf749aaf6418aa610ad5c6e66c; rand_info=CAESIDhWIfyhucI9xQkQm/2xYzaHtaGjRUbHeNKgSt4b382C; slave_bizuin=3930572231; data_bizuin=3930572231; bizuin=3930572231; data_ticket=k3o3TmbxDq450TMRpBL2zW+f1onbHFg7G4/9iLi/jlp1zyWQtmpjxFouT+/kRE1e; slave_sid=TndTREg5TW9MaFUxRllkaVFacXh6bVhFSEhpSEVRNUc2RWtBbnJRZmdxZzNxaUpOc29oRGJ1RjhFZm9jNXZ3Q1JzUzN3elFDYlVjZTEyN1YyWm9nOGhsUW9sNTFEUEtDRmo1Z0hzZjA1ZjhibXg0YzVrOE91N3ZOZWVqT3UxT0FSN3lsNG9SNTNNdEE2VWNC; slave_user=gh_deef7ad59a83; xid=9bd5b038d83164cbfa24bcf224bc9172; _clsk=bqf6jh|1699929305392|6|1|mp.weixin.qq.com/weheat-agent/payload/record"
  48. print(self.token)
  49. print(self.cookie)
  50. def get_history_id(self):
  51. """
  52. 从数据库表中读取 id
  53. """
  54. select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
  55. name_id = MysqlHelper.get_values(
  56. log_type=self.mode,
  57. crawler=self.platform,
  58. sql=select_user_sql,
  59. env=self.env,
  60. machine="",
  61. )
  62. print(name_id)
  63. if name_id:
  64. return name_id[0]
  65. else:
  66. return False
  67. def get_account_id(self):
  68. # 读历史数据,如果存在 id,则直接返回 id
  69. history_id = self.get_history_id()
  70. if history_id:
  71. return history_id
  72. else:
  73. url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
  74. params = {
  75. "action": "search",
  76. "scene": "1",
  77. "buffer": "",
  78. "query": self.account_name,
  79. "count": "21",
  80. "token": self.token,
  81. "lang": "zh_CN",
  82. "f": "json",
  83. "ajax": "1",
  84. }
  85. headers = {
  86. "authority": "mp.weixin.qq.com",
  87. "accept": "*/*",
  88. "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
  89. "cookie": self.cookie,
  90. "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
  91. self.token
  92. ),
  93. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
  94. "x-requested-with": "XMLHttpRequest",
  95. }
  96. response = requests.request("GET", url, headers=headers, params=params)
  97. self.token_count += 1
  98. user_list = response.json()
  99. print(user_list)
  100. user_list = user_list["acct_list"]
  101. target_user = find_target_user(name=self.account_name, user_list=user_list)
  102. # 写入 MySql 数据库
  103. if target_user:
  104. update_sql = f"""INSERT INTO accounts (name, name_id, platform) values ("{self.account_name}", "{target_user['username']}", "{self.platform}")"""
  105. # print(update_sql)
  106. MysqlHelper.update_values(
  107. log_type=self.mode,
  108. crawler=self.platform,
  109. sql=update_sql,
  110. env=self.env,
  111. machine="",
  112. )
  113. return target_user["username"]
  114. else:
  115. return False
  116. def get_account_videos(self):
  117. # 一个账号最多抓 30 条数据
  118. self.get_token_from_mysql()
  119. user_id = self.get_account_id()
  120. print("ljh", user_id)
  121. print(type(user_id))
  122. if user_id:
  123. url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
  124. headers = {
  125. "authority": "mp.weixin.qq.com",
  126. "accept": "*/*",
  127. "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
  128. "cookie": self.cookie,
  129. "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
  130. self.token
  131. ),
  132. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
  133. "x-requested-with": "XMLHttpRequest",
  134. }
  135. buffer = "" # 翻页指示器
  136. while True:
  137. if self.download_cnt >= 30:
  138. return
  139. params = {
  140. "action": "get_feed_list",
  141. "username": user_id,
  142. "buffer": buffer,
  143. "count": "15",
  144. "scene": "1",
  145. "token": self.token,
  146. # "token": "123456",
  147. "lang": "zh_CN",
  148. "f": "json",
  149. "ajax": "1",
  150. }
  151. response = requests.request("GET", url, headers=headers, params=params)
  152. self.token_count += 1
  153. res_json = response.json()
  154. # 开始判断视频是否有信息,是否频控
  155. if res_json["base_resp"]["err_msg"] == "invalid session":
  156. print(
  157. f"status_code:{response.status_code}, get_videoList:{response.text}\n"
  158. )
  159. time.sleep(60 * 15)
  160. continue
  161. if res_json["base_resp"]["err_msg"] == "freq control":
  162. print(
  163. f"status_code:{response.status_code}, get_videoList:{response.text}\n"
  164. )
  165. time.sleep(60 * 15)
  166. continue
  167. if not res_json.get("list"):
  168. print("没有更多视频了")
  169. return
  170. else:
  171. buffer = res_json["last_buff"]
  172. for obj in res_json["list"]:
  173. print("扫描到一条视频", self.token_count)
  174. # repeat_flag = self.process_video_obj(obj)
  175. # if not repeat_flag:
  176. # return
  177. try:
  178. print("扫描到一条视频")
  179. repeat_flag = self.process_video_obj(obj)
  180. if not repeat_flag:
  181. return
  182. except Exception as e:
  183. print(f"抓取单条视频异常:{e}\n")
  184. else:
  185. print("{}\t获取 id 失败".format(self.account_name))
  186. def process_video_obj(self, video_obj):
  187. trace_id = self.platform + str(uuid.uuid1())
  188. # print(json.dumps(video_obj, ensure_ascii=False, indent=4))
  189. video_dict = {
  190. "video_id": video_obj["nonce_id"],
  191. "out_video_id": video_obj["nonce_id"],
  192. "video_title": video_obj["desc"],
  193. "publish_time_stamp": int(time.time()),
  194. "publish_time_str": time.strftime(
  195. "%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))
  196. ),
  197. "play_cnt": 0,
  198. "comment_cnt": 0,
  199. "like_cnt": 0,
  200. "share_cnt": 0,
  201. "user_id": self.user_dict["user_id"],
  202. "cover_url": video_obj["media"][0]["cover_url"],
  203. "video_url": video_obj["media"][0]["url"],
  204. "avatar_url": video_obj["head_url"],
  205. "width": video_obj["media"][0]["width"],
  206. "height": video_obj["media"][0]["height"],
  207. "duration": video_obj["media"][0]["video_play_len_s"],
  208. "platform": self.platform,
  209. "strategy": self.mode,
  210. "crawler_rule": self.rule_dict,
  211. "session": f"shipinhao-author-{int(time.time())}",
  212. }
  213. # 无更新时间,去重即可
  214. pipeline = PiaoQuanPipelineTest(
  215. platform=self.platform,
  216. mode=self.mode,
  217. item=video_dict,
  218. rule_dict=self.rule_dict,
  219. env=self.env,
  220. trace_id=trace_id,
  221. )
  222. if not pipeline.repeat_video():
  223. return False
  224. else:
  225. video_dict["out_user_id"] = video_dict["user_id"]
  226. video_dict["user_id"] = self.user_dict["uid"]
  227. video_dict["publish_time"] = video_dict["publish_time_str"]
  228. print(video_dict)
  229. print("成功发送 MQ 至 ETL")
  230. self.mq.send_msg(video_dict)
  231. self.download_cnt += 1
  232. return True
  233. if __name__ == "__main__":
  234. # temp_token = "2080949641"
  235. # temp_cookie = "ua_id=bw4VuFJr6fAuSkwdAAAAAClaW0m9Aua-6IfHaXU_zpo=; wxuin=95302180931488; mm_lang=zh_CN; RK=kreEMgtMMJ; ptcz=8fd1b267c98a1185bbe6455a081f1264048ee388363ca305d9ef4812892c7900; qq_domain_video_guid_verify=2ba78a5010233582; poc_sid=HOinP2Wj322Ex737kV651Zqy6y8fSprOUUvaegBg; _qimei_q36=; _qimei_h38=9eea33ea92afe8a922333fce03000001317916; pgv_pvid=9056371236; _clck=3930572231|1|fgk|0; uuid=6562bbd8859230ce4120dfa063c76997; rand_info=CAESIGAatjSIjvxVJVDxRDN7F/CNFWMifvAVqje98rd++8UY; slave_bizuin=3236647229; data_bizuin=3236647229; bizuin=3236647229; data_ticket=qm3i6jRhObs1yKHttGh0gVI02Mz7FTPfatn0RMLdaWyD7Ukcokm5Dc3mmYLQUZPg; slave_sid=UWxjZnhBREZRRTNKZ3dYZTlYRE9Db2lxQUhOM3lZUlRoMkV0MG1wdVVudGpQTWxnVkxzYW5pV2c3NjB3bnAyQ2lPaXBBVVRPazEybWtKSVEzTnUyazZ6WEJsdnFaWWVDaUFrM3pTTXRkeUNJS3RNVTc2NFRBWkZiVGQzYllacEFRalBBZ2tXZlltblJYS2VS; slave_user=gh_d284c09295eb; xid=cb96e6ba4b4960d74a22869b1bb21406; _clsk=z77guf|1699532621466|4|1|mp.weixin.qq.com/weheat-agent/payload/record"
  236. SP = ShiPinHaoAccount(
  237. platform="shipinhao",
  238. mode="author",
  239. user_dict={"uid": "123456", "link": "树树读书1014", "user_id": "1234565"},
  240. rule_dict={},
  241. env="dev",
  242. )
  243. SP.get_account_videos()