# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/10/18
"""Scheduler for crawling the "haitunzhufu" mini-program video feed.

Fetches AES-encrypted video listings page by page, decrypts each payload,
validates every item through PiaoQuanPipeline, and publishes accepted items
to the crawler ETL message queue.
"""
import json
import os
import random
import sys
import time
import uuid
from base64 import b64decode, b64encode
from datetime import datetime

import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad

from common.mq import MQ

sys.path.append(os.getcwd())
from common.common import Common
from common.public import clean_title
from common import AliyunLogger, PiaoQuanPipeline


class AESCipher:
    """AES-128-CBC helper with base64 transport encoding.

    NOTE(review): the key doubles as the IV, which weakens CBC (identical
    plaintext prefixes yield identical ciphertext prefixes). Kept as-is
    because the remote API requires this exact scheme — do not "fix".
    """

    def __init__(self, key):
        """:param key: 16-character ASCII key; also used verbatim as the IV."""
        self.key = key.encode("utf-8")  # AES.new requires a bytes key
        self.iv = self.key  # API contract: IV == key

    def encrypt(self, data):
        """Encrypt a UTF-8 string, returning base64-encoded ciphertext."""
        cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
        ct_bytes = cipher.encrypt(pad(data.encode("utf-8"), AES.block_size))
        return b64encode(ct_bytes).decode("utf-8")

    def decrypt(self, data):
        """Decrypt base64 ciphertext back to a UTF-8 string.

        Returns None on any failure (bad base64, wrong key, invalid
        padding) instead of raising — callers MUST check for None.
        """
        try:
            ct = b64decode(data.encode("utf-8"))
            cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
            pt = unpad(cipher.decrypt(ct), AES.block_size)
            return pt.decode("utf-8")
        except Exception:
            print("Incorrect decryption")
            return None


class HTZFScheduling:
    """Crawl scheduler: pulls pages from the haitunzhufu API and pushes
    qualifying videos onto the ETL MQ topic for the given environment."""

    def __init__(self, log_type, crawler, rule_dict, env, our_uid):
        """
        :param log_type: crawl strategy name (e.g. "recommend"); used in logs.
        :param crawler: crawler/platform identifier used in logs and MQ payloads.
        :param rule_dict: filtering rules forwarded to PiaoQuanPipeline.
        :param env: runtime environment ("dev"/"prod"); selects the MQ topic.
        :param our_uid: internal user id attached to published videos.
        """
        self.platform = "haitunzhufu"
        self.log_type = log_type
        self.crawler = crawler
        self.rule_dict = rule_dict
        self.env = env
        self.our_uid = our_uid
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        # Running count of videos successfully sent to the MQ.
        self.download_count = 0

    def get_videoList(self, page_id):
        """Fetch, decrypt, and process one page of the video feed.

        :param page_id: 1-based page number to request.
        Side effects: sleeps 5-10s (rate limiting), logs progress, and
        delegates each item to process_video_obj. Returns None always.
        """
        time.sleep(random.randint(5, 10))  # polite random delay between pages
        url = "https://haitun.wyapi.cn/videos/api.videos/getItem"
        headers = {
            "xweb_xhr": "1",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817",
            "content-type": "application/json",
            "accept": "*/*",
            "sec-fetch-site": "cross-site",
            "sec-fetch-mode": "cors",
            "sec-fetch-dest": "empty",
            "referer": "https://servicewechat.com/wxcc35cbbc445d331a/2/page-frame.html",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "en",
        }
        params = {"mark": "", "page": page_id}
        response = requests.get(url, headers=headers, params=params)
        ori_result = response.json()
        AliyunLogger.logging(
            code="1000",
            platform=self.crawler,
            mode=self.log_type,
            env=self.env,
            message="开始抓取第{}页".format(page_id),
        )
        # Payload is AES-encrypted by the mini-program backend.
        key = "xlc2ze7qnqg8xi1d"
        cipher = AESCipher(key)
        try:
            decrypted_text = cipher.decrypt(ori_result["data"])
        except Exception:
            # e.g. "data" key missing from the response
            AliyunLogger.logging(
                code="2000",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                message="第{}页, 解密失败".format(page_id),
            )
            return
        # decrypt() reports failure by returning None (it never raises),
        # so the except branch above does not cover a bad-key/bad-padding
        # failure — without this check json.loads(None) would TypeError.
        if decrypted_text is None:
            AliyunLogger.logging(
                code="2000",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                message="第{}页, 解密失败".format(page_id),
            )
            return
        AliyunLogger.logging(
            code="1000",
            platform=self.crawler,
            mode=self.log_type,
            env=self.env,
            message="第{}页, 解密成功".format(page_id),
        )
        result = json.loads(decrypted_text)
        if "list" not in result or response.status_code != 200:
            Common.logger(self.log_type, self.crawler).info(
                f"get_videoList:{response.text}\n"
            )
            AliyunLogger.logging(
                code="2000",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                message=f"get_videoList:{response.text}\n",
            )
            return
        elif len(result["list"]) == 0:
            Common.logger(self.log_type, self.crawler).info(f"没有更多数据啦~\n")
            AliyunLogger.logging(
                code="2000",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                message=f"没有更多数据啦~\n",
            )
            return
        else:
            data_list = result["list"]
            for index, video_obj in enumerate(data_list):
                try:
                    AliyunLogger.logging(
                        code="1001",
                        platform=self.crawler,
                        mode=self.log_type,
                        env=self.env,
                        data={},
                        message="成功扫描到一条视频, 该视频位于第{}页{}条".format(page_id, index + 1),
                    )
                    self.process_video_obj(video_obj)
                except Exception as e:
                    # One bad item must not abort the rest of the page.
                    Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
                    AliyunLogger.logging(
                        code="3000",
                        platform=self.crawler,
                        mode=self.log_type,
                        env=self.env,
                        data=video_obj,
                        message="抓取单条视频异常, 报错原因是: {}, 该视频位于第{}页{}条".format(
                            e, page_id, index + 1
                        ),
                    )

    def process_video_obj(self, video_obj):
        """Normalize one raw API item, run pipeline checks, and publish it.

        :param video_obj: dict from the decrypted API response; expected keys
            include "id", "name", "create_at" ("%Y-%m-%d %H:%M:%S"), "cover",
            and count fields (num_read/num_like/num_comment) — schema inferred
            from usage, TODO confirm against the API.
        :raises ValueError: if "create_at" does not match the expected format
            (caught by the caller's per-item handler).
        """
        video_id = video_obj.get("id", 0)
        trace_id = self.crawler + str(uuid.uuid1())
        video_title = clean_title(video_obj.get("name", "no title"))
        video_time = 0  # duration is not provided by this API
        publish_time_str = video_obj.get("create_at", "")
        # Convert the publish-time string to a Unix timestamp.
        dt = datetime.strptime(publish_time_str, "%Y-%m-%d %H:%M:%S")
        publish_time_stamp = int(datetime.timestamp(dt))
        user_name = ""  # author name is not exposed by this endpoint
        video_dict = {
            "video_title": video_title,
            "video_id": video_id,
            "duration": video_time,
            "play_cnt": int(video_obj.get("num_read", 0)),
            "like_cnt": int(video_obj.get("num_like", 0)),
            "comment_cnt": int(video_obj.get("num_comment", 0)),
            "share_cnt": 0,
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "update_time_stamp": int(time.time()),
            "publish_time_str": publish_time_str,
            "video_width": 0,
            "video_height": 0,
            "profile_id": 0,
            "profile_mid": 0,
            "session": f"haitunzhufu-{int(time.time())}",
            "out_user_id": video_obj.get("profile_id", 0),
            "platform": self.crawler,
            "strategy": self.log_type,
        }
        video_dict["out_video_id"] = str(video_dict["video_id"])
        pipeline = PiaoQuanPipeline(
            platform=self.crawler,
            mode=self.log_type,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video_dict,
            trace_id=trace_id,
        )
        flag = pipeline.process_item()
        if flag:
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = self.our_uid
            video_dict["publish_time"] = video_dict["publish_time_str"]
            # NOTE(review): video_url is set to the cover URL — this mirrors
            # the original behavior; confirm the ETL derives the real media
            # URL downstream.
            video_dict["video_url"] = video_obj["cover"]
            video_dict["avatar_url"] = ""
            # Thumbnail via the CDN's video-frame transform suffix.
            video_dict["cover_url"] = video_obj["cover"] + "&vframe/png/offset/1/w/200"
            self.download_count += 1
            self.mq.send_msg(video_dict)
            AliyunLogger.logging(
                code="1002",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                data=video_dict,
                trace_id=trace_id,
                message="成功发送 MQ 至 ETL",
            )


if __name__ == "__main__":
    ZL = HTZFScheduling(
        log_type="recommend",
        crawler="haitunzhufu",
        rule_dict={},
        our_uid="luojunhuihaoshuai",
        env="dev",
    )
    for i in range(4):
        ZL.get_videoList(page_id=i + 1)
        print(ZL.download_count)