"""Crawl the 福气旺 recommended-video feed and forward accepted videos to the ETL queue."""
import json
import random
import re
import time
import uuid

import requests

from common.common import Common
from common.scheduling_db import MysqlHelper
from common.mq import MQ
from common.aliyun_log import AliyunLogger
from common.pipeline import PiaoQuanPipeline
from common.public import download_rule, get_config_from_mysql

# Proxy mapping handed to ``requests``; both schemes are explicitly set to None.
proxies = {"http": None, "https": None}

# Maximum number of feed requests issued by one ``get_videoList`` call.
_MAX_PAGES = 20

# Upper bound (seconds) for one feed request, so a stalled connection cannot
# block the crawl forever.
_REQUEST_TIMEOUT = 30

# Ordered (old, new) substitutions applied to a stripped title. The order and
# the repeated entries mirror the original chained ``str.replace`` calls.
_TITLE_REPLACEMENTS = (
    ("\n", ""),
    ("/", ""),
    ("\\", ""),
    ("\r", ""),
    (":", ""),
    ("*", ""),
    ("?", ""),
    ("?", ""),
    ('"', ""),
    ("<", ""),
    (">", ""),
    ("|", ""),
    (" ", ""),
    ("&NBSP", ""),
    (".", "。"),
    (" ", ""),
    ("'", ""),
    ("#", ""),
    ("Merge", ""),
)


class FqwRecommend:
    """Crawler for the recommended-video endpoint of the 福气旺 platform."""

    platform = "福气旺"
    # Number of videos sent to the ETL queue so far. Stored on the class, so it
    # persists across calls until the quota branch resets it.
    download_cnt = 0
    element_list = []
    # Number of feed entries scanned so far (reset together with download_cnt).
    i = 0

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env):
        """Return how many ``crawler_video`` rows already reference ``video_id``.

        Rows are matched on ``out_video_id`` for either the ``crawler`` name or
        ``cls.platform``, restricted to ``create_time >= '2023-06-26'``.
        """
        # NOTE(review): the values are interpolated straight into the SQL text.
        # If MysqlHelper offers a parameterized query API, switch to it; this
        # block cannot see that API, so the statement is left unchanged.
        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and create_time>='2023-06-26' and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
        return len(repeat_video)

    @staticmethod
    def _log(log_type, crawler, env, message, level="info", code="2000"):
        """Write ``message`` to the local logger, Common.logging and AliyunLogger."""
        getattr(Common.logger(log_type, crawler), level)(message)
        Common.logging(log_type, crawler, env, message)
        AliyunLogger.logging(
            code=code,
            platform=crawler,
            mode=log_type,
            env=env,
            message=message
        )

    @staticmethod
    def _clean_title(raw_title):
        """Strip ``raw_title`` and apply the ordered title substitutions."""
        title = raw_title.strip()
        for old, new in _TITLE_REPLACEMENTS:
            title = title.replace(old, new)
        return title

    @staticmethod
    def _parse_duration(time_str):
        """Convert a colon-separated duration into total seconds.

        ``"MM:SS"`` yields ``minutes * 60 + seconds`` exactly as before;
        ``"H:MM:SS"`` and a bare ``"SS"`` are accepted as well.

        Raises:
            ValueError: if any component is not an integer.
        """
        total_seconds = 0
        for part in time_str.split(":"):
            total_seconds = total_seconds * 60 + int(part)
        return total_seconds

    @classmethod
    def _parse_feed(cls, feed):
        """Build the base video dict from one feed entry.

        Raises:
            KeyError: if ``playCountFormat``, ``durationFormat``, ``videoId``
                or ``videoPath`` is missing from ``feed``.
        """
        video_title = cls._clean_title(feed.get("title", ""))
        # The feed supplies no publish time here; the crawl time is used.
        publish_time_stamp = int(time.time())
        publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
        # Play count: take the first run of digits from the formatted text.
        numbers = re.findall(r'\d+', feed["playCountFormat"])
        play_cnt = int(numbers[0]) if numbers else 0
        total_seconds = cls._parse_duration(feed["durationFormat"])
        return {
            "video_title": video_title,
            "video_id": str(feed["videoId"]),  # video id
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "update_time_stamp": int(time.time()),
            "category_id": int(feed.get("category_id", 0)),  # video source
            "cover_url": feed.get("coverImagePath", ""),  # cover image
            "video_url": feed["videoPath"],  # video link
            "click": int(feed.get("click", 0)),  # click count
            "video_width": int(feed.get("width", 0)),
            "video_height": int(feed.get("height", 0)),
            "user_name": feed.get("source", "").strip().replace("\n", ""),
            "user_id": feed.get("openid", ""),
            "play_cnt": play_cnt,
            "like_cnt": 0,
            "comment_cnt": 0,
            "share_cnt": 0,
            "duration": total_seconds,
            "session": ""
        }

    @classmethod
    def get_videoList(cls, log_type, crawler, our_uid, rule_dict, env):
        """Fetch the recommend feed and send accepted videos to the ETL queue.

        Up to ``_MAX_PAGES`` requests are made. The method returns early when
        the response is unusable or empty, or when ``cls.download_cnt`` reaches
        ``rule_dict["videos_cnt"]["min"]`` (default 10), in which case the
        class counters are reset.

        Args:
            log_type: crawl mode; used for logging and stored as ``strategy``.
            crawler: crawler name; used for logging and stored as ``platform``.
            our_uid: value stored as ``user_id`` on every outgoing video.
            rule_dict: crawl rules, passed to the pipeline and serialized into
                ``crawler_rule``.
            env: environment suffix for the MQ topic and the loggers.

        Returns:
            None.
        """
        mq = MQ(topic_name="topic_crawler_etl_" + env)
        url = "https://api.xinghetime.com/luckvideo/video/getRecommendVideos"
        payload = json.dumps({
            "baseParam": {
                "mid": "openid_o5Xjp4q2yKGEhqFEeYvwjPNZJzWY",
                "pageSource": "video-home"
            },
            "bizParam": {
                "pageSize": 3
            }
        })
        headers = {
            'Host': 'api.xinghetime.com',
            'accept': '*/*',
            'content-type': 'application/json',
            'accept-language': 'zh-cn',
            'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E217 MicroMessenger/6.8.0(0x16080000) NetType/WIFI Language/en Branch/Br_trunk MiniProgramEnv/Mac',
            'referer': 'https://servicewechat.com/wx8d8992849398a1cb/10/page-frame.html'
        }
        # A ``for`` loop guarantees the page counter advances even when a page
        # raises; the previous ``while`` loop only incremented on success and
        # would retry the same page forever.
        for page in range(1, _MAX_PAGES + 1):
            try:
                cls._log(log_type, crawler, env, f"正在抓取第{page}页")
                # NOTE(review): verify=False disables TLS certificate checks.
                r = requests.post(url=url, headers=headers, data=payload, proxies=proxies,
                                  verify=False, timeout=_REQUEST_TIMEOUT)
                if "data" not in r.text or r.status_code != 200:
                    cls._log(log_type, crawler, env, f"get_videoList:{r.text}\n", level="warning")
                    return
                body = r.json()
                if "data" not in body:
                    cls._log(log_type, crawler, env, f"get_videoList:{body}\n")
                    return
                # Video list
                feeds = body["data"]
                if not feeds:
                    # Log the empty payload itself; indexing it with ['list']
                    # (as before) always raised.
                    cls._log(log_type, crawler, env, f"get_videoList:{feeds}\n", level="warning")
                    return
                for feed in feeds:
                    try:
                        if cls.download_cnt >= int(rule_dict.get("videos_cnt", {}).get("min", 10)):
                            # Quota reached: reset class-level state and stop.
                            cls.i = 0
                            cls.download_cnt = 0
                            cls.element_list = []
                            return
                        cls.i += 1
                        trace_id = crawler + str(uuid.uuid1())
                        AliyunLogger.logging(
                            code="1001",
                            platform=crawler,
                            mode=log_type,
                            env=env,
                            message="扫描到一条视频",
                            trace_id=trace_id
                        )
                        video_dict = cls._parse_feed(feed)
                        for k, v in video_dict.items():
                            Common.logger(log_type, crawler).info(f"{k}:{v}")
                        Common.logging(log_type, crawler, env, f"video_dict:{video_dict}")
                        # Fields added after logging; "user_id" is overwritten
                        # with our_uid once the feed's id is copied to
                        # "out_user_id".
                        video_dict["out_user_id"] = video_dict["user_id"]
                        video_dict["platform"] = crawler
                        video_dict["strategy"] = log_type
                        video_dict["out_video_id"] = video_dict["video_id"]
                        video_dict["width"] = video_dict["video_width"]
                        video_dict["height"] = video_dict["video_height"]
                        video_dict["crawler_rule"] = json.dumps(rule_dict)
                        video_dict["user_id"] = our_uid
                        video_dict["publish_time"] = video_dict["publish_time_str"]
                        if video_dict["video_id"] == "" or video_dict["video_title"] == "" \
                                or video_dict["video_url"] == "":
                            Common.logger(log_type, crawler).info("无效视频\n")
                            Common.logging(log_type, crawler, env, "无效视频\n")
                            AliyunLogger.logging(
                                code="2005",
                                platform=crawler,
                                mode=log_type,
                                env=env,
                                message="无效视频",
                                data=video_dict,
                                trace_id=trace_id
                            )
                            continue
                        pipeline = PiaoQuanPipeline(
                            platform=crawler,
                            mode=log_type,
                            trace_id=trace_id,
                            item=video_dict,
                            rule_dict=rule_dict,
                            env=env
                        )
                        if pipeline.process_item():
                            mq.send_msg(video_dict)
                            AliyunLogger.logging(
                                code="1002",
                                platform=crawler,
                                mode=log_type,
                                env=env,
                                trace_id=trace_id,
                                data=video_dict,
                                message="成功发送至ETL"
                            )
                            cls.download_cnt += 1
                            # Pause 5-10 seconds after each accepted video.
                            interval = random.randrange(5, 11)
                            time.sleep(interval)
                    except Exception as e:
                        # Best effort: one bad entry must not abort the page.
                        cls._log(log_type, crawler, env, f"抓取单条视频异常:{e}\n",
                                 level="error", code="3000")
            except Exception as e:
                # Best effort: log the failure and move on to the next page.
                cls._log(log_type, crawler, env, f"抓取第{page}页时异常:{e}\n",
                         level="error", code="3000")


if __name__ == "__main__":
    rule_dict1 = {"period": {"min": 365, "max": 365},
                  "duration": {"min": 30, "max": 1800},
                  "favorite_cnt": {"min": 0, "max": 0},
                  "videos_cnt": {"min": 10, "max": 20},
                  "share_cnt": {"min": 0, "max": 0}}
    FqwRecommend.get_videoList("recommend", "wangqifu,", "16QspO", rule_dict1, 'dev')