haitunzhufu_recommend3.py

# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/10/18
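"""
Recommend-feed crawler for the "haitunzhufu" WeChat mini program.

Pulls paginated recommendations from xinge.wyapi.cn, AES-decrypts the
response payload, normalizes each item into a video_dict, filters it through
PiaoQuanPipeline, and publishes accepted items to the topic_crawler_etl_{env}
MQ topic for the ETL stage.
"""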
import json
import os
import random
import sys
import time
import uuid
from datetime import datetime

import requests
from base64 import b64encode, b64decode
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad

from common.mq import MQ

sys.path.append(os.getcwd())
from common.common import Common
from common.public import clean_title
from common import AliyunLogger, PiaoQuanPipeline

class AESCipher:
    def __init__(self, key):
        self.key = key.encode("utf-8")  # the AES key must be bytes
        self.iv = self.key  # the key is reused as the IV here

    def encrypt(self, data):
        cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
        ct_bytes = cipher.encrypt(pad(data.encode("utf-8"), AES.block_size))
        ct = b64encode(ct_bytes).decode("utf-8")
        return ct

    def decrypt(self, data):
        try:
            ct = b64decode(data.encode("utf-8"))
            cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
            pt = unpad(cipher.decrypt(ct), AES.block_size)
            return pt.decode("utf-8")
        except Exception:
            print("Incorrect decryption")
            return None
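
# A minimal round-trip sketch of AESCipher (AES-128-CBC with PKCS#7 padding;
# note the 16-byte key is reused as the IV, so equal plaintexts produce equal
# ciphertexts):
#
#     cipher = AESCipher("xlc2ze7qnqg8xi1d")
#     token = cipher.encrypt('{"page": 1}')
#     assert cipher.decrypt(token) == '{"page": 1}'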

class HTZFScheduling:
    def __init__(self, log_type, crawler, rule_dict, env, our_uid):
        self.platform = "haitunzhufu"
        self.log_type = log_type
        self.crawler = crawler
        self.rule_dict = rule_dict
        self.env = env
        self.our_uid = our_uid
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.download_count = 0

    # def repeat_video(self, video_id):
    #     sql = f""" select * from crawler_video where platform in ("{self.crawler}","{self.platform}") and out_video_id="{video_id}"; """
    #     repeat_video = MysqlHelper.get_values(
    #         self.log_type, self.crawler, sql, self.env
    #     )
    #     return len(repeat_video)
    # fetch one page of the video id list
    def get_videoList(self, page_id):
        time.sleep(random.randint(5, 10))
        url = "https://xinge.wyapi.cn/videos/api.videos/getItem"
        headers = {
            "xweb_xhr": "1",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817",
            "content-type": "application/json",
            "accept": "*/*",
            "sec-fetch-site": "cross-site",
            "sec-fetch-mode": "cors",
            "sec-fetch-dest": "empty",
            "referer": "https://servicewechat.com/wxcc35cbbc445d331a/2/page-frame.html",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "en",
        }
        params = {"mark": "", "page": page_id}
        response = requests.get(url, headers=headers, params=params)
        ori_result = response.json()
        AliyunLogger.logging(
            code="1000",
            platform=self.crawler,
            mode=self.log_type,
            env=self.env,
            message="start crawling page {}".format(page_id),
        )
        key = "xlc2ze7qnqg8xi1d"
        cipher = AESCipher(key)
        # decrypt() swallows its own exceptions and returns None on failure,
        # so the failure case must be detected via the return value rather
        # than a try/except around the call
        decrypted_text = cipher.decrypt(ori_result["data"])
        if decrypted_text is None:
            AliyunLogger.logging(
                code="2000",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                message="page {}: decryption failed".format(page_id),
            )
            return
        AliyunLogger.logging(
            code="1000",
            platform=self.crawler,
            mode=self.log_type,
            env=self.env,
            message="page {}: decryption succeeded".format(page_id),
        )
        result = json.loads(decrypted_text)
        if "list" not in result or response.status_code != 200:
            Common.logger(self.log_type, self.crawler).info(
                f"get_videoList:{response.text}\n"
            )
            AliyunLogger.logging(
                code="2000",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                message=f"get_videoList:{response.text}\n",
            )
            return
        elif len(result["list"]) == 0:
            Common.logger(self.log_type, self.crawler).info("no more data\n")
            # Common.logging(self.log_type, self.crawler, self.env, "no more data\n")
            AliyunLogger.logging(
                code="2000",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                message="no more data\n",
            )
            return
        else:
            data_list = result["list"]
            for index, video_obj in enumerate(data_list):
                try:
                    AliyunLogger.logging(
                        code="1001",
                        platform=self.crawler,
                        mode=self.log_type,
                        env=self.env,
                        data={},
                        message="scanned a video: page {}, item {}".format(
                            page_id, index + 1
                        ),
                    )
                    self.process_video_obj(video_obj)
                except Exception as e:
                    Common.logger(self.log_type, self.crawler).error(
                        f"exception while crawling a single video: {e}\n"
                    )
                    # Common.logging(
                    #     self.log_type, self.crawler, self.env,
                    #     f"exception while crawling a single video: {e}\n"
                    # )
                    AliyunLogger.logging(
                        code="3000",
                        platform=self.crawler,
                        mode=self.log_type,
                        env=self.env,
                        data=video_obj,
                        message="exception while crawling a single video: {}; page {}, item {}".format(
                            e, page_id, index + 1
                        ),
                    )
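
    # Shape of one decrypted feed item, inferred from the fields accessed
    # below (values illustrative, not from a real response):
    #
    #     {"id": 123, "name": "...", "create_at": "2023-10-18 12:00:00",
    #      "num_read": 0, "num_like": 0, "num_comment": 0,
    #      "profile_id": 0, "cover": "https://..."}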
    def process_video_obj(self, video_obj):
        video_id = video_obj.get("id", 0)
        trace_id = self.crawler + str(uuid.uuid1())
        video_title = clean_title(video_obj.get("name", "no title"))
        video_time = 0
        publish_time_str = video_obj.get("create_at", "")
        # parse the time string into a datetime object
        dt = datetime.strptime(publish_time_str, "%Y-%m-%d %H:%M:%S")
        # convert the datetime object to a unix timestamp
        publish_time_stamp = int(datetime.timestamp(dt))
        user_name = ""
        video_dict = {
            "video_title": video_title,
            "video_id": video_id,
            "duration": video_time,
            "play_cnt": int(video_obj.get("num_read", 0)),
            "like_cnt": int(video_obj.get("num_like", 0)),
            "comment_cnt": int(video_obj.get("num_comment", 0)),
            "share_cnt": 0,
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "update_time_stamp": int(time.time()),
            "publish_time_str": publish_time_str,
            "video_width": 0,
            "video_height": 0,
            "profile_id": 0,
            "profile_mid": 0,
            "session": f"haitunzhufu-{int(time.time())}",
            "out_user_id": video_obj.get("profile_id", 0),
            "platform": self.crawler,
            "strategy": self.log_type,
        }
        video_dict["out_video_id"] = str(video_dict["video_id"])
        pipeline = PiaoQuanPipeline(
            platform=self.crawler,
            mode=self.log_type,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video_dict,
            trace_id=trace_id,
        )
        flag = pipeline.process_item()
        if flag:
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = self.our_uid
            video_dict["publish_time"] = video_dict["publish_time_str"]
            # "cover" appears to hold the media URL itself; the cover image
            # is derived from it via a Qiniu-style vframe transform below
            video_dict["video_url"] = video_obj["cover"]
            video_dict["avatar_url"] = ""
            video_dict["cover_url"] = video_obj["cover"] + "&vframe/png/offset/1/w/200"
            self.download_count += 1
            self.mq.send_msg(video_dict)
            # print(video_dict)
            AliyunLogger.logging(
                code="1002",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                data=video_dict,
                trace_id=trace_id,
                message="successfully sent MQ message to ETL",
            )
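
# Running this file directly scans the first four pages of the recommend feed
# in the dev environment and prints the running count of videos that passed
# the pipeline. It assumes the project's common/ package (MQ, Common,
# AliyunLogger, PiaoQuanPipeline) is importable from the working directory:
#
#     python haitunzhufu_recommend3.py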
if __name__ == "__main__":
    ZL = HTZFScheduling(
        log_type="recommend",
        crawler="haitunzhufu",
        rule_dict={},
        our_uid="luojunhuihaoshuai",
        env="dev",
    )
    for i in range(4):
        ZL.get_videoList(page_id=i + 1)
        print(ZL.download_count)