# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/10/23
"""Crawler that pages through channels of the cpu.baidu.com feed endpoint.

Every scanned video is run through ``PiaoQuanPipeline``; videos that pass are
sent to the ``topic_crawler_etl_<env>`` message queue.
"""
import json
import os
import random
import sys
import time
import uuid

import requests

sys.path.append(os.getcwd())

from common.mq import MQ
from common.aliyun_log import AliyunLogger
from common.pipeline import PiaoQuanPipeline
from common.public import clean_title


class YLGXXSPScheduling:
    """Crawl the feed channel by channel and push qualifying videos to MQ."""

    # Endpoint queried for every (channel, page) combination.
    FEED_URL = "https://cpu.baidu.com/1033/a16a67fe"
    # Seconds to wait for the endpoint. Without a timeout, ``requests`` can
    # block forever on a stalled connection and freeze the whole crawl.
    REQUEST_TIMEOUT = 30
    # Headers copied from the WeChat mini-program client (see the
    # MicroMessenger User-Agent and the servicewechat.com Referer).
    HEADERS = {
        "Host": "cpu.baidu.com",
        "xweb_xhr": "1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009",
        "Accept": "*/*",
        "Sec-Fetch-Site": "cross-site",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Dest": "empty",
        "Referer": "https://servicewechat.com/wx38382a240eab7214/4/page-frame.html",
        "Accept-Language": "en-US,en;q=0.9",
    }
    # channelId sent to the API -> channel name used in log messages.
    CHANNELS = {
        "1059": "搞笑",
        "1058": "音乐",
        "1061": "娱乐",
        "1063": "社会",
        "1066": "生活",
        "1064": "猎奇",
    }

    def __init__(self, platform, mode, rule_dict, env, our_uid_list):
        """
        :param platform: platform tag used in logs, trace ids and MQ records.
        :param mode: crawl mode, stored as the record's ``strategy``.
        :param rule_dict: filtering rules handed to ``PiaoQuanPipeline``; it is
            also serialised into each record as ``crawler_rule``.
        :param env: environment suffix that selects the MQ topic.
        :param our_uid_list: non-empty list of user ids. One is picked at
            random as ``user_id`` for every accepted video.
        """
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        self.our_uid_list = our_uid_list
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        # Number of videos successfully handed to MQ by this instance.
        self.download_count = 0

    def _log(self, code, message, data=None, trace_id=None):
        """Send one AliyunLogger entry tagged with this crawler's platform/mode/env."""
        fields = dict(
            code=code,
            platform=self.platform,
            mode=self.mode,
            env=self.env,
            data={} if data is None else data,
            message=message,
        )
        if trace_id is not None:
            fields["trace_id"] = trace_id
        AliyunLogger.logging(**fields)

    def get_videoList(self, page_id):
        """Crawl page ``page_id`` of every channel in ``CHANNELS``.

        Each channel is handled independently. A failed request or an empty
        page is logged and the loop continues with the next channel, rather
        than abandoning the remaining channels for this page.

        :param page_id: page number sent to the API as ``pageNo``.
        """
        # Random pause before each page to throttle the request rate.
        time.sleep(random.randint(5, 10))
        self._log("1000", "开始抓取第{}页".format(page_id))
        for channel_id, channel_name in self.CHANNELS.items():
            video_list = self._fetch_channel_page(channel_id, channel_name, page_id)
            if not video_list:
                continue
            for index, video_obj in enumerate(video_list):
                self._log(
                    "1001",
                    "{}成功扫描到一条视频, 该视频位于第{}页{}条".format(
                        channel_name, page_id, index + 1
                    ),
                )
                try:
                    self.process_video_obj(video_obj)
                except Exception as e:
                    # Deliberate per-item boundary: one malformed video must
                    # not abort the rest of the page, so log it and move on.
                    self._log(
                        "3000",
                        "{}抓取单条视频异常, 报错原因是: {}, 该视频位于第{}页{}条".format(
                            channel_name, e, page_id, index + 1
                        ),
                        data=video_obj,
                    )
            self._log("1000", "{}完成抓取第{}页".format(channel_name, page_id))

    def _fetch_channel_page(self, channel_id, channel_name, page_id):
        """Return the ``data.result`` list for one channel page.

        Returns ``[]`` after logging code 2000 (request/parse failure or no
        ``data`` object) or code 2001 (no more videos).
        """
        form = {
            "channelId": channel_id,
            "needHybrid": "1",
            "pageNo": str(page_id),
            "pageSize": "10",
        }
        result = None
        error = None
        try:
            response = requests.post(
                self.FEED_URL,
                headers=self.HEADERS,
                data=form,
                timeout=self.REQUEST_TIMEOUT,
            )
            # Check the status before parsing: error pages are often not JSON.
            if response.status_code == 200:
                result = response.json()
            else:
                error = "HTTP {}".format(response.status_code)
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers a 200 response whose body is not valid JSON.
            error = repr(exc)

        payload = result.get("data") if isinstance(result, dict) else None
        if not isinstance(payload, dict):
            self._log(
                "2000",
                "{}抓取第{}页失败,无数据".format(channel_name, page_id),
                data={"error": error} if error else {},
            )
            return []
        video_list = payload.get("result") or []
        if not video_list:
            self._log("2001", "{}抓取到第{}页, 没有更多数据了".format(channel_name, page_id))
            return []
        return video_list

    @staticmethod
    def _with_scheme(url):
        """Prefix ``http:`` unless ``url`` already starts with http(s)://.

        The original code always prepended ``http:``, which implies the API
        returns scheme-relative ``//host/...`` URLs. This keeps that behaviour
        but avoids producing ``http:https://...`` for absolute URLs.
        """
        if url.startswith(("http://", "https://")):
            return url
        return "http:" + url

    def process_video_obj(self, video_obj):
        """Build a record from one feed item and send it to MQ if it passes the rules.

        :param video_obj: one element of the feed's ``data.result`` list. Most
            fields are read from its nested ``"data"`` dict.
        :raises KeyError, TypeError, ValueError: when required fields
            (``duration``, ``clusterTime``, ``source``, ``url``, ``avatar``,
            ``thumbUrl``) are missing or malformed. ``get_videoList`` logs
            these and skips the item.
        """
        trace_id = self.platform + str(uuid.uuid1())
        info = video_obj.get("data", {})
        video_id = info.get("id", 0)
        publish_time_stamp = int(info["clusterTime"])
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )
        video_dict = {
            "video_title": clean_title(info.get("title", "no title")),
            "video_id": video_id,
            "duration": info["duration"],
            "play_cnt": int(info.get("playbackCount", 0)),
            # NOTE(review): like/comment counts are read from the top-level
            # item, while playbackCount comes from ``data``. Confirm against
            # the API response which level really carries these fields.
            "like_cnt": int(video_obj.get("likeCount", 0)),
            "comment_cnt": int(video_obj.get("commentCounts", 0)),
            "share_cnt": 0,
            "user_name": info["source"],
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "update_time_stamp": int(time.time()),
            "video_width": 0,
            "video_height": 0,
            "profile_id": 0,
            "profile_mid": 0,
            "out_video_id": video_id,
            "session": f"youlegaoxiaoxiaoshipin-{int(time.time())}",
        }
        rule_pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video_dict,
            trace_id=trace_id,
        )
        if not rule_pipeline.process_item():
            return

        # Extra fields required by the ETL consumer, added only for accepted videos.
        video_dict["out_user_id"] = info.get("ownerId", 0)
        video_dict["platform"] = self.platform
        video_dict["strategy"] = self.mode
        video_dict["width"] = video_dict["video_width"]
        video_dict["height"] = video_dict["video_height"]
        video_dict["crawler_rule"] = json.dumps(self.rule_dict)
        video_dict["user_id"] = random.choice(self.our_uid_list)
        video_dict["publish_time"] = video_dict["publish_time_str"]
        video_dict["video_url"] = info["url"]
        video_dict["avatar_url"] = self._with_scheme(info["avatar"])
        video_dict["cover_url"] = self._with_scheme(info["thumbUrl"])
        self.mq.send_msg(video_dict)
        # Count only after send_msg returns, so a failed send is not counted.
        self.download_count += 1
        self._log("1002", "成功发送 MQ 至 ETL", data=video_dict, trace_id=trace_id)


if __name__ == "__main__":
    ZL = YLGXXSPScheduling(
        platform="ylgxxsp",
        mode="recommend",
        rule_dict={},
        # The constructor takes a list of uids (used with random.choice).
        our_uid_list=["luojunhuihaoshuai"],
        env="prod",
    )
    for page_id in range(1, 6):
        ZL.get_videoList(page_id=page_id)
    print(ZL.download_count)