# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/10/23
"""Crawler/scheduler for the "youlegaoxiaoxiaoshipin" mini-program video feed.

Fetches paginated video lists from a Baidu CPU content endpoint, normalizes
each item into the pipeline's ``video_dict`` schema, validates it through
``PiaoQuanPipeline`` and (when enabled) pushes it onto the ETL message queue.
"""
import json
import os
import random  # kept: used by the (currently disabled) inter-request throttle
import sys
import time

import requests

sys.path.append(os.getcwd())
from common.mq import MQ
from common.aliyun_log import AliyunLogger
from common.pipeline import PiaoQuanPipeline

# Ordered (old, new) substitutions applied to raw titles. "." is mapped to the
# full-width "。" so titles keep sentence breaks without filesystem-hostile
# dots; everything else is simply stripped. Order is preserved from the
# original chained-replace implementation.
_TITLE_REPLACEMENTS = (
    ("\n", ""),
    ("/", ""),
    ("\r", ""),
    ("#", ""),
    (".", "。"),
    ("\\", ""),
    ("&NBSP", ""),
    (":", ""),
    ("*", ""),
    ("?", ""),
    ("?", ""),
    ('"', ""),
    ("<", ""),
    (">", ""),
    ("|", ""),
    (" ", ""),
    ('"', ""),
    ("'", ""),
)


def clean_title(strings):
    """Return *strings* stripped of surrounding whitespace and of characters
    that are unsafe in titles/filenames (see ``_TITLE_REPLACEMENTS``)."""
    cleaned = strings.strip()
    for old, new in _TITLE_REPLACEMENTS:
        cleaned = cleaned.replace(old, new)
    return cleaned


class YLGXXSPScheduling:
    """Recommend-feed crawler for the youlegaoxiaoxiaoshipin mini program."""

    # Fixed feed endpoint of the mini program (channel 1033).
    _API_URL = "https://cpu.baidu.com/1033/a16a67fe"

    def __init__(self, log_type, crawler, rule_dict, env, our_uid):
        self.platform = "youlegaoxiaoxiaoshipin"
        self.log_type = log_type    # crawl mode, e.g. "recommend"
        self.crawler = crawler      # platform short name used in logging
        self.rule_dict = rule_dict  # filtering rules handed to the pipeline
        self.env = env              # "prod"/"dev"; selects the MQ topic
        self.our_uid = our_uid      # uploader uid attached to ETL items
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.download_count = 0     # items that passed pipeline validation

    # 获取视频id_list
    def get_videoList(self, page_id):
        """Fetch one page of the feed and process every video on it.

        :param page_id: 1-based page number of the feed.
        :returns: None. Successfully processed items are reflected in
            ``self.download_count``.
        """
        # time.sleep(random.randint(5, 10))  # optional throttle between pages
        headers = {
            "Host": "cpu.baidu.com",
            "xweb_xhr": "1",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009",
            "Accept": "*/*",
            "Sec-Fetch-Site": "cross-site",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Dest": "empty",
            "Referer": "https://servicewechat.com/wx38382a240eab7214/4/page-frame.html",
            "Accept-Language": "en-US,en;q=0.9",
        }
        data = {
            "channelId": "1033",
            "needHybrid": "1",
            "pageNo": str(page_id),
            "pageSize": "10",
        }
        response = requests.post(
            self._API_URL,
            headers=headers,
            data=data,
            timeout=30,  # fail fast instead of hanging the scheduler forever
        )
        # Check the HTTP status BEFORE parsing: error pages may not be JSON
        # and would otherwise raise inside response.json().
        if response.status_code != 200:
            return
        result = response.json()
        if "data" not in result:
            return
        # "result" may be missing or empty when the feed is exhausted.
        video_items = result["data"].get("result") or []
        if not video_items:
            return
        for video_obj in video_items:
            AliyunLogger.logging(
                code="1001",
                platform=self.crawler,
                mode=self.log_type,
                env=self.env,
                data={},
                message="成功扫描到一条视频",
            )
            try:
                self.process_video_obj(video_obj)
            except Exception as e:
                # Best-effort per item: one malformed record must not abort
                # the rest of the page.
                AliyunLogger.logging(
                    code="3000",
                    platform=self.crawler,
                    mode=self.log_type,
                    env=self.env,
                    data={},
                    message=f"抓取单条视频异常:{e}",
                )

    def process_video_obj(self, video_obj):
        """Normalize one raw feed item, validate it and (optionally) enqueue it.

        :param video_obj: raw dict from the feed; the payload lives under
            ``video_obj["data"]``.
        """
        payload = video_obj.get("data", {})
        video_id = payload.get("id", 0)
        video_title = clean_title(payload.get("title", "no title"))
        publish_time_stamp = int(payload["clusterTime"])
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )
        video_dict = {
            "video_title": video_title,
            "video_id": video_id,
            "duration": payload["duration"],
            "play_cnt": int(payload.get("playbackCount", 0)),
            # NOTE(review): like/comment counts are read from the TOP-level
            # object while everything else lives under "data" — preserved
            # as-is from the original; confirm against a live payload.
            "like_cnt": int(video_obj.get("likeCount", 0)),
            "comment_cnt": int(video_obj.get("commentCounts", 0)),
            "share_cnt": 0,
            "user_name": payload["source"],
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "video_width": 0,
            "video_height": 0,
            "profile_id": 0,
            "profile_mid": 0,
            "session": f"youlegaoxiaoxiaoshipin-{int(time.time())}",
        }
        # NOTE(review): `flag` is the pipeline INSTANCE, so this condition is
        # always truthy — presumably a validation method (e.g.
        # process_item()) should be called here; confirm against the
        # PiaoQuanPipeline API before changing.
        flag = PiaoQuanPipeline(
            platform=self.crawler,
            mode=self.log_type,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video_dict,
        )
        if flag:
            # Enrich with the ETL-schema fields expected downstream. Feed
            # URLs are protocol-relative, hence the "http:" prefix.
            video_dict["out_user_id"] = payload.get("ownerId", 0)
            video_dict["platform"] = self.crawler
            video_dict["strategy"] = self.log_type
            video_dict["out_video_id"] = str(video_dict["video_id"])
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = self.our_uid
            video_dict["publish_time"] = video_dict["publish_time_str"]
            video_dict["video_url"] = "http:" + payload["url"]
            video_dict["avatar_url"] = "http:" + payload["avatar"]
            video_dict["cover_url"] = "http:" + payload["thumbUrl"]
            print(json.dumps(video_dict, ensure_ascii=False, indent=4))
            self.download_count += 1
            # self.mq.send_msg(video_dict)  # enable to push to the ETL queue


if __name__ == "__main__":
    ZL = YLGXXSPScheduling(
        log_type="recommend",
        crawler="ylgxxsp",
        rule_dict={},
        our_uid="luojunhuihaoshuai",
        env="prod",
    )
    for i in range(5):
        ZL.get_videoList(page_id=i + 1)
    print(ZL.download_count)