# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/10/18
import json
import os
import random
import sys
import time
from base64 import b64encode, b64decode
from datetime import datetime

import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad

sys.path.append(os.getcwd())  # make the project-level "common" package importable
from common.mq import MQ
from common.common import Common
from common.scheduling_db import MysqlHelper
from common.public import get_config_from_mysql, download_rule_v2, clean_title


class AESCipher:
    def __init__(self, key):
        self.key = key.encode('utf-8')  # AES requires a bytes key
        self.iv = self.key  # this API reuses the key itself as the IV

    def encrypt(self, data):
        cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
        ct_bytes = cipher.encrypt(pad(data.encode('utf-8'), AES.block_size))
        return b64encode(ct_bytes).decode('utf-8')

    def decrypt(self, data):
        try:
            ct = b64decode(data.encode('utf-8'))
            cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
            pt = unpad(cipher.decrypt(ct), AES.block_size)
            return pt.decode('utf-8')
        except Exception:
            print("Incorrect decryption")
            return None
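# Minimal round-trip sketch of AESCipher (the key is the same 16-byte key the
# API call below uses; the plaintext is just an illustrative placeholder):
#
#     cipher = AESCipher("xlc2ze7qnqg8xi1d")
#     token = cipher.encrypt('{"list": []}')
#     assert cipher.decrypt(token) == '{"list": []}'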
class HTZFScheduling:
    def __init__(self, log_type, crawler, rule_dict, env, our_uid):
        self.platform = "haitunzhufu"
        self.log_type = log_type
        self.crawler = crawler
        self.rule_dict = rule_dict
        self.env = env
        self.our_uid = our_uid
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.download_count = 0

    def repeat_video(self, video_id):
        # check whether this out_video_id was already crawled
        sql = f""" select * from crawler_video where platform in ("{self.crawler}","{self.platform}") and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(
            self.log_type, self.crawler, sql, self.env
        )
        return len(repeat_video)

    # fetch one page of the video list
    def get_videoList(self, page_id):
        time.sleep(random.randint(5, 10))
        url = 'https://haitun.wyapi.cn/videos/api.videos/getItem'
        headers = {
            'xweb_xhr': '1',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817',
            'content-type': 'application/json',
            'accept': '*/*',
            'sec-fetch-site': 'cross-site',
            'sec-fetch-mode': 'cors',
            'sec-fetch-dest': 'empty',
            'referer': 'https://servicewechat.com/wxcc35cbbc445d331a/2/page-frame.html',
            'accept-encoding': 'gzip, deflate, br',
            'accept-language': 'en'
        }
        params = {
            'mark': '',
            'page': page_id
        }
        response = requests.get(url, headers=headers, params=params)
        ori_result = response.json()
        key = "xlc2ze7qnqg8xi1d"
        cipher = AESCipher(key)
        decrypted_text = cipher.decrypt(ori_result['data'])
        # decrypt() returns None on failure, so guard before json.loads
        result = json.loads(decrypted_text) if decrypted_text else {}
        if "list" not in result or response.status_code != 200:
            Common.logger(self.log_type, self.crawler).info(
                f"get_videoList:{response.text}\n"
            )
            Common.logging(
                self.log_type,
                self.crawler,
                self.env,
                f"get_videoList:{response.text}\n",
            )
            return
        elif len(result["list"]) == 0:
            Common.logger(self.log_type, self.crawler).info("No more data~\n")
            Common.logging(self.log_type, self.crawler, self.env, "No more data~\n")
            return
        else:
            data_list = result["list"]
            for video_obj in data_list:
                try:
                    self.process_video_obj(video_obj)
                except Exception as e:
                    Common.logger(self.log_type, self.crawler).error(
                        f"Exception while crawling a single video: {e}\n"
                    )
                    Common.logging(
                        self.log_type,
                        self.crawler,
                        self.env,
                        f"Exception while crawling a single video: {e}\n",
                    )

    def process_video_obj(self, video_obj):
        video_id = video_obj.get("id", 0)
        video_title = clean_title(video_obj.get("name", "no title"))
        video_time = 0
        publish_time_str = video_obj.get("create_at", "")
        # parse the publish-time string into a datetime object
        dt = datetime.strptime(publish_time_str, '%Y-%m-%d %H:%M:%S')
        # convert the datetime object into a unix timestamp
        publish_time_stamp = int(datetime.timestamp(dt))
        user_name = ""
        video_dict = {
            "video_title": video_title,
            "video_id": video_id,
            "duration": video_time,
            "play_cnt": int(video_obj.get("num_read", 0)),
            "like_cnt": int(video_obj.get("num_like", 0)),
            "comment_cnt": int(video_obj.get("num_comment", 0)),
            "share_cnt": 0,
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "video_width": 0,
            "video_height": 0,
            "profile_id": 0,
            "profile_mid": 0,
            "session": f"haitunzhufu-{int(time.time())}",
        }
        for k, v in video_dict.items():
            Common.logger(self.log_type, self.crawler).info(f"{k}:{v}")
        Common.logging(self.log_type, self.crawler, self.env, f"{video_dict}")
        # drop invalid videos
        if video_title == "" or video_dict["video_id"] == "":
            Common.logger(self.log_type, self.crawler).info("Invalid video\n")
            Common.logging(self.log_type, self.crawler, self.env, "Invalid video\n")
        # filter by the base crawl rules
        elif (
            download_rule_v2(
                log_type=self.log_type,
                crawler=self.crawler,
                video_dict=video_dict,
                rule_dict=self.rule_dict,
            )
            is False
        ):
            Common.logger(self.log_type, self.crawler).info("Crawl rules not met\n")
            Common.logging(
                self.log_type, self.crawler, self.env, "Crawl rules not met\n"
            )
        # filter-word check on the title
        elif any(
            str(word) in video_dict["video_title"]
            for word in get_config_from_mysql(
                log_type=self.log_type,
                source=self.crawler,
                env=self.env,
                text="filter",
                action="",
            )
        ):
            Common.logger(self.log_type, self.crawler).info("Title hit a filter word\n")
            Common.logging(
                self.log_type, self.crawler, self.env, "Title hit a filter word\n"
            )
        elif self.repeat_video(video_dict["video_id"]) != 0:
            Common.logger(self.log_type, self.crawler).info("Video already downloaded\n")
            Common.logging(
                self.log_type, self.crawler, self.env, "Video already downloaded\n"
            )
        else:
            video_dict["out_user_id"] = video_obj.get("profile_id", 0)
            video_dict["platform"] = self.crawler
            video_dict["strategy"] = self.log_type
            video_dict["out_video_id"] = str(video_dict["video_id"])
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = self.our_uid
            video_dict["publish_time"] = video_dict["publish_time_str"]
            video_dict["video_url"] = video_obj['cover']
            video_dict["avatar_url"] = ""
            video_dict["cover_url"] = video_obj['cover'] + "&vframe/png/offset/1/w/200"
            # print(json.dumps(video_dict, ensure_ascii=False, indent=4))
            self.download_count += 1
            self.mq.send_msg(video_dict)


if __name__ == "__main__":
    ZL = HTZFScheduling(
        log_type="recommend",
        crawler="htzf",
        rule_dict={},
        our_uid="luojunhuihaoshuai",
        env="dev"
    )
    for i in range(4):
        ZL.get_videoList(page_id=i + 1)
    print(ZL.download_count)
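# Running this module directly (from the repo root, so the "common" package
# resolves) crawls pages 1-4 of the recommend feed in the dev environment and
# prints how many videos were pushed to the topic_crawler_etl_dev queue.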