# -*- coding: utf-8 -*- # @Author: luojunhui # @Time: 2023/10/18 import json import os import random import sys import time import requests from hashlib import md5 from datetime import datetime from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import padding import binascii from common.mq import MQ sys.path.append(os.getcwd()) from common.common import Common from common.scheduling_db import MysqlHelper from common.public import get_config_from_mysql, download_rule, download_rule_v2 # 定义一个 AES 加密解密的类 class AESCryptor: def __init__(self): # 初始化密钥和 IV, 在生产环境中,这些值不应该被硬编码 self.key = b"50102fa64073ad76" self.iv = b"173d023138824bb0" # AES 加密方法 def aes_encrypt(self, data): # 使用 PKCS7 填充模式处理待加密的数据,使其长度满足 AES 加密的需求 padder = padding.PKCS7(128).padder() padded_data = padder.update(data.encode('utf-8')) + padder.finalize() # 初始化 AES 加密器,使用 CBC 模式和给定的密钥、IV backend = default_backend() cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=backend) encryptor = cipher.encryptor() ct = encryptor.update(padded_data) + encryptor.finalize() # 将加密后的字节串转为十六进制字符串,并转为大写 return binascii.hexlify(ct).upper().decode('utf-8') # AES 解密方法 def aes_decrypt(self, hex_data): # 将十六进制字符串转为原始的字节串 ct = binascii.unhexlify(hex_data) # 初始化 AES 解密器,使用相同的密钥和 IV backend = default_backend() cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=backend) decryptor = cipher.decryptor() padded_data = decryptor.update(ct) + decryptor.finalize() # 使用 PKCS7 移除填充 unpadder = padding.PKCS7(128).unpadder() data = unpadder.update(padded_data) + unpadder.finalize() # 返回解密后,去掉填充的原始字符串 return data.decode('utf-8') def clean_title(strings): return ( strings.strip() .replace("\n", "") .replace("/", "") .replace("\r", "") .replace("#", "") .replace(".", "。") .replace("\\", "") .replace("&NBSP", "") .replace(":", "") .replace("*", "") .replace("?", "") .replace("?", "") .replace('"', "") .replace("<", "") .replace(">", "") .replace("|", "") .replace(" ", "") .replace('"', "") .replace("'", "") ) class HHXXZFDScheduling: def __init__(self, log_type, crawler, rule_dict, env, our_uid): self.platform = "欢欢喜喜祝福到" self.log_type = log_type self.crawler = crawler self.rule_dict = rule_dict self.env = env self.our_uid = our_uid self.mq = MQ(topic_name="topic_crawler_etl_" + self.env) self.download_count = 0 def repeat_video(self, video_id): sql = f""" select * from crawler_video where platform in ("{self.crawler}","{self.platform}") and out_video_id="{video_id}"; """ repeat_video = MysqlHelper.get_values( self.log_type, self.crawler, sql, self.env ) return len(repeat_video) # 获取视频id_list def get_videoList(self, page_id, page_limit): time.sleep(random.randint(5, 10)) my_dict = { "pageNo": page_id, # 页数 "pageSize": page_limit, # 每一页的视频数量 "groupId": "1650323161797439489", # 分类 "vn": 1, "gx": 1, "appid": "wx9a60184c443f39af", # 小程序id "type": 2, "hxid": "this may not be important", } my_str = AESCryptor().aes_encrypt(json.dumps(my_dict, ensure_ascii=False)) url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideoListsEn2?v={}".format(my_str) # 请求头 headers = { "xweb_xhr": "1", "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817", "content-type": "application/json", "accept": "*/*", "sec-fetch-site": "cross-site", "sec-fetch-mode": "cors", "sec-fetch-dest": "empty", "referer": "https://servicewechat.com/wx9a60184c443f39af/9/page-frame.html", "accept-encoding": "gzip, deflate, br", "accept-language": "en", } response = requests.get(url, headers=headers) result = json.loads(AESCryptor().aes_decrypt(response.text)) if "list" not in result or response.status_code != 200: Common.logger(self.log_type, self.crawler).info( f"get_videoList:{response.text}\n" ) Common.logging( self.log_type, self.crawler, self.env, f"get_videoList:{response.text}\n", ) return elif len(result["list"]["records"]) == 0: Common.logger(self.log_type, self.crawler).info(f"没有更多数据啦~\n") Common.logging(self.log_type, self.crawler, self.env, f"没有更多数据啦~\n") return else: data_list = result["list"]["records"] for video_obj in data_list: try: self.process_video_obj(video_obj) except Exception as e: Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n") Common.logging( self.log_type, self.crawler, self.env, f"抓取单条视频异常:{e}\n" ) def process_video_obj(self, video_obj): # print(type(video_obj)) video_id = video_obj.get("id", 0) video_title = clean_title(video_obj.get("vname", "no title")) video_time = video_obj.get("v_time", 0) publish_time_stamp = int(time.time()) publish_time_str = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp) ) user_name = video_obj.get("authname", "") video_dict = { "video_title": video_title, "video_id": video_id, "duration": video_time, "play_cnt": int(video_obj.get("playnum", 0).replace("万+", "0000") if "万+" in video_obj.get("playnum", 0) else video_obj.get("playnum", 0)), "like_cnt": int(video_obj.get("likenum", 0)), "comment_cnt": 0, "share_cnt": 0, "user_name": user_name, "publish_time_stamp": publish_time_stamp, "publish_time_str": publish_time_str, "video_width": 0, "video_height": 0, "profile_id": 0, "profile_mid": 0, "session": f"huanhaunxixizhufudao-{int(time.time())}", } for k, v in video_dict.items(): Common.logger(self.log_type, self.crawler).info(f"{k}:{v}") Common.logging( self.log_type, self.crawler, self.env, f"{video_dict}" ) # 过滤无效视频 if video_title == "" or video_dict["video_id"] == "": Common.logger(self.log_type, self.crawler).info("无效视频\n") Common.logging(self.log_type, self.crawler, self.env, "无效视频\n") # 抓取基础规则过滤 elif ( download_rule_v2( log_type=self.log_type, crawler=self.crawler, video_dict=video_dict, rule_dict=self.rule_dict, ) is False ): Common.logger(self.log_type, self.crawler).info("不满足抓取规则\n") Common.logging( self.log_type, self.crawler, self.env, "不满足抓取规则\n" ) elif ( any( str(word) if str(word) in video_dict["video_title"] else False for word in get_config_from_mysql( log_type=self.log_type, source=self.crawler, env=self.env, text="filter", action="", ) ) is True ): Common.logger(self.log_type, self.crawler).info("已中过滤词\n") Common.logging(self.log_type, self.crawler, self.env, "已中过滤词\n") elif self.repeat_video(video_dict["video_id"]) != 0: Common.logger(self.log_type, self.crawler).info("视频已下载\n") Common.logging(self.log_type, self.crawler, self.env, "视频已下载\n") else: # out_video_id = md5(video_title.encode('utf8')).hexdigest() # out_user_id = md5(user_name.encode('utf8')).hexdigest() video_dict["out_user_id"] = video_obj.get("authid", 0) video_dict["platform"] = self.crawler video_dict["strategy"] = self.log_type video_dict["out_video_id"] = str(video_dict["video_id"]) video_dict["width"] = video_dict["video_width"] video_dict["height"] = video_dict["video_height"] video_dict["crawler_rule"] = json.dumps(self.rule_dict) video_dict["user_id"] = self.our_uid video_dict["publish_time"] = video_dict["publish_time_str"] video_dict["video_url"] = video_obj['videoaddr'] video_dict["avatar_url"] = video_obj['authimg'] video_dict["cover_url"] = video_obj['indeximg'] # print(json.dumps(video_dict, ensure_ascii=False, indent=4)) self.download_count += 1 self.mq.send_msg(video_dict) if __name__ == "__main__": ZL = HHXXZFDScheduling( log_type="recommend", crawler="hhxxzfd", rule_dict={}, our_uid="luojunhuihaoshuai", env="dev" ) for i in range(4): ZL.get_videoList(page_id=i + 1, page_limit=10) print(ZL.download_count)