# ganggangdouchuan_recommend2.py
  1. # -*- coding: utf-8 -*-
  2. # @Author: luojunhui
  3. # @Time: 2023/10/10
  4. import json
  5. import os
  6. import random
  7. import sys
  8. import time
  9. import uuid
  10. import requests
  11. from Crypto.Cipher import AES
  12. from Crypto.Hash import MD5
  13. from Crypto.Util.Padding import pad, unpad
  14. from base64 import b64encode, b64decode
  15. from common.mq import MQ
  16. sys.path.append(os.getcwd())
  17. from common.common import Common
  18. from common.aliyun_log import AliyunLogger
  19. from common.pipeline import PiaoQuanPipeline
  20. from common.public import clean_title
  21. def decrypt(a, e, n):
  22. e = MD5.new(e.encode()).hexdigest()
  23. key = e[16:].encode()
  24. iv = e[:16].encode()
  25. cipher = AES.new(key, AES.MODE_CBC, iv)
  26. if n:
  27. encrypted_data = b64decode(a)
  28. # print(encrypted_data)
  29. decrypted_data = unpad(cipher.decrypt(encrypted_data), AES.block_size)
  30. return decrypted_data.decode()
  31. else:
  32. padded_data = pad(a.encode(), AES.block_size)
  33. encrypted_data = cipher.encrypt(padded_data)
  34. return b64encode(encrypted_data).decode()
  35. def find_tencent_url(tx_vid):
  36. headers = {
  37. "Host": "h5vv.video.qq.com",
  38. "xweb_xhr": "1",
  39. "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817",
  40. "Content-Type": "application/x-www-form-urlencoded",
  41. "Accept": "*/*",
  42. "Sec-Fetch-Site": "cross-site",
  43. "Sec-Fetch-Mode": "cors",
  44. "Sec-Fetch-Dest": "empty",
  45. "Referer": "https://servicewechat.com/wx5fcd817f3f80aece/3/page-frame.html",
  46. "Accept-Language": "en",
  47. }
  48. video_id = tx_vid
  49. url = "https://h5vv.video.qq.com/getinfo?vid={}&platform=101001&charge=0&otype=json&defn=shd".format(
  50. video_id
  51. )
  52. response = requests.get(url, headers=headers)
  53. result = json.loads(response.text.replace("QZOutputJson=", "")[:-1])
  54. vl = result["vl"]["vi"][0]
  55. key = vl["fvkey"]
  56. name = vl["fn"]
  57. folder = vl["ul"]["ui"][0]["url"]
  58. url = folder + name + "?vkey=" + key
  59. return url
class GGDCScheduling:
    """Recommend-feed crawler for the "ganggangdouchuan" WeChat mini-program.

    Fetches paged, AES-encrypted video listings from the upstream API
    (decoded via the module-level ``decrypt``), validates each item with
    ``PiaoQuanPipeline``, and forwards accepted items to the ETL MQ topic.
    """

    def __init__(self, log_type, crawler, rule_dict, env, our_uid):
        # Crawler identity / configuration handed in by the scheduler.
        self.platform = "ganggangdouchuan"
        self.log_type = log_type
        self.crawler = crawler
        self.rule_dict = rule_dict
        self.env = env
        self.our_uid = our_uid
        # ETL hand-off queue; topic name is suffixed with the environment.
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        # Count of videos successfully published to MQ during this run.
        self.download_count = 0

    # Fetch one page of the recommend video list and process every item on it.
    def get_videoList(self, page_id):
        """Crawl page *page_id* of the recommend feed.

        Returns early (None) when the response cannot be decoded, the
        payload lacks ``totalCount``, or the page is empty; otherwise
        processes each video via ``process_video_obj``.
        """
        # Random delay to avoid hammering the upstream API.
        time.sleep(random.randint(5, 10))
        url = "https://ganggangdouchuan2.mengniu99.com/api/getcatevideos"
        params = {
            "cateid": "video",
            "page": page_id,
            "timeline": 0,
            "version": "9.0.2",
        }
        headers = {
            "Host": "ganggangdouchuan2.mengniu99.com",
            "xweb_xhr": "1",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817",
            "Content-Type": "application/json",
            "Accept": "*/*",
            "Sec-Fetch-Site": "cross-site",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Dest": "empty",
            "Referer": "https://servicewechat.com/wx5fcd817f3f80aece/3/page-frame.html",
            "Accept-Language": "en",
        }
        while True:
            try:
                response = requests.get(url, headers=headers, params=params)
                # Payload is AES-encrypted: the key travels in "_yyy" and the
                # last two characters of "data" are discarded before decoding.
                decrypted_data = decrypt(
                    response.json()["data"][:-2], response.json()["_yyy"], True
                )
                result = json.loads(decrypted_data)
                AliyunLogger.logging(
                    code="1000",
                    platform=self.crawler,
                    mode=self.log_type,
                    env=self.env,
                    data={},
                    message="开始抓取第{}页".format(page_id),
                )
                break
            # NOTE(review): bare except also swallows KeyboardInterrupt /
            # SystemExit; here it is treated as "response could not be decoded"
            # and aborts the page — consider narrowing to Exception.
            except:
                AliyunLogger.logging(
                    code="2000",
                    platform=self.crawler,
                    mode=self.log_type,
                    env=self.env,
                    data={},
                    message="抓取第{}页,未获取数据,编码错误".format(page_id),
                )
                Common.logger(self.log_type, self.crawler).info("编码不对,解密失败\n")
                return
        if "totalCount" not in result:
            # Decoded payload is not in the expected shape — log and abort.
            Common.logger(self.log_type, self.crawler).info(
                f"get_videoList:{response.text}\n"
            )
            AliyunLogger.logging(
                code="2000",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                data={},
                message="抓取第{}页,未获取数据".format(page_id),
            )
            return
        elif len(result["videos"]) == 0:
            # Reached the end of the feed — nothing more to crawl.
            Common.logger(self.log_type, self.crawler).info(f"没有更多数据啦~\n")
            AliyunLogger.logging(
                code="2000",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                data={},
                message="抓取第{}页,没有更多数据啦".format(page_id),
            )
            return
        else:
            data_list = result["videos"]
            for index, video_obj in enumerate(data_list):
                # Per-item isolation: one bad video must not kill the page.
                try:
                    AliyunLogger.logging(
                        code="1001",
                        platform=self.crawler,
                        mode=self.log_type,
                        env=self.env,
                        data={},
                        message="成功扫描到一条视频, 该视频位于第{}页{}条".format(page_id, index + 1),
                    )
                    self.process_video_obj(video_obj)
                except Exception as e:
                    Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
                    AliyunLogger.logging(
                        code="3000",
                        platform=self.crawler,
                        mode=self.log_type,
                        env=self.env,
                        data=video_obj,
                        message="抓取单条视频异常, 报错原因是: {}, 该视频位于第{}页{}条".format(
                            e, page_id, index + 1
                        ),
                    )
        AliyunLogger.logging(
            code="1000",
            platform=self.crawler,
            mode=self.log_type,
            env=self.env,
            data={},
            message="完成抓取第{}页".format(page_id),
        )

    def process_video_obj(self, video_obj):
        """Normalize one raw video item, run it through the rule pipeline,
        and publish it to the ETL queue if it passes.

        :param video_obj: raw item dict from the listing API; expected to
            carry at least ``nickname``, ``cover`` and ``txvid``
            (``videoid``, ``title``, ``v_time`` are read with defaults).
        """
        trace_id = self.platform + str(uuid.uuid1())
        video_id = video_obj.get("videoid", 0)
        video_title = clean_title(video_obj.get("title", "no title"))
        video_time = video_obj.get("v_time", 0)
        # The source exposes no publish time; crawl time is used instead.
        publish_time_stamp = int(time.time())
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )
        user_name = video_obj["nickname"]
        # Counters default to 0 — the API does not provide play/like stats.
        video_dict = {
            "video_title": video_title,
            "video_id": video_id,
            "duration": video_time,
            "play_cnt": 0,
            "like_cnt": 0,
            "comment_cnt": 0,
            "share_cnt": 0,
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "update_time_stamp": int(time.time()),
            "video_width": 0,
            "video_height": 0,
            "profile_id": 0,
            "profile_mid": 0,
            "cover_url": video_obj["cover"],
            "session": f"ganggangdouchuan-{int(time.time())}",
        }
        video_dict["out_video_id"] = str(video_dict["video_id"])
        rule_pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.log_type,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video_dict,
            trace_id=trace_id
        )
        flag = rule_pipeline.process_item()
        if flag:
            # Enrich with the fields the ETL consumer expects, including the
            # playable URL resolved from the Tencent video id.
            video_dict["out_user_id"] = video_dict["profile_id"]
            video_dict["platform"] = self.crawler
            video_dict["strategy"] = self.log_type
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = self.our_uid
            video_dict["publish_time"] = video_dict["publish_time_str"]
            video_dict["video_url"] = find_tencent_url(video_obj["txvid"])
            video_dict["avatar_url"] = video_obj["avatarurl"]
            video_dict["cover_url"] = video_obj["cover"]
            self.download_count += 1
            self.mq.send_msg(video_dict)
            AliyunLogger.logging(
                code="1002",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                data=video_dict,
                trace_id=trace_id,
                message="成功发送 MQ 至 ETL",
            )
  238. if __name__ == "__main__":
  239. ZL = GGDCScheduling(
  240. log_type="recommend",
  241. crawler="ganggangdouchuan",
  242. rule_dict={},
  243. our_uid="luojunhuihaoshuai",
  244. env="prod",
  245. )
  246. for i in range(1):
  247. ZL.get_videoList(page_id=i + 1)
  248. print(ZL.download_count)