# shipinhao_author_test.py
  1. import json
  2. import os
  3. import sys
  4. import time
  5. import uuid
  6. import requests
  7. sys.path.append(os.getcwd())
  8. from common.pipeline import PiaoQuanPipelineTest
  9. from common.db import MysqlHelper
  10. def find_target_user(name, user_list):
  11. for obj in user_list:
  12. if obj["nickname"] == name:
  13. return obj
  14. else:
  15. continue
  16. return False
  17. class ShiPinHaoAccount:
  18. def __init__(self, platform, mode, rule_dict, user_dict, env):
  19. self.cookie = None
  20. self.token = None
  21. self.account_name = user_dict["link"]
  22. self.platform = platform
  23. self.mode = mode
  24. self.rule_dict = rule_dict
  25. self.user_dict = user_dict
  26. self.env = env
  27. self.download_cnt = 0
  28. self.token_count = 0
  29. def get_token_from_mysql(self):
  30. select_sql = f"""SELECT config from crawler_config where source = '{ self.platform }'; """
  31. # print(select_sql)
  32. configs = MysqlHelper.get_values(
  33. log_type=self.mode,
  34. crawler=self.platform,
  35. sql=select_sql,
  36. env=self.env,
  37. machine="",
  38. )
  39. token_config = configs[0][0]
  40. token_info = json.loads(token_config)
  41. self.token = token_info["token"]
  42. self.cookie = token_info["cookie"]
  43. print(self.token)
  44. print(self.cookie)
  45. def get_history_id(self):
  46. """
  47. 从数据库表中读取 id
  48. """
  49. select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
  50. name_id = MysqlHelper.get_values(
  51. log_type=self.mode,
  52. crawler=self.platform,
  53. sql=select_user_sql,
  54. env=self.env,
  55. machine="",
  56. )
  57. print(name_id[0])
  58. if name_id:
  59. return name_id[0]
  60. else:
  61. return False
  62. def get_account_id(self):
  63. # 读历史数据,如果存在 id,则直接返回 id
  64. history_id = self.get_history_id()
  65. if history_id:
  66. return history_id
  67. else:
  68. url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
  69. params = {
  70. "action": "search",
  71. "scene": "1",
  72. "buffer": "",
  73. "query": self.account_name,
  74. "count": "21",
  75. "token": self.token,
  76. "lang": "zh_CN",
  77. "f": "json",
  78. "ajax": "1",
  79. }
  80. headers = {
  81. "authority": "mp.weixin.qq.com",
  82. "accept": "*/*",
  83. "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
  84. "cookie": self.cookie,
  85. "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
  86. self.token
  87. ),
  88. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
  89. "x-requested-with": "XMLHttpRequest",
  90. }
  91. response = requests.request("GET", url, headers=headers, params=params)
  92. self.token_count += 1
  93. user_list = response.json()
  94. print(user_list)
  95. user_list = user_list["acct_list"]
  96. target_user = find_target_user(name=self.account_name, user_list=user_list)
  97. # 写入 MySql 数据库
  98. if target_user:
  99. update_sql = f"""INSERT INTO accounts (name, name_id, platform) values ("{self.account_name}", "{target_user['username']}", "{self.platform}")"""
  100. # print(update_sql)
  101. MysqlHelper.update_values(
  102. log_type=self.mode,
  103. crawler=self.platform,
  104. sql=update_sql,
  105. env=self.env,
  106. machine="",
  107. )
  108. return target_user["username"]
  109. else:
  110. return False
  111. def get_account_videos(self):
  112. # 一个账号最多抓 30 条数据
  113. self.get_token_from_mysql()
  114. user_id = self.get_account_id()
  115. print("ljh", user_id)
  116. print(type(user_id))
  117. if user_id:
  118. url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
  119. headers = {
  120. "authority": "mp.weixin.qq.com",
  121. "accept": "*/*",
  122. "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
  123. "cookie": self.cookie,
  124. "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
  125. self.token
  126. ),
  127. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
  128. "x-requested-with": "XMLHttpRequest",
  129. }
  130. buffer = "" # 翻页指示器
  131. while True:
  132. if self.download_cnt >= 30:
  133. return
  134. params = {
  135. "action": "get_feed_list",
  136. "username": user_id,
  137. "buffer": buffer,
  138. "count": "15",
  139. "scene": "1",
  140. "token": self.token,
  141. # "token": "123456",
  142. "lang": "zh_CN",
  143. "f": "json",
  144. "ajax": "1",
  145. }
  146. response = requests.request("GET", url, headers=headers, params=params)
  147. self.token_count += 1
  148. res_json = response.json()
  149. # 开始判断视频是否有信息,是否频控
  150. if res_json["base_resp"]["err_msg"] == "invalid session":
  151. print(
  152. f"status_code:{response.status_code}, get_videoList:{response.text}\n"
  153. )
  154. time.sleep(60 * 15)
  155. continue
  156. if res_json["base_resp"]["err_msg"] == "freq control":
  157. print(
  158. f"status_code:{response.status_code}, get_videoList:{response.text}\n"
  159. )
  160. time.sleep(60 * 15)
  161. continue
  162. if not res_json.get("list"):
  163. print("没有更多视频了")
  164. return
  165. else:
  166. buffer = res_json["last_buff"]
  167. for obj in res_json["list"]:
  168. print("扫描到一条视频", self.token_count)
  169. # repeat_flag = self.process_video_obj(obj)
  170. # if not repeat_flag:
  171. # return
  172. try:
  173. print("扫描到一条视频")
  174. repeat_flag = self.process_video_obj(obj)
  175. if not repeat_flag:
  176. return
  177. except Exception as e:
  178. print(f"抓取单条视频异常:{e}\n")
  179. else:
  180. print("{}\t获取 id 失败".format(self.account_name))
  181. def process_video_obj(self, video_obj):
  182. trace_id = self.platform + str(uuid.uuid1())
  183. # print(json.dumps(video_obj, ensure_ascii=False, indent=4))
  184. video_dict = {
  185. "video_id": video_obj["nonce_id"],
  186. "out_video_id": video_obj["nonce_id"],
  187. "video_title": video_obj["desc"],
  188. "publish_time_stamp": int(time.time()),
  189. "publish_time_str": time.strftime(
  190. "%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))
  191. ),
  192. "play_cnt": 0,
  193. "comment_cnt": 0,
  194. "like_cnt": 0,
  195. "share_cnt": 0,
  196. "user_id": self.user_dict["user_id"],
  197. "cover_url": video_obj["media"][0]["cover_url"],
  198. "video_url": video_obj["media"][0]["url"],
  199. "avatar_url": video_obj["head_url"],
  200. "width": video_obj["media"][0]["width"],
  201. "height": video_obj["media"][0]["height"],
  202. "duration": video_obj["media"][0]["video_play_len_s"],
  203. "platform": self.platform,
  204. "strategy": self.mode,
  205. "crawler_rule": self.rule_dict,
  206. "session": f"shipinhao-author-{int(time.time())}",
  207. }
  208. # 无更新时间,去重即可
  209. pipeline = PiaoQuanPipelineTest(
  210. platform=self.platform,
  211. mode=self.mode,
  212. item=video_dict,
  213. rule_dict=self.rule_dict,
  214. env=self.env,
  215. trace_id=trace_id,
  216. )
  217. if not pipeline.repeat_video():
  218. return False
  219. else:
  220. video_dict["out_user_id"] = video_dict["user_id"]
  221. video_dict["user_id"] = self.user_dict["uid"]
  222. video_dict["publish_time"] = video_dict["publish_time_str"]
  223. print(video_dict)
  224. print("成功发送 MQ 至 ETL")
  225. self.download_cnt += 1
  226. return True
  227. if __name__ == "__main__":
  228. # temp_token = "2080949641"
  229. # temp_cookie = "ua_id=bw4VuFJr6fAuSkwdAAAAAClaW0m9Aua-6IfHaXU_zpo=; wxuin=95302180931488; mm_lang=zh_CN; RK=kreEMgtMMJ; ptcz=8fd1b267c98a1185bbe6455a081f1264048ee388363ca305d9ef4812892c7900; qq_domain_video_guid_verify=2ba78a5010233582; poc_sid=HOinP2Wj322Ex737kV651Zqy6y8fSprOUUvaegBg; _qimei_q36=; _qimei_h38=9eea33ea92afe8a922333fce03000001317916; pgv_pvid=9056371236; _clck=3930572231|1|fgk|0; uuid=6562bbd8859230ce4120dfa063c76997; rand_info=CAESIGAatjSIjvxVJVDxRDN7F/CNFWMifvAVqje98rd++8UY; slave_bizuin=3236647229; data_bizuin=3236647229; bizuin=3236647229; data_ticket=qm3i6jRhObs1yKHttGh0gVI02Mz7FTPfatn0RMLdaWyD7Ukcokm5Dc3mmYLQUZPg; slave_sid=UWxjZnhBREZRRTNKZ3dYZTlYRE9Db2lxQUhOM3lZUlRoMkV0MG1wdVVudGpQTWxnVkxzYW5pV2c3NjB3bnAyQ2lPaXBBVVRPazEybWtKSVEzTnUyazZ6WEJsdnFaWWVDaUFrM3pTTXRkeUNJS3RNVTc2NFRBWkZiVGQzYllacEFRalBBZ2tXZlltblJYS2VS; slave_user=gh_d284c09295eb; xid=cb96e6ba4b4960d74a22869b1bb21406; _clsk=z77guf|1699532621466|4|1|mp.weixin.qq.com/weheat-agent/payload/record"
  230. SP = ShiPinHaoAccount(
  231. platform="shipinhao",
  232. mode="author",
  233. user_dict={"uid": "123456", "link": "心煤", "user_id": "1234565"},
  234. rule_dict={},
  235. env="prod",
  236. )
  237. SP.get_account_videos()