huanhuanxixizhufudao_recommend_2.py

# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/10/18
import json
import os
import random
import sys
import time
import requests
from hashlib import md5
from datetime import datetime
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
import binascii

# Make the project root importable before pulling in the local "common" package
sys.path.append(os.getcwd())
from common.mq import MQ
from common.common import Common
from common.scheduling_db import MysqlHelper
from common.public import get_config_from_mysql, download_rule, download_rule_v2


# A helper class for AES encryption and decryption
class AESCryptor:
    def __init__(self):
        # Initialize the key and IV; in production these values should not be hard-coded
        self.key = b"50102fa64073ad76"
        self.iv = b"173d023138824bb0"

    # AES encryption
    def aes_encrypt(self, data):
        # PKCS7-pad the plaintext so its length fits the AES block size
        padder = padding.PKCS7(128).padder()
        padded_data = padder.update(data.encode("utf-8")) + padder.finalize()
        # Initialize the AES cipher in CBC mode with the key and IV above
        backend = default_backend()
        cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=backend)
        encryptor = cipher.encryptor()
        ct = encryptor.update(padded_data) + encryptor.finalize()
        # Return the ciphertext as an upper-case hex string
        return binascii.hexlify(ct).upper().decode("utf-8")

    # AES decryption
    def aes_decrypt(self, hex_data):
        # Convert the hex string back into raw ciphertext bytes
        ct = binascii.unhexlify(hex_data)
        # Initialize the AES cipher with the same key and IV
        backend = default_backend()
        cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=backend)
        decryptor = cipher.decryptor()
        padded_data = decryptor.update(ct) + decryptor.finalize()
        # Strip the PKCS7 padding
        unpadder = padding.PKCS7(128).unpadder()
        data = unpadder.update(padded_data) + unpadder.finalize()
        # Return the decrypted plaintext with the padding removed
        return data.decode("utf-8")
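
# Illustrative round-trip sanity check, assuming the hard-coded key/IV above
# (not part of the crawl flow; handy when debugging payloads by hand):
#
#     cryptor = AESCryptor()
#     token = cryptor.aes_encrypt('{"pageNo": 1}')
#     assert cryptor.aes_decrypt(token) == '{"pageNo": 1}'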


def clean_title(strings):
    # Strip characters that are illegal in file names or break downstream parsing
    return (
        strings.strip()
        .replace("\n", "")
        .replace("/", "")
        .replace("\r", "")
        .replace("#", "")
        .replace(".", "。")
        .replace("\\", "")
        .replace("&NBSP", "")
        .replace(":", "")
        .replace("*", "")
        .replace("?", "")
        .replace("？", "")  # full-width question mark
        .replace('"', "")
        .replace("<", "")
        .replace(">", "")
        .replace("|", "")
        .replace(" ", "")
        .replace("＂", "")  # full-width quote
        .replace("'", "")
    )
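
# Example with a hypothetical title: clean_title(" 新年快乐/祝福 #2023? ")
# returns "新年快乐祝福2023"; spaces and separator punctuation are dropped,
# and an ASCII "." would become the full-width "。".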


class HHXXZFDScheduling:
    def __init__(self, log_type, crawler, rule_dict, env, our_uid):
        self.platform = "欢欢喜喜祝福到"
        self.log_type = log_type
        self.crawler = crawler
        self.rule_dict = rule_dict
        self.env = env
        self.our_uid = our_uid
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.download_count = 0

    # Count how many rows already exist for this out_video_id, i.e. whether the
    # video has been crawled before
    def repeat_video(self, video_id):
        sql = f""" select * from crawler_video where platform in ("{self.crawler}","{self.platform}") and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(
            self.log_type, self.crawler, sql, self.env
        )
        return len(repeat_video)
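    # Note on repeat_video: values are interpolated directly into the SQL, so
    # for video_id "12345" and crawler "hhxxzfd" (illustrative values) the
    # generated statement reads:
    #   select * from crawler_video
    #   where platform in ("hhxxzfd","欢欢喜喜祝福到") and out_video_id="12345";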

    # Fetch one page of the video id list
    def get_videoList(self, page_id, page_limit):
        time.sleep(random.randint(5, 10))
        my_dict = {
            "pageNo": page_id,  # page number
            "pageSize": page_limit,  # number of videos per page
            "groupId": "1650323161797439489",  # category
            "vn": 1,
            "gx": 1,
            "appid": "wx9a60184c443f39af",  # mini-program id
            "type": 2,
            "hxid": "this may not be important",
        }
        # The API expects the whole query object AES-encrypted into the "v" parameter
        my_str = AESCryptor().aes_encrypt(json.dumps(my_dict, ensure_ascii=False))
        url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideoListsEn2?v={}".format(my_str)
        # Request headers
        headers = {
            "xweb_xhr": "1",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817",
            "content-type": "application/json",
            "accept": "*/*",
            "sec-fetch-site": "cross-site",
            "sec-fetch-mode": "cors",
            "sec-fetch-dest": "empty",
            "referer": "https://servicewechat.com/wx9a60184c443f39af/9/page-frame.html",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "en",
        }
        response = requests.get(url, headers=headers)
        # Check the HTTP status before trying to decrypt the body
        if response.status_code != 200:
            Common.logger(self.log_type, self.crawler).info(
                f"get_videoList:{response.text}\n"
            )
            Common.logging(
                self.log_type,
                self.crawler,
                self.env,
                f"get_videoList:{response.text}\n",
            )
            return
        # The response body is AES-encrypted as well
        result = json.loads(AESCryptor().aes_decrypt(response.text))
        if "list" not in result:
            Common.logger(self.log_type, self.crawler).info(
                f"get_videoList:{response.text}\n"
            )
            Common.logging(
                self.log_type,
                self.crawler,
                self.env,
                f"get_videoList:{response.text}\n",
            )
            return
        elif len(result["list"]["records"]) == 0:
            Common.logger(self.log_type, self.crawler).info("没有更多数据啦~\n")
            Common.logging(self.log_type, self.crawler, self.env, "没有更多数据啦~\n")
            return
        else:
            data_list = result["list"]["records"]
            for video_obj in data_list:
                try:
                    self.process_video_obj(video_obj)
                except Exception as e:
                    Common.logger(self.log_type, self.crawler).error(
                        f"抓取单条视频异常:{e}\n"
                    )
                    Common.logging(
                        self.log_type, self.crawler, self.env, f"抓取单条视频异常:{e}\n"
                    )

    def process_video_obj(self, video_obj):
        video_id = video_obj.get("id", 0)
        video_title = clean_title(video_obj.get("vname", "no title"))
        video_time = video_obj.get("v_time", 0)
        publish_time_stamp = int(time.time())
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )
        user_name = video_obj.get("authname", "")
        # "playnum" is a string that may read like "1万+"; rewriting "万+" as
        # "0000" turns it into a parseable integer ("1万+" -> "10000")
        play_num = video_obj.get("playnum", "0")
        video_dict = {
            "video_title": video_title,
            "video_id": video_id,
            "duration": video_time,
            "play_cnt": int(play_num.replace("万+", "0000") if "万+" in play_num else play_num),
            "like_cnt": int(video_obj.get("likenum", 0)),
            "comment_cnt": 0,
            "share_cnt": 0,
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "video_width": 0,
            "video_height": 0,
            "profile_id": 0,
            "profile_mid": 0,
            "session": f"huanhuanxixizhufudao-{int(time.time())}",
        }
        for k, v in video_dict.items():
            Common.logger(self.log_type, self.crawler).info(f"{k}:{v}")
        Common.logging(self.log_type, self.crawler, self.env, f"{video_dict}")
        # Filter out invalid videos
        if video_title == "" or video_dict["video_id"] == "":
            Common.logger(self.log_type, self.crawler).info("无效视频\n")
            Common.logging(self.log_type, self.crawler, self.env, "无效视频\n")
        # Filter by the basic crawl rules
        elif (
            download_rule_v2(
                log_type=self.log_type,
                crawler=self.crawler,
                video_dict=video_dict,
                rule_dict=self.rule_dict,
            )
            is False
        ):
            Common.logger(self.log_type, self.crawler).info("不满足抓取规则\n")
            Common.logging(self.log_type, self.crawler, self.env, "不满足抓取规则\n")
        # Skip titles that contain any configured filter word
        elif any(
            str(word) in video_dict["video_title"]
            for word in get_config_from_mysql(
                log_type=self.log_type,
                source=self.crawler,
                env=self.env,
                text="filter",
                action="",
            )
        ):
            Common.logger(self.log_type, self.crawler).info("已中过滤词\n")
            Common.logging(self.log_type, self.crawler, self.env, "已中过滤词\n")
        elif self.repeat_video(video_dict["video_id"]) != 0:
            Common.logger(self.log_type, self.crawler).info("视频已下载\n")
            Common.logging(self.log_type, self.crawler, self.env, "视频已下载\n")
        else:
            # out_video_id = md5(video_title.encode('utf8')).hexdigest()
            # out_user_id = md5(user_name.encode('utf8')).hexdigest()
            video_dict["out_user_id"] = video_obj.get("authid", 0)
            video_dict["platform"] = self.crawler
            video_dict["strategy"] = self.log_type
            video_dict["out_video_id"] = str(video_dict["video_id"])
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = self.our_uid
            video_dict["publish_time"] = video_dict["publish_time_str"]
            video_dict["video_url"] = video_obj["videoaddr"]
            video_dict["avatar_url"] = video_obj["authimg"]
            video_dict["cover_url"] = video_obj["indeximg"]
            # Hand the record to the ETL pipeline over MQ
            self.download_count += 1
            self.mq.send_msg(video_dict)


if __name__ == "__main__":
    ZL = HHXXZFDScheduling(
        log_type="recommend",
        crawler="hhxxzfd",
        rule_dict={},
        our_uid="luojunhuihaoshuai",
        env="dev",
    )
    for i in range(4):
        ZL.get_videoList(page_id=i + 1, page_limit=10)
    print(ZL.download_count)