# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/12/14
"""Crawler for the "zhufusonglaoyou" mini-program video feed.

The module fetches one page of the category feed, decrypts the AES-encrypted
payload, runs every video through ``PiaoQuanPipeline`` and forwards the
accepted ones to the ETL message queue.
"""
import json
import os
import random
import sys
import time
import uuid
from base64 import b64encode, b64decode

import requests
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import pad, unpad

# Make the working directory importable *before* any project-local import so
# that every ``common.*`` module below can be resolved when run as a script.
sys.path.append(os.getcwd())

from common.mq import MQ
from common.common import Common
from common.aliyun_log import AliyunLogger
from common.pipeline import PiaoQuanPipeline
from common.public import clean_title

# Upper bound (seconds) for every outgoing HTTP request; without it a stalled
# server would block the crawler indefinitely.
REQUEST_TIMEOUT = 30


def decrypt(a, e, n):
    """Encrypt or decrypt ``a`` with AES-CBC using material derived from ``e``.

    The MD5 hex digest of ``e`` is split in two: the last 16 hex characters
    are used as the AES key and the first 16 as the IV.

    :param a: base64 text to decrypt (when ``n`` is truthy) or plain text to
        encrypt (when ``n`` is falsy).
    :param e: passphrase string the key and IV are derived from.
    :param n: truthy -> decrypt, falsy -> encrypt.
    :return: the decrypted text, or the base64-encoded ciphertext.
    :raises ValueError: propagated from ``unpad``/``cipher.decrypt`` when the
        input is not valid padded ciphertext.
    """
    digest = MD5.new(e.encode()).hexdigest()
    key = digest[16:].encode()
    iv = digest[:16].encode()
    cipher = AES.new(key, AES.MODE_CBC, iv)
    if n:
        encrypted_data = b64decode(a)
        decrypted_data = unpad(cipher.decrypt(encrypted_data), AES.block_size)
        return decrypted_data.decode()
    padded_data = pad(a.encode(), AES.block_size)
    encrypted_data = cipher.encrypt(padded_data)
    return b64encode(encrypted_data).decode()


def find_tencent_url(tx_vid):
    """Build the playable URL of a Tencent Video item.

    Calls the ``h5vv.video.qq.com/getinfo`` endpoint and assembles the URL
    from the first entry of ``vl.vi`` in the response.

    :param tx_vid: Tencent video id (``vid`` query parameter).
    :return: ``<first CDN folder><file name>?vkey=<fvkey>``.
    :raises requests.RequestException: on network failure or timeout.
    :raises KeyError, IndexError, ValueError: when the response does not have
        the expected shape.
    """
    headers = {
        "Host": "h5vv.video.qq.com",
        "xweb_xhr": "1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817",
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "*/*",
        "Sec-Fetch-Site": "cross-site",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Dest": "empty",
        "Referer": "https://servicewechat.com/wx5fcd817f3f80aece/3/page-frame.html",
        "Accept-Language": "en",
    }
    url = "https://h5vv.video.qq.com/getinfo?vid={}&platform=101001&charge=0&otype=json&defn=shd".format(tx_vid)
    response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
    # The body is served as ``QZOutputJson={...};`` -- drop the assignment
    # prefix and the trailing character to obtain plain JSON.
    result = json.loads(response.text.replace("QZOutputJson=", "")[:-1])
    vl = result["vl"]["vi"][0]
    key = vl["fvkey"]
    name = vl["fn"]
    folder = vl["ul"]["ui"][0]["url"]
    return folder + name + "?vkey=" + key


class ZFSLYScheduling:
    """Fetch feed pages, filter the videos and push accepted ones to ETL."""

    def __init__(self, log_type, crawler, rule_dict, env, our_uid):
        """Store the crawl configuration and open the ETL message queue.

        :param log_type: crawl mode; used as the ``mode``/``strategy`` label.
        :param crawler: crawler name; used as the ``platform`` label in logs
            and in the outgoing message.
        :param rule_dict: rule configuration handed to ``PiaoQuanPipeline``.
        :param env: environment name; also the suffix of the MQ topic.
        :param our_uid: value written to ``user_id`` of every sent message.
        """
        self.platform = "zhufusonglaoyou"
        self.log_type = log_type
        self.crawler = crawler
        self.rule_dict = rule_dict
        self.env = env
        self.our_uid = our_uid
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        # Number of videos sent to the queue by this instance.
        self.download_count = 0

    def _log(self, code, message, data=None, **extra):
        """Send one record to AliyunLogger with the instance's fixed labels."""
        AliyunLogger.logging(
            code=code,
            platform=self.crawler,
            mode=self.log_type,
            env=self.env,
            data={} if data is None else data,
            message=message,
            **extra
        )

    # Fetch the video list of one page
    def get_videoList(self, page_id):
        """Fetch, decrypt and process one page of the category feed.

        Sleeps 5-10 seconds first, then requests the page. Every video of the
        page is passed to :meth:`process_video_obj`; a failure on a single
        video is logged and does not stop the remaining ones.

        :param page_id: page number sent as the ``page`` query parameter.
        :return: ``None``. Returns early (after logging) when the page cannot
            be fetched/decrypted, lacks ``totalCount``, or has no videos.
        """
        time.sleep(random.randint(5, 10))
        url = "https://zhufusonglaoyou2.mengniu99.com/api/getcatevideos"
        params = {
            "cateid": "video",
            "page": page_id,
            "timeline": 0,
            "version": "9.0.2",
        }
        headers = {
            'Host': 'zhufusonglaoyou2.mengniu99.com',
            'xweb_xhr': '1',
            'Authorization': 'o7hOQ5XsP-OtIuOK8qAXe368o45E',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100',
            'Sign': '2c694618acd1218cb0876a825165ca45',
            'Content-Type': 'application/json',
            'Accept': '*/*',
            'Sec-Fetch-Site': 'cross-site',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Dest': 'empty',
            'Referer': 'https://servicewechat.com/wx5b38d01fa06bba64/4/page-frame.html',
            'Accept-Language': 'en-US,en;q=0.9'
        }
        try:
            response = requests.get(
                url, headers=headers, params=params, timeout=REQUEST_TIMEOUT
            )
            payload = response.json()
            # The last two characters of ``data`` are dropped before decoding;
            # ``_yyy`` carries the passphrase for this response.
            decrypted_data = decrypt(payload["data"][:-2], payload["_yyy"], True)
            result = json.loads(decrypted_data)
        except Exception:
            # Network, JSON, key or padding errors all mean the page is
            # unusable. ``Exception`` (not a bare ``except``) lets
            # KeyboardInterrupt / SystemExit propagate.
            self._log("2000", "抓取第{}页,未获取数据,编码错误".format(page_id))
            Common.logger(self.log_type, self.crawler).info("编码不对,解密失败\n")
            return

        self._log("1000", "开始抓取第{}页".format(page_id))

        if "totalCount" not in result:
            Common.logger(self.log_type, self.crawler).info(
                f"get_videoList:{response.text}\n"
            )
            self._log("2000", "抓取第{}页,未获取数据".format(page_id))
            return

        # A missing ``videos`` key is treated like an empty page.
        data_list = result.get("videos")
        if not data_list:
            Common.logger(self.log_type, self.crawler).info("没有更多数据啦~\n")
            self._log("2000", "抓取第{}页,没有更多数据啦".format(page_id))
            return

        for index, video_obj in enumerate(data_list):
            try:
                self._log(
                    "1001",
                    "成功扫描到一条视频, 该视频位于第{}页{}条".format(page_id, index + 1),
                )
                self.process_video_obj(video_obj)
            except Exception as e:
                # Per-item boundary: log and continue with the next video.
                Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
                self._log(
                    "3000",
                    "抓取单条视频异常, 报错原因是: {}, 该视频位于第{}页{}条".format(
                        e, page_id, index + 1
                    ),
                    data=video_obj,
                )
        self._log("1000", "完成抓取第{}页".format(page_id))

    def process_video_obj(self, video_obj):
        """Convert one feed entry, filter it and send it to the ETL queue.

        :param video_obj: dict of one feed entry. ``nickname`` and ``cover``
            are always read; ``txvid`` and ``avatarurl`` are read only when
            the pipeline accepts the item.
        :return: ``None``.
        :raises KeyError: when a required key is missing from ``video_obj``.
        """
        trace_id = self.platform + str(uuid.uuid1())
        video_id = video_obj.get("videoid", 0)
        video_title = clean_title(video_obj.get("title", "no title"))
        video_time = video_obj.get("v_time", 0)
        # The crawl time is recorded as the publish time; no publish-time
        # field is read from the feed entry.
        publish_time_stamp = int(time.time())
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )
        user_name = video_obj["nickname"]
        video_dict = {
            "video_title": video_title,
            "video_id": video_id,
            "duration": video_time,
            "play_cnt": 0,
            "like_cnt": 0,
            "comment_cnt": 0,
            "share_cnt": 0,
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "update_time_stamp": int(time.time()),
            "video_width": 0,
            "video_height": 0,
            "profile_id": 0,
            "profile_mid": 0,
            "cover_url": video_obj["cover"],
            # NOTE(review): the prefix differs from ``self.platform``; it
            # looks carried over from another crawler -- confirm whether
            # downstream consumers depend on it before changing.
            "session": f"ganggangdouchuan-{int(time.time())}",
        }
        video_dict["out_video_id"] = str(video_dict["video_id"])
        rule_pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.log_type,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video_dict,
            trace_id=trace_id
        )
        if not rule_pipeline.process_item():
            return

        video_dict["out_user_id"] = video_dict["profile_id"]
        video_dict["platform"] = self.crawler
        video_dict["strategy"] = self.log_type
        video_dict["width"] = video_dict["video_width"]
        video_dict["height"] = video_dict["video_height"]
        video_dict["crawler_rule"] = json.dumps(self.rule_dict)
        video_dict["user_id"] = self.our_uid
        video_dict["publish_time"] = video_dict["publish_time_str"]
        # Resolved only for accepted videos to avoid a needless HTTP call.
        video_dict["video_url"] = find_tencent_url(video_obj["txvid"])
        video_dict["avatar_url"] = video_obj["avatarurl"]
        video_dict["cover_url"] = video_obj["cover"]
        self.download_count += 1
        self.mq.send_msg(video_dict)
        self._log("1002", "成功发送 MQ 至 ETL", data=video_dict, trace_id=trace_id)


if __name__ == "__main__":
    ZL = ZFSLYScheduling(
        log_type="recommend",
        crawler="zhufusonglaoyou",
        rule_dict={},
        our_uid="luojunhuihaoshuai",
        env="dev",
    )
    for i in range(1):
        ZL.get_videoList(page_id=i + 1)
    print(ZL.download_count)