"""Shark Blessing (鲨鱼祝福) recommend-feed crawler, formerly Whale Blessing (鲸鱼祝福)."""
import os
import json
import random
import sys
import time
import uuid
import requests
import datetime
from base64 import b64encode, b64decode
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad

# Make the project root importable before pulling in the `application` package.
sys.path.append(os.getcwd())

from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.proxies import tunnel_proxies
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper


class SharkAES(object):
    """AES-CBC helper used to decode the encrypted payloads returned by the API."""

    def __init__(self):
        # AES.new requires a bytes key.
        self.key = 'xlc2ze7qnqg8xi1d'.encode()
        # The same bytes are used as both key and IV.
        self.iv = self.key

    def encrypt(self, data):
        """Encrypt a text string.

        :param data: plain text (str); it is UTF-8 encoded and padded to the
            AES block size before encryption
        :return: base64-encoded ciphertext as str
        """
        cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
        ct_bytes = cipher.encrypt(pad(data.encode('utf-8'), AES.block_size))
        return b64encode(ct_bytes).decode()

    def decrypt(self, data):
        """Decrypt a base64-encoded ciphertext string.

        :param data: base64 text (str) as produced by ``encrypt``
        :return: the decoded plain text, or None when decoding, decryption
            or unpadding fails (a notice is printed in that case)
        """
        try:
            ct = b64decode(data.encode('utf-8'))
            cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
            pt = unpad(cipher.decrypt(ct), AES.block_size)
            return pt.decode()
        except Exception:
            # Best effort: callers receive None and decide how to report it.
            print("Incorrect decryption")
            return None


class SharkZhuFuRecommend(object):
    """Crawler for the Shark Blessing recommend feed.

    Pages through the video list endpoint, decrypts every page, runs each
    video through ``PiaoQuanPipeline`` and sends accepted items to the ETL
    message queue until the configured quota is reached.
    """

    BASE_URL = "https://shanhu.nnapi.cn/videos/api.videos/getItem"
    # Upper bound on the number of pages requested in one run.
    MAX_PAGES = 100
    # Seconds to wait for the HTTP response; without a timeout a stalled
    # connection would block the crawler forever.
    REQUEST_TIMEOUT = 30
    HEADERS = {
        "Host": "shanhu.nnapi.cn",
        "xweb_xhr": "1",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100",
        "content-type": "application/json",
        "accept": "*/*",
        "sec-fetch-site": "cross-site",
        "sec-fetch-mode": "cors",
        "sec-fetch-dest": "empty",
        "referer": "https://servicewechat.com/wx7444f6906dbd46b1/2/page-frame.html",
        "accept-language": "en-US,en;q=0.9",
        "Cookie": "PHPSESSID=562dc39e8e68ad3e76c237f687bd049b; lang=zh-cn",
    }

    def __init__(self, platform, mode, rule_dict, user_list, env):
        """
        :param platform: platform identifier; used in logs, trace ids and items
        :param mode: crawl strategy name; stored on every item as "strategy"
        :param rule_dict: rule configuration; ``videos_cnt.min`` (default 200)
            is the number of accepted videos after which the crawl stops
        :param user_list: list of dicts with "uid" and "nick_name"; one is
            picked at random as the publishing user of each video
        :param env: environment suffix appended to the MQ topic name
        """
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        # Number of videos accepted by the pipeline and sent to ETL so far.
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        # Set to True by process_video_obj once the quota is reached.
        self.limit_flag = False
        self.cryptor = SharkAES()
        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
        self.mysql = MysqlHelper(platform=self.platform, mode=self.mode)

    def _fetch_page(self, page):
        """Request one page of the feed and return its decrypted video list.

        :param page: 1-based page number
        :return: the value of the "list" key of the decrypted payload
        :raises ValueError: when the payload cannot be decrypted
        """
        response = requests.get(
            url=self.BASE_URL,
            headers=self.HEADERS,
            params={"mark": "", "page": page},
            timeout=self.REQUEST_TIMEOUT,
        )
        encrypted_info = response.json()["data"]
        decrypted_text = self.cryptor.decrypt(data=encrypted_info)
        if decrypted_text is None:
            # decrypt() signals failure with None; report it explicitly
            # instead of letting json.loads fail with an opaque TypeError.
            raise ValueError("failed to decrypt response payload")
        return json.loads(decrypted_text)["list"]

    def get_video_list(self):
        """Crawl the feed page by page until the quota or the page cap is hit.

        Errors on a single video or a single page are logged with code
        "3000" and do not abort the run.

        :return: None
        """
        for page in range(1, self.MAX_PAGES + 1):
            # Check the quota before sleeping so a finished run exits at once.
            if self.limit_flag:
                self.aliyun_log.logging(
                    code="2000",
                    message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
                )
                return
            # Random pause between page requests.
            time.sleep(random.randint(1, 10))
            try:
                video_list = self._fetch_page(page)
                for index, video_obj in enumerate(video_list, 1):
                    if self.limit_flag:
                        # Quota reached mid-page: skip the remaining videos
                        # rather than overshooting the configured count.
                        break
                    try:
                        self.aliyun_log.logging(
                            code="1001",
                            message="扫描到一条视频",
                            data=video_obj,
                        )
                        self.process_video_obj(video_obj)
                    except Exception as e:
                        self.aliyun_log.logging(
                            code="3000",
                            data=video_obj,
                            message="抓取第{}条的时候出现问题, 报错信息是{}".format(index, e),
                        )
            except Exception as e:
                self.aliyun_log.logging(
                    code="3000",
                    message="抓取第{}页时候出现错误, 报错信息是{}".format(page, e),
                )

    def process_video_obj(self, video_obj):
        """Convert one raw video dict into an item and push it through the pipeline.

        :param video_obj: video dict from the decrypted list; the keys read
            are "id", "name", "create_at", "cover", "num_like", "num_read"
            and "num_comment"
        :return: None
        """
        trace_id = self.platform + str(uuid.uuid1())
        our_user = random.choice(self.user_list)
        publish_time_stamp = datetime.datetime.strptime(
            video_obj["create_at"], "%Y-%m-%d %H:%M:%S"
        ).timestamp()

        item = VideoItem()
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        item.add_video_info("video_id", video_obj["id"])
        item.add_video_info("video_title", video_obj["name"])
        item.add_video_info("publish_time_str", video_obj["create_at"])
        item.add_video_info("publish_time_stamp", int(publish_time_stamp))
        # NOTE(review): the "cover" field is used as the video URL, and the
        # cover image URL is derived from it by appending a frame-extraction
        # suffix — confirm against the upstream API.
        item.add_video_info("video_url", video_obj["cover"])
        item.add_video_info(
            "cover_url", video_obj["cover"] + "&vframe/png/offset/1/w/200"
        )
        item.add_video_info("like_cnt", video_obj["num_like"])
        item.add_video_info("play_cnt", video_obj["num_read"])
        item.add_video_info("comment_cnt", video_obj["num_comment"])
        item.add_video_info("out_video_id", video_obj["id"])
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
        mq_obj = item.produce_item()

        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if not pipeline.process_item():
            return

        self.download_cnt += 1
        self.mq.send_msg(mq_obj)
        self.aliyun_log.logging(
            code="1002",
            message="成功发送至 ETL",
            data=mq_obj,
        )
        quota = int(self.rule_dict.get("videos_cnt", {}).get("min", 200))
        if self.download_cnt >= quota:
            self.limit_flag = True

    def run(self):
        """Entry point: run one crawl of the recommend feed."""
        self.get_video_list()


# Manual smoke-run example (kept commented out):
# if __name__ == '__main__':
#     S = SharkZhuFuRecommend(
#         platform="shanhuzhufu",
#         mode="recommend",
#         env="dev",
#         rule_dict={},
#         user_list=[{'nick_name': "Ivring", 'uid': "1997"}, {'nick_name': "paul", 'uid': "1998"}]
#     )
#     S.get_video_list()