"""
@author: luojunhui
"""
import os
import sys
import json
import time
import uuid
import random
import datetime

import requests

# Make the project root importable when the script is launched from it.
sys.path.append(os.getcwd())

from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.proxies import tunnel_proxies
from application.common.log import AliyunLogger


class ZuiHaoDeSongNi(object):
    """
    Recommendation-feed crawler for "最好的送你" (ZuiHaoDeSongNi).

    Pages through the recommendation endpoint, runs every video through
    ``PiaoQuanPipeline`` and forwards accepted items to the ETL message
    queue until the configured number of videos has been sent.
    """

    # Seconds to wait for the recommendation endpoint before giving up.
    # Without a timeout ``requests`` can block forever on a stalled socket.
    REQUEST_TIMEOUT = 30

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        """
        :param platform: platform identifier; used in trace ids, session
            names and log records
        :param mode: crawl strategy name, stored on each item as "strategy"
        :param rule_dict: rule configuration; ``videos_cnt.min`` is read as
            the number of videos to send before stopping (default 200)
        :param user_list: candidate publishing users; each entry must provide
            the keys "uid" and "nick_name"
        :param env: environment suffix appended to the MQ topic name
        """
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        # Number of videos sent to the queue in this round.
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        # Set to True once download_cnt reaches the configured quota.
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)

    def process_video_obj(self, video_obj):
        """
        Process a single video entry from the recommendation list.

        Builds a ``VideoItem``, runs it through the pipeline and, when the
        pipeline accepts it (``process_item()`` is truthy), sends it to the
        message queue, sleeps, logs the success and updates the quota state.

        :param video_obj: dict for one video; the keys "update_time", "nid",
            "title", "video_url" and "video_cover" are read
        :return: None
        """
        trace_id = self.platform + str(uuid.uuid1())
        our_user = random.choice(self.user_list)
        publish_time_stamp = int(video_obj["update_time"])
        # NOTE(review): fromtimestamp() without tz renders in the host's
        # local timezone - confirm that is what downstream expects.
        publish_time_str = datetime.datetime.fromtimestamp(
            publish_time_stamp
        ).strftime("%Y-%m-%d %H:%M:%S")

        item = VideoItem()
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        item.add_video_info("video_id", video_obj["nid"])
        item.add_video_info("video_title", video_obj["title"])
        item.add_video_info("publish_time_str", publish_time_str)
        item.add_video_info("publish_time_stamp", publish_time_stamp)
        item.add_video_info("video_url", video_obj["video_url"])
        item.add_video_info("cover_url", video_obj["video_cover"])
        item.add_video_info("out_video_id", video_obj["nid"])
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info(
            "session", "{}-{}".format(self.platform, int(time.time()))
        )
        mq_obj = item.produce_item()

        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if not pipeline.process_item():
            return

        self.download_cnt += 1
        self.mq.send_msg(mq_obj)
        # Wait a random 1 to 5 minutes between sends.
        time.sleep(60 * random.randint(1, 5))
        self.aliyun_log.logging(
            code="1002",
            message="成功发送至 ETL",
            data=mq_obj,
        )
        quota = int(self.rule_dict.get("videos_cnt", {}).get("min", 200))
        if self.download_cnt >= quota:
            self.expire_flag = True

    def get_recommend_list(self, page_index):
        """
        Fetch one page of the recommendation feed and process its videos.

        Returns immediately (after logging) when the quota has already been
        reached, and stops iterating the page as soon as the quota is reached
        part-way through it. Errors raised while processing an individual
        video are logged and do not abort the page.

        :param page_index: page number to request
        :return: None
        :raises requests.RequestException: on network failure or timeout
        :raises KeyError: when the response lacks ``data.list``
        """
        if self.expire_flag:
            self.aliyun_log.logging(
                code="2000",
                message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
            )
            return

        # Both time headers carry the same millisecond timestamp.
        now_ms = str(int(time.time() * 1000))
        headers = {
            "Host": "zhdsn.wentingyou.cn",
            "content-time": now_ms,
            "cache-time": now_ms,
            "chatkey": "wx00da988283a73cdf",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.6(0x13080610) XWEB/1156",
            "content-type": "application/x-www-form-urlencoded",
            "visitorkey": "17096941221026589978",
            "xweb_xhr": "1",
            "vision": "1.1.0",
            "token": "",
            "accept": "*/*",
            "sec-fetch-site": "cross-site",
            "sec-fetch-mode": "cors",
            "sec-fetch-dest": "empty",
            "referer": "https://servicewechat.com/wx00da988283a73cdf/7/page-frame.html",
            "accept-language": "en-US,en;q=0.9",
        }
        po = {
            "cid": "",
            "page": page_index,
            "is_ads": 1,
            # A device model is picked at random for every request.
            "model": random.choice(
                [
                    "Windows",
                    "Mac",
                    "HuaWei",
                    "Xiaomi",
                    "Xiaomi2",
                    "Yandex",
                    "Google",
                    "iphone",
                    "oppo",
                ]
            ),
            "mini_version": "3.8.6",
            "ini_id": "17096941221026589978",
        }
        # The endpoint takes the whole payload as one JSON-encoded query
        # parameter named "parameter".
        params = {"parameter": json.dumps(po)}
        url = "https://zhdsn.wentingyou.cn/index.php/v111/index/index"

        # Fixed pause before every page request.
        time.sleep(5)
        response = requests.request(
            "GET",
            url=url,
            headers=headers,
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        data = response.json()

        for index, video_obj in enumerate(data["data"]["list"], 1):
            # Stop as soon as the quota is met; otherwise the rest of the
            # page would still be sent (and slept on) past the limit.
            if self.expire_flag:
                break
            try:
                self.aliyun_log.logging(
                    code="1001",
                    message="扫描到一条视频",
                    data=video_obj,
                )
                self.process_video_obj(video_obj)
            except Exception as e:
                # Best effort: one bad video must not abort the whole page.
                self.aliyun_log.logging(
                    code="3000",
                    message="抓取第{}条的时候出现问题, 报错信息是{}".format(index, e),
                )

    def run(self):
        """
        Run the crawler over pages 1 to 39.

        Stops early (after logging) once the quota has been reached. A
        failure on one page is logged and the crawl moves on to the next.

        :return: None
        """
        for page in range(1, 40):
            if self.expire_flag:
                self.aliyun_log.logging(
                    code="2000",
                    message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
                )
                return
            try:
                self.get_recommend_list(page_index=page)
            except Exception as e:
                self.aliyun_log.logging(
                    code="3000",
                    message="抓取第{}页时候出现错误, 报错信息是{}".format(page, e),
                )