"""
Recommend-feed crawler for the FuXiaoShun (福小顺) mini program, 2024-01-22.
"""
import os
import sys
import json
import time
import uuid
import random
import asyncio
import aiohttp
import datetime
from base64 import b64decode
from datetime import datetime  # NOTE: shadows the `datetime` module imported above
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad

# The project packages are resolved relative to the working directory, so the
# path tweak has to happen before the `application` imports below.
sys.path.append(os.getcwd())
from application.common import Feishu
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger


def fxs_decrypt(ciphertext):
    """
    Decrypt a FuXiaoShun API payload (reverse-engineered scheme).

    The payload is base64 text wrapping AES-CBC ciphertext; the key and the
    IV are the same 16-byte value.

    :param ciphertext: base64-encoded ciphertext (str)
    :return: the decrypted plaintext as str, or None if any step fails
    """
    password = "xlc2ze7qnqg8xi1d".encode()
    iv = password
    try:
        ct = b64decode(ciphertext.encode("utf-8"))
        cipher = AES.new(password, AES.MODE_CBC, iv)
        pt = unpad(cipher.decrypt(ct), AES.block_size)
        return pt.decode()
    except Exception as e:
        # Best-effort: callers receive None and decide how to react.
        print("Incorrect decryption {}".format(e))
        return None


class FuXiaoShunRecommend(object):
    """
    Crawler for the FuXiaoShun recommend feed.

    Responses are encrypted; reverse engineering showed AES-CBC with
    password = iv = 'xlc2ze7qnqg8xi1d'.encode() (see ``fxs_decrypt``).
    """

    # Seconds to wait for the recommend API before giving up on a page.
    # Without a timeout a stalled connection would block the crawler forever.
    REQUEST_TIMEOUT = 30

    # API host -> referer sent with it. One pair is picked at random per request.
    HOST_REFERER_MAPPING = {
        'quan.nnjuxing.cn': 'https://servicewechat.com/wxbb18ecb64efe217a/2/page-frame.html',  # 福小全
        'nian.nnjuxing.cn': 'https://servicewechat.com/wx3e31d735ebb23d29/3/page-frame.html',  # 福小年
        'shun.nnjuxing.cn': 'https://servicewechat.com/wx5b89401c90c9f367/3/page-frame.html'   # 福小顺
    }

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        """
        :param platform: platform name, used in logs, trace ids and Feishu calls
        :param mode: crawl strategy name, passed to the logger and the pipeline
        :param rule_dict: rule configuration; ``videos_cnt.min`` is read as the
            per-run download quota (default 200 when absent)
        :param user_list: list of dicts with ``uid`` and ``nick_name`` keys; one
            is picked at random for every video
        :param env: environment suffix of the MQ topic name
        """
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        # Set to True once download_cnt reaches the configured quota.
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)

    def process_video_obj(self, video_obj):
        """
        Handle a single video entry from the feed.

        Builds a VideoItem, runs it through PiaoQuanPipeline and, when the
        pipeline accepts it, records a row in Feishu, sends the item to the
        message queue and updates the download counter / quota flag.

        :param video_obj: dict for one video; the keys ``create_at``, ``id``,
            ``name``, ``cover``, ``num_like``, ``num_read`` and ``num_comment``
            are read
        :return: None
        """
        trace_id = self.platform + str(uuid.uuid1())
        our_user = random.choice(self.user_list)
        # `create_at` is parsed as a naive datetime, so the timestamp is
        # interpreted in the local timezone of the machine running the crawler.
        publish_time_stamp = datetime.strptime(
            video_obj["create_at"], "%Y-%m-%d %H:%M:%S"
        ).timestamp()
        # NOTE(review): `cover` is used as the video URL, and the cover image is
        # derived from it with a frame-grab suffix - presumably `cover` is the
        # media URL itself; confirm against the API.
        cover_url = video_obj["cover"] + "&vframe/png/offset/1/w/200"

        item = VideoItem()
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        item.add_video_info("video_id", video_obj["id"])
        item.add_video_info("video_title", video_obj["name"])
        item.add_video_info("publish_time_str", video_obj["create_at"])
        item.add_video_info("publish_time_stamp", int(publish_time_stamp))
        item.add_video_info("video_url", video_obj["cover"])
        item.add_video_info("cover_url", cover_url)
        item.add_video_info("like_cnt", video_obj["num_like"])
        item.add_video_info("play_cnt", video_obj["num_read"])
        item.add_video_info("comment_cnt", video_obj["num_comment"])
        item.add_video_info("out_video_id", video_obj["id"])
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
        mq_obj = item.produce_item()

        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if not pipeline.process_item():
            return

        self.download_cnt += 1
        # Current time, recorded next to the video in the Feishu sheet.
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        values = [[
            video_obj["id"],
            formatted_time,
            video_obj["name"],
            cover_url,
            video_obj["cover"],
            video_obj["num_like"],
            video_obj["num_read"],
        ]]
        # Insert an empty row first, then fill it; the short pause separates
        # the two Feishu calls.
        Feishu.insert_columns(self.platform, 'fuxiaoshun', "0e1e47", "ROWS", 1, 2)
        time.sleep(0.5)
        Feishu.update_values(self.platform, 'fuxiaoshun', "0e1e47", "A2:Z2", values)
        self.mq.send_msg(mq_obj)
        self.aliyun_log.logging(
            code="1002",
            message="成功发送至 ETL",
            data=mq_obj,
        )
        quota = int(self.rule_dict.get("videos_cnt", {}).get("min", 200))
        if self.download_cnt >= quota:
            self.expire_flag = True

    def get_recommend_list(self, page_index):
        """
        Fetch one page of the recommend feed and process every video on it.

        :param page_index: page number to request
        :return: None
        :raises ValueError: when the response payload cannot be decrypted
        :raises requests.RequestException: on network failure or timeout
        """
        if self.expire_flag:
            self.aliyun_log.logging(
                code="2000",
                message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
            )
            return

        # Pick a random host, then the referer that belongs to it.
        random_host = random.choice(list(self.HOST_REFERER_MAPPING.keys()))
        random_referer = self.HOST_REFERER_MAPPING[random_host]
        headers = {
            "Host": random_host,
            "xweb_xhr": "1",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.6(0x13080610) XWEB/1156",
            "content-type": "application/json",
            "accept": "*/*",
            "sec-fetch-site": "cross-site",
            "sec-fetch-mode": "cors",
            "sec-fetch-dest": "empty",
            "referer": random_referer,
            "accept-language": "en-US,en;q=0.9",
        }
        url = f"https://{random_host}/videos/api.videos/getItem?page={page_index}"
        payload = {}
        # Fixed pause before every page request.
        time.sleep(10)
        response = requests.request(
            "GET", url, headers=headers, data=payload, timeout=self.REQUEST_TIMEOUT
        )
        cryp_data = response.json()
        plaintext = fxs_decrypt(cryp_data["data"])
        if plaintext is None:
            # fxs_decrypt signals failure with None; fail with a clear message
            # instead of letting json.loads raise an opaque TypeError.
            raise ValueError(
                "failed to decrypt response payload for page {}".format(page_index)
            )
        data = json.loads(plaintext)

        for index, video_obj in enumerate(data["list"], 1):
            # A failure on one video must not abort the rest of the page.
            try:
                self.aliyun_log.logging(
                    code="1001",
                    message="扫描到一条视频",
                    data=video_obj,
                )
                self.process_video_obj(video_obj)
            except Exception as e:
                self.aliyun_log.logging(
                    code="3000",
                    message="抓取第{}条的时候出现问题, 报错信息是{}".format(index, e),
                )

    def run(self):
        """
        Crawl pages 1..199 until the download quota is reached.

        Errors on a page are logged and the crawl continues with the next page.

        :return: None
        """
        for page in range(1, 200):
            if self.expire_flag:
                message = "本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt)
                self.aliyun_log.logging(
                    code="2000",
                    message=message,
                )
                print(message)
                return
            try:
                self.get_recommend_list(page_index=page)
            except Exception as e:
                self.aliyun_log.logging(
                    code="3000",
                    message="抓取第{}页时候出现错误, 报错信息是{}".format(page, e),
                )


if __name__ == '__main__':
    J = FuXiaoShunRecommend(
        platform="fuxiaoshun",
        mode="recommend",
        rule_dict={},
        user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
    )
    J.run()