zhuwanwufusu_2.py 16 KB


  1. import os
  2. import json
  3. import random
  4. import sys
  5. import time
  6. import uuid
  7. import requests
  8. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  9. from cryptography.hazmat.backends import default_backend
  10. sys.path.append(os.getcwd())
  11. from application.items import VideoItem
  12. from application.pipeline import PiaoQuanPipeline
  13. from application.common.messageQueue import MQ
  14. from application.common.proxies import tunnel_proxies
  15. from application.common.log import AliyunLogger
  16. from application.common.mysql import MysqlHelper
  17. class AES(object):
  18. """
  19. 祝万物复苏
  20. """
  21. def __init__(self):
  22. self.key = b"50102fa64073ad76" # 用适当的方式转换或直接定义为字节串
  23. self.iv = b"173d023138824bb0" # 同上
  24. def aes_encrypt(self, data):
  25. """
  26. aes 加密
  27. :param data:
  28. :return:
  29. """
  30. cipher = Cipher(
  31. algorithms.AES(self.key), modes.CBC(self.iv), backend=default_backend()
  32. )
  33. encryptor = cipher.encryptor()
  34. ct = encryptor.update(self._pad(data).encode()) + encryptor.finalize()
  35. return ct.hex().upper()
  36. def aes_decrypt(self, data):
  37. """
  38. 解密
  39. :param data:
  40. :return:
  41. """
  42. cipher = Cipher(
  43. algorithms.AES(self.key), modes.CBC(self.iv), backend=default_backend()
  44. )
  45. decryptor = cipher.decryptor()
  46. decrypted_data = decryptor.update(bytes.fromhex(data)) + decryptor.finalize()
  47. return self._unpad(decrypted_data).decode()
  48. def _pad(self, s):
  49. """
  50. 填充
  51. :param s:
  52. :return:
  53. """
  54. return s + (16 - len(s) % 16) * chr(16 - len(s) % 16)
  55. def _unpad(self, s):
  56. """
  57. 非填充
  58. :param s:
  59. :return:
  60. """
  61. return s[: -ord(s[len(s) - 1:])]
  62. class ZhuWanWuFuSuRecommend(object):
  63. """
  64. 祝好事多磨小程序爬虫
  65. """
  66. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  67. self.limit_flag = False
  68. self.platform = platform
  69. self.mode = mode
  70. self.rule_dict = rule_dict
  71. self.user_list = user_list
  72. self.env = env
  73. self.download_cnt = 0
  74. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  75. self.expire_flag = False
  76. self.cryptor = AES()
  77. self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
  78. self.mysql = MysqlHelper(mode=self.mode, platform=self)
  79. def get_recommend_list(self):
  80. """
  81. 获取推荐页视频
  82. """
  83. url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideoListsEn2"
  84. headers = {
  85. "Host": "api.lidongze.cn",
  86. "xweb_xhr": "1",
  87. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009",
  88. "token": "",
  89. "content-type": "application/json",
  90. "accept": "*/*",
  91. "referer": "https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html",
  92. "accept-language": "en-US,en;q=0.9",
  93. }
  94. page_index = 1
  95. total_page = 2
  96. while page_index <= total_page:
  97. try:
  98. query = {
  99. "pageNo": page_index,
  100. "pageSize": 10,
  101. "groupId": "1650323161797439489", # 推荐流的 ID
  102. "vn": 1,
  103. "gx": 1,
  104. "appid": "wx0afdc2669ed8df2f",
  105. "type": 0,
  106. }
  107. params = {"v": self.cryptor.aes_encrypt(data=json.dumps(query))}
  108. response = requests.get(
  109. url, headers=headers, params=params
  110. )
  111. result = json.loads(self.cryptor.aes_decrypt(response.text))
  112. total_page = result["list"]["pages"]
  113. page_index = result["list"]["current"] + 1
  114. for index, video_obj in enumerate(result["list"]["records"], 1):
  115. try:
  116. # c += 1
  117. # print(c)
  118. self.aliyun_log.logging(
  119. code="1001", message="扫描到一条视频", data=video_obj
  120. )
  121. self.process_video_obj(video_obj)
  122. # print(json.dumps(video_obj, ensure_ascii=False, indent=4))
  123. except Exception as e:
  124. self.aliyun_log.logging(
  125. code="3000",
  126. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
  127. page_index, index, e
  128. ),
  129. )
  130. except Exception as e:
  131. self.aliyun_log.logging(
  132. code="3000", message="抓取第{}页的时候失败, 报错原因是{}".format(page_index, e)
  133. )
  134. if self.limit_flag:
  135. return
  136. time.sleep(random.randint(5, 10))
  137. def get_user_videos(self, user_id):
  138. """
  139. 在抓取完推荐页之后,去抓每一个用户的主页视频
  140. """
  141. url = "https://api.lidongze.cn/jeecg-boot/ugc/getAuthVideoList"
  142. headers = {
  143. "Host": "api.lidongze.cn",
  144. "xweb_xhr": "1",
  145. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009",
  146. "token": "",
  147. "content-type": "application/json",
  148. "accept": "*/*",
  149. "referer": "https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html",
  150. "accept-language": "en-US,en;q=0.9",
  151. }
  152. page_index = 1
  153. total_page = 1
  154. while page_index <= total_page:
  155. query = {"pageNo": page_index, "pageSize": 10, "authid": user_id}
  156. params = {"v": self.cryptor.aes_encrypt(data=json.dumps(query))}
  157. response = requests.request(
  158. "GET", url, headers=headers, params=params
  159. )
  160. result = json.loads(self.cryptor.aes_decrypt(response.text))
  161. total_page = result["list"]["pages"]
  162. page_index = result["list"]["current"] + 1
  163. for index, video_temp in enumerate(result["list"]["records"]):
  164. video_id = video_temp["id"]
  165. detail_query = {"videoId": video_id}
  166. detail_params = {
  167. "v": self.cryptor.aes_encrypt(data=json.dumps(detail_query))
  168. }
  169. url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideosDataEn"
  170. headers = {
  171. "Host": "api.lidongze.cn",
  172. "xweb_xhr": "1",
  173. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009",
  174. "token": "",
  175. "content-type": "application/json",
  176. "accept": "*/*",
  177. "referer": "https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html",
  178. "accept-language": "en-US,en;q=0.9",
  179. }
  180. detail_response = requests.request(
  181. "GET",
  182. url,
  183. headers=headers,
  184. params=detail_params
  185. )
  186. detail_video = json.loads(
  187. self.cryptor.aes_decrypt(detail_response.text)
  188. )
  189. if detail_video["success"]:
  190. try:
  191. self.aliyun_log.logging(
  192. code="1001", message="扫描到一条视频", data=detail_video["data"]
  193. )
  194. self.process_video_obj(detail_video["data"])
  195. except Exception as e:
  196. self.aliyun_log.logging(
  197. code="3000",
  198. message="抓取单条视频失败, 该视频位于第{}条报错原因是{}".format(index, e),
  199. )
  200. def process_video_obj(self, video_obj):
  201. """
  202. 处理视频
  203. :param video_obj:
  204. """
  205. time.sleep(random.randint(3, 8))
  206. trace_id = self.platform + str(uuid.uuid1())
  207. if video_obj.get("playnum"):
  208. play_cnt = (
  209. int(video_obj["playnum"].replace("万+", "0000"))
  210. if "万+" in video_obj["playnum"]
  211. else int(video_obj["playnum"])
  212. )
  213. else:
  214. play_cnt = 0
  215. our_user = random.choice(self.user_list)
  216. item = VideoItem()
  217. item.add_video_info("video_id", video_obj["id"])
  218. item.add_video_info("video_title", video_obj["vname"])
  219. item.add_video_info("play_cnt", play_cnt)
  220. item.add_video_info("publish_time_stamp", int(time.time()))
  221. item.add_video_info("out_user_id", video_obj["authid"])
  222. item.add_video_info("cover_url", video_obj["shareimg"])
  223. item.add_video_info("like_cnt", int(video_obj["likenum"]))
  224. item.add_video_info("video_url", video_obj["videoaddr"])
  225. item.add_video_info("out_video_id", video_obj["id"])
  226. item.add_video_info("platform", self.platform)
  227. item.add_video_info("strategy", self.mode)
  228. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  229. item.add_video_info("user_id", our_user["uid"])
  230. item.add_video_info("user_name", our_user["nick_name"])
  231. # 把扫描到的账号存到 accounts 表中
  232. self.manage_auth_id(
  233. out_user_id=video_obj["authid"], out_user_name=video_obj["authname"]
  234. )
  235. mq_obj = item.produce_item()
  236. pipeline = PiaoQuanPipeline(
  237. platform=self.platform,
  238. mode=self.mode,
  239. rule_dict=self.rule_dict,
  240. env=self.env,
  241. item=mq_obj,
  242. trace_id=trace_id,
  243. )
  244. if pipeline.process_item():
  245. self.download_cnt += 1
  246. self.mq.send_msg(mq_obj)
  247. self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
  248. if self.download_cnt >= int(
  249. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  250. ):
  251. self.limit_flag = True
  252. def manage_auth_id(self, out_user_id, out_user_name):
  253. """
  254. out_user_id: 外站视频的用户 id
  255. out_user_name: 外站视频用户名字
  256. 逻辑: 对新扫描到的视频的用户 id 进行判断,若用户 id 不存在,则把视频 id 存到表中,
  257. 如果用户 id 存在,则判断用户是否修改名字,若名字修改则更新名字
  258. """
  259. select_user_sql = f"""select name, name_id from accounts where name_id = "{out_user_id}" and platform = "{self.platform}" and useful = 1 limit 1"""
  260. out_user_info = self.mysql.select(sql=select_user_sql)
  261. if out_user_info:
  262. name, name_id = out_user_info[0]
  263. if name == out_user_name:
  264. return
  265. else:
  266. update_sql = f"""update accounts set name = "{out_user_name}" where name_id = "{out_user_id}";"""
  267. self.mysql.update(sql=update_sql)
  268. else:
  269. insert_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{out_user_name}", "{out_user_id}", "{self.platform}", 1 )"""
  270. self.mysql.update(sql=insert_sql)
  271. def get_user_list(self):
  272. """
  273. 获取用户列表
  274. :return:
  275. """
  276. select_user_sql = f"""select name_id from accounts where platform = "{self.platform}" and useful = 1"""
  277. out_user_info = self.mysql.select(
  278. sql=select_user_sql,
  279. )
  280. if out_user_info:
  281. result = []
  282. for i in out_user_info:
  283. result.append(i[0])
  284. return result
  285. else:
  286. return []
  287. def get_detail_video_list(self):
  288. """
  289. 获取推荐列视频
  290. :return:
  291. """
  292. url = "https://api.lidongze.cn/jeecg-boot/ugc/getDetailVideoListsEn2"
  293. headers = {
  294. "Host": "api.lidongze.cn",
  295. "xweb_xhr": "1",
  296. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100",
  297. "token": "",
  298. "referer": "https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html",
  299. "accept-language": "en-US,en;q=0.9",
  300. }
  301. page_index = 1
  302. total_page = 2
  303. while page_index <= total_page:
  304. try:
  305. if self.limit_flag:
  306. self.aliyun_log.logging(code="2000", message="本轮已经抓取足够数量的视频")
  307. return
  308. else:
  309. query = {
  310. "groupId": "1650323161797439489",
  311. "pageNo": page_index,
  312. "pageSize": 10,
  313. "appid": "wx0afdc2669ed8df2f",
  314. "type": 3,
  315. "hxid": "1556555457243828666",
  316. }
  317. params = {"v": self.cryptor.aes_encrypt(data=json.dumps(query))}
  318. response = requests.request(
  319. "GET", url, headers=headers, params=params
  320. )
  321. result = json.loads(self.cryptor.aes_decrypt(response.text))
  322. total_page = result["list"]["pages"]
  323. page_index = result["list"]["current"] + 1
  324. for index, video_obj in enumerate(result["list"]["records"], 1):
  325. try:
  326. self.aliyun_log.logging(
  327. code="1001", message="扫描到一条视频", data=video_obj
  328. )
  329. self.process_video_obj(video_obj)
  330. except Exception as e:
  331. self.aliyun_log.logging(
  332. code="3000",
  333. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
  334. page_index, index, e
  335. ),
  336. )
  337. except Exception as e:
  338. self.aliyun_log.logging(
  339. code="3000", message="抓取第{}页的时候失败, 报错原因是{}".format(page_index, e)
  340. )
  341. time.sleep(random.randint(5, 10))
  342. def run(self):
  343. """
  344. 先抓取推荐列表的视频, 等待 2 分钟后抓取 detail 页面,等待 5 分钟后,抓取账号视频
  345. """
  346. self.get_recommend_list()
  347. # if self.limit_flag:
  348. # return
  349. # time.sleep(2 * 60)
  350. # self.get_detail_video_list()
  351. # if self.limit_flag:
  352. # return
  353. # time.sleep(5 * 60)
  354. # self.mode = "author"
  355. # user_list = self.get_user_list()
  356. # if user_list:
  357. # for index, user_id in enumerate(user_list):
  358. # try:
  359. # if self.limit_flag:
  360. # self.aliyun_log.logging(code="2000", message="本轮已经抓取足够数量的视频")
  361. # return
  362. # self.get_user_videos(user_id=user_id)
  363. # except Exception as e:
  364. # self.aliyun_log.logging(
  365. # code="3000",
  366. # message="抓取账号视频出现异常,账号 id 是{}, 报错原因是{}".format(user_id, e),
  367. # )
  368. # if __name__ == '__main__':
  369. # Z = ZhuWanWuFuSuRecommend(
  370. # platform="zhuwanwufusu2",
  371. # mode="recommend",
  372. # rule_dict={},
  373. # user_list=[{'uid': "123456", 'nick_name': "luojunhui"}],
  374. #
  375. # )
  376. # Z.get_recommend_list()