# haitunzhufu_recommend3.py
# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/10/18
import json
import os
import random
import sys
import time
from datetime import datetime
import requests
from base64 import b64encode, b64decode
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from common.mq import MQ
# Make the project root importable when run as a script; must execute before
# the common.* imports below resolve.
sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper
from common.public import get_config_from_mysql, download_rule, download_rule_v2
  19. # 定义一个 AES 加密解密的类
  20. class AESCipher:
  21. def __init__(self, key):
  22. self.key = key.encode('utf-8') # 需要一个bytes类型的key
  23. self.iv = self.key # 在这个例子中,key和iv是相同的
  24. def encrypt(self, data):
  25. cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
  26. ct_bytes = cipher.encrypt(pad(data.encode('utf-8'), AES.block_size))
  27. ct = b64encode(ct_bytes).decode('utf-8')
  28. return ct
  29. def decrypt(self, data):
  30. try:
  31. ct = b64decode(data.encode('utf-8'))
  32. cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
  33. pt = unpad(cipher.decrypt(ct), AES.block_size)
  34. return pt.decode('utf-8')
  35. except Exception as e:
  36. print("Incorrect decryption")
  37. return None
  38. def clean_title(strings):
  39. return (
  40. strings.strip()
  41. .replace("\n", "")
  42. .replace("/", "")
  43. .replace("\r", "")
  44. .replace("#", "")
  45. .replace(".", "。")
  46. .replace("\\", "")
  47. .replace("&NBSP", "")
  48. .replace(":", "")
  49. .replace("*", "")
  50. .replace("?", "")
  51. .replace("?", "")
  52. .replace('"', "")
  53. .replace("<", "")
  54. .replace(">", "")
  55. .replace("|", "")
  56. .replace(" ", "")
  57. .replace('"', "")
  58. .replace("'", "")
  59. )
class HTZFScheduling:
    """Crawler for the "haitunzhufu" (海豚祝福) mini-program recommend feed.

    Fetches pages from the remote API, AES-decrypts the payload, runs each
    item through dedup / download-rule / filter-word checks, and sends
    qualifying videos to the environment's ETL MQ topic.
    """

    def __init__(self, log_type, crawler, rule_dict, env, our_uid):
        # Platform identifier, used alongside `crawler` in the dedup SQL.
        self.platform = "haitunzhufu"
        self.log_type = log_type
        self.crawler = crawler
        self.rule_dict = rule_dict
        self.env = env
        self.our_uid = our_uid
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        # Number of videos pushed to the MQ during this run.
        self.download_count = 0

    def repeat_video(self, video_id):
        """Return how many crawler_video rows already hold this out_video_id
        (non-zero means the video was seen/downloaded before)."""
        # NOTE(review): values are interpolated straight into the SQL string;
        # fine while video_id comes from the API as an int, but a
        # parameterized query would be safer.
        sql = f""" select * from crawler_video where platform in ("{self.crawler}","{self.platform}") and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(
            self.log_type, self.crawler, sql, self.env
        )
        return len(repeat_video)

    # Fetch one page of the recommend feed and process every video on it.
    def get_videoList(self, page_id):
        # Random delay between page fetches to avoid hammering the API.
        time.sleep(random.randint(5, 10))
        url = 'https://haitun.wyapi.cn/videos/api.videos/getItem'
        headers = {
            'xweb_xhr': '1',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817',
            'content-type': 'application/json',
            'accept': '*/*',
            'sec-fetch-site': 'cross-site',
            'sec-fetch-mode': 'cors',
            'sec-fetch-dest': 'empty',
            'referer': 'https://servicewechat.com/wxcc35cbbc445d331a/2/page-frame.html',
            'accept-encoding': 'gzip, deflate, br',
            'accept-language': 'en'
        }
        params = {
            'mark': '',
            'page': page_id
        }
        response = requests.get(url, headers=headers, params=params)
        ori_result = response.json()
        # The API wraps its JSON in an AES-CBC encrypted, base64 payload.
        key = "xlc2ze7qnqg8xi1d"
        cipher = AESCipher(key)
        decrypted_text = cipher.decrypt(ori_result['data'])
        result = json.loads(decrypted_text)
        # NOTE(review): status_code is checked only after the body has been
        # parsed; a non-200 response with an unparsable body would raise in
        # the lines above rather than reach this branch.
        if "list" not in result or response.status_code != 200:
            Common.logger(self.log_type, self.crawler).info(
                f"get_videoList:{response.text}\n"
            )
            Common.logging(
                self.log_type,
                self.crawler,
                self.env,
                f"get_videoList:{response.text}\n",
            )
            return
        elif len(result["list"]) == 0:
            # Empty page: the feed is exhausted.
            Common.logger(self.log_type, self.crawler).info(f"没有更多数据啦~\n")
            Common.logging(self.log_type, self.crawler, self.env, f"没有更多数据啦~\n")
            return
        else:
            data_list = result["list"]
            for video_obj in data_list:
                # Per-item isolation: one bad record must not abort the page.
                try:
                    self.process_video_obj(video_obj)
                except Exception as e:
                    Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
                    Common.logging(
                        self.log_type, self.crawler, self.env, f"抓取单条视频异常:{e}\n"
                    )

    def process_video_obj(self, video_obj):
        """Validate a single feed item against the rule/filter/dedup chain
        and, if it passes, enrich it and push it to the ETL MQ."""
        video_id = video_obj.get("id", 0)
        video_title = clean_title(video_obj.get("name", "no title"))
        # Duration is not provided by this endpoint; recorded as 0.
        video_time = 0
        publish_time_str = video_obj.get("create_at", "")
        # Convert the "YYYY-mm-dd HH:MM:SS" string to a unix timestamp.
        dt = datetime.strptime(publish_time_str, '%Y-%m-%d %H:%M:%S')
        publish_time_stamp = int(datetime.timestamp(dt))
        user_name = ""
        video_dict = {
            "video_title": video_title,
            "video_id": video_id,
            "duration": video_time,
            "play_cnt": int(video_obj.get("num_read", 0)),
            "like_cnt": int(video_obj.get("num_like", 0)),
            "comment_cnt": int(video_obj.get("num_comment", 0)),
            "share_cnt": 0,
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "video_width": 0,
            "video_height": 0,
            "profile_id": 0,
            "profile_mid": 0,
            "session": f"haitunzhufu-{int(time.time())}",
        }
        for k, v in video_dict.items():
            Common.logger(self.log_type, self.crawler).info(f"{k}:{v}")
        Common.logging(
            self.log_type, self.crawler, self.env, f"{video_dict}"
        )
        # Drop items with no title or no id.
        if video_title == "" or video_dict["video_id"] == "":
            Common.logger(self.log_type, self.crawler).info("无效视频\n")
            Common.logging(self.log_type, self.crawler, self.env, "无效视频\n")
        # Basic crawl-rule filter (counts / publish time etc. per rule_dict).
        elif (
            download_rule_v2(
                log_type=self.log_type,
                crawler=self.crawler,
                video_dict=video_dict,
                rule_dict=self.rule_dict,
            )
            is False
        ):
            Common.logger(self.log_type, self.crawler).info("不满足抓取规则\n")
            Common.logging(
                self.log_type, self.crawler, self.env, "不满足抓取规则\n"
            )
        # Filter-word check: skip when the title contains any configured word.
        elif (
            any(
                str(word)
                if str(word) in video_dict["video_title"]
                else False
                for word in get_config_from_mysql(
                    log_type=self.log_type,
                    source=self.crawler,
                    env=self.env,
                    text="filter",
                    action="",
                )
            )
            is True
        ):
            Common.logger(self.log_type, self.crawler).info("已中过滤词\n")
            Common.logging(self.log_type, self.crawler, self.env, "已中过滤词\n")
        elif self.repeat_video(video_dict["video_id"]) != 0:
            Common.logger(self.log_type, self.crawler).info("视频已下载\n")
            Common.logging(self.log_type, self.crawler, self.env, "视频已下载\n")
        else:
            # Passed all checks: enrich with ETL fields and enqueue.
            video_dict["out_user_id"] = video_obj.get("profile_id", 0)
            video_dict["platform"] = self.crawler
            video_dict["strategy"] = self.log_type
            video_dict["out_video_id"] = str(video_dict["video_id"])
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = self.our_uid
            video_dict["publish_time"] = video_dict["publish_time_str"]
            # NOTE(review): video_url is taken from the 'cover' field —
            # confirm the API really serves the playable media at that URL.
            video_dict["video_url"] = video_obj['cover']
            video_dict["avatar_url"] = ""
            # Thumbnail via a qiniu-style vframe query appended to the cover URL.
            video_dict["cover_url"] = video_obj['cover'] + "&vframe/png/offset/1/w/200"
            self.download_count += 1
            self.mq.send_msg(video_dict)
  213. if __name__ == "__main__":
  214. ZL = HTZFScheduling(
  215. log_type="recommend",
  216. crawler="htzf",
  217. rule_dict={},
  218. our_uid="luojunhuihaoshuai",
  219. env="dev"
  220. )
  221. for i in range(4):
  222. ZL.get_videoList(page_id=i + 1)
  223. print(ZL.download_count)