""" 鲨鱼祝福——原鲸鱼祝福 """ import os import json import random import sys import time import uuid import requests import datetime from base64 import b64encode, b64decode from Crypto.Cipher import AES from Crypto.Util.Padding import pad, unpad sys.path.append(os.getcwd()) from application.items import VideoItem from application.pipeline import PiaoQuanPipeline from application.common.messageQueue import MQ from application.common.proxies import tunnel_proxies from application.common.log import AliyunLogger from application.common.mysql import MysqlHelper class SharkAES(object): """ 鲨鱼祝福 aes 解密 """ def __init__(self): self.key = 'xlc2ze7qnqg8xi1d'.encode() # 需要一个bytes类型的key self.iv = self.key # 在这个例子中,key和iv是相同的 def encrypt(self, data): cipher = AES.new(self.key, AES.MODE_CBC, self.iv) ct_bytes = cipher.encrypt(pad(data.encode('utf-8'), AES.block_size)) ct = b64encode(ct_bytes).decode() return ct def decrypt(self, data): try: ct = b64decode(data.encode('utf-8')) cipher = AES.new(self.key, AES.MODE_CBC, self.iv) pt = unpad(cipher.decrypt(ct), AES.block_size) return pt.decode() except Exception as e: print("Incorrect decryption") return None class SharkZhuFuRecommend(object): """ 鲨鱼祝福推荐爬虫 """ def __init__(self, platform, mode, rule_dict, user_list, env): self.platform = platform self.mode = mode self.rule_dict = rule_dict self.user_list = user_list self.env = env self.download_cnt = 0 self.mq = MQ(topic_name="topic_crawler_etl_" + self.env) self.limit_flag = False self.cryptor = SharkAES() self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode) self.mysql = MysqlHelper(platform=self.platform, mode=self.mode) def get_video_list(self): """ :return: 获取视频列表 """ base_url = "https://shanhu.nnapi.cn/videos/api.videos/getItem" headers = { "Host": "shanhu.nnapi.cn", "xweb_xhr": "1", "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100", "content-type": "application/json", "accept": "*/*", "sec-fetch-site": "cross-site", "sec-fetch-mode": "cors", "sec-fetch-dest": "empty", "referer": "https://servicewechat.com/wx7444f6906dbd46b1/2/page-frame.html", "accept-language": "en-US,en;q=0.9", "Cookie": "PHPSESSID=562dc39e8e68ad3e76c237f687bd049b; lang=zh-cn", } for i in range(100): time.sleep(random.randint(1, 10)) try: if self.limit_flag: self.aliyun_log.logging( code="2000", message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt), ) return else: params = {"mark": "", "page": i + 1} response = requests.get( url=base_url, headers=headers, params=params, ) encrypted_info = response.json()["data"] decrypted_info = json.loads( self.cryptor.decrypt(data=encrypted_info) ) video_list = decrypted_info["list"] for index, video_obj in enumerate(video_list, 1): try: self.aliyun_log.logging( code="1001", message="扫描到一条视频", data=video_obj, ) self.process_video_obj(video_obj) except Exception as e: self.aliyun_log.logging( code="3000", data=video_obj, message="抓取第{}条的时候出现问题, 报错信息是{}".format(index, e), ) except Exception as e: self.aliyun_log.logging( code="3000", message="抓取第{}页时候出现错误, 报错信息是{}".format(i + 1, e), ) def process_video_obj(self, video_obj): """ :param video_obj: 视频 obj :return: None """ trace_id = self.platform + str(uuid.uuid1()) our_user = random.choice(self.user_list) publish_time_stamp = datetime.datetime.strptime( video_obj["create_at"], "%Y-%m-%d %H:%M:%S" ).timestamp() item = VideoItem() item.add_video_info("user_id", our_user["uid"]) item.add_video_info("user_name", our_user["nick_name"]) item.add_video_info("video_id", video_obj["id"]) item.add_video_info("video_title", video_obj["name"]) item.add_video_info("publish_time_str", video_obj["create_at"]) item.add_video_info("publish_time_stamp", int(publish_time_stamp)) item.add_video_info("video_url", video_obj["cover"]) item.add_video_info( "cover_url", video_obj["cover"] + "&vframe/png/offset/1/w/200" ) item.add_video_info("like_cnt", video_obj["num_like"]) item.add_video_info("play_cnt", video_obj["num_read"]) item.add_video_info("comment_cnt", video_obj["num_comment"]) item.add_video_info("out_video_id", video_obj["id"]) item.add_video_info("platform", self.platform) item.add_video_info("strategy", self.mode) item.add_video_info("session", "{}-{}".format(self.platform, int(time.time()))) mq_obj = item.produce_item() pipeline = PiaoQuanPipeline( platform=self.platform, mode=self.mode, rule_dict=self.rule_dict, env=self.env, item=mq_obj, trace_id=trace_id, ) if pipeline.process_item(): self.download_cnt += 1 # print(mq_obj) self.mq.send_msg(mq_obj) self.aliyun_log.logging( code="1002", message="成功发送至 ETL", data=mq_obj, ) if self.download_cnt >= int( self.rule_dict.get("videos_cnt", {}).get("min", 200) ): self.limit_flag = True def run(self): """ 执行函数 """ self.get_video_list() # if __name__ == '__main__': # S = ShanHuZhuFuRecommend( # platform="shanhuzhufu", # mode="recommend", # env="dev", # rule_dict={}, # user_list=[{'nick_name': "Ivring", 'uid': "1997"}, {'nick_name': "paul", 'uid': "1998"}] # ) # S.get_video_list()