# -*- coding: utf-8 -*-
# @Time: 2023/11/03
import json
import os
import random
import sys
import time

import requests
import urllib3

# The project packages below are resolved relative to the working directory,
# so the path tweak must run before they are imported.
sys.path.append(os.getcwd())
from common.mq import MQ
from common.common import Common
from common.scheduling_db import MysqlHelper
from common.public import get_config_from_mysql, download_rule

# Explicitly disable any proxy for both schemes.
proxies = {"http": None, "https": None}


class KanyikanRecommend:
    """Crawler for the recommend feed: fetch seed videos, then their related videos."""

    platform = "看一看"
    strategy = "feed流"

    # Endpoint shared by the seed-feed request and the related-video request.
    _FEED_URL = 'https://search.weixin.qq.com/cgi-bin/recwxa/recwxavideolist?'

    # (old, new) pairs applied in order to a stripped title. The order and the
    # repeated entries are kept exactly as in the original replace chain.
    _TITLE_REPLACEMENTS = (
        ("\n", ""), ("/", ""), ("\\", ""), ("\r", ""),
        (":", ""), ("*", ""), ("?", ""),
        ("?", ""), ('"', ""), ("<", ""),
        (">", ""), ("|", ""), (" ", ""),
        ("&NBSP", ""), (".", "。"), (" ", ""),
        ("'", ""), ("#", ""), ("Merge", ""),
    )

    @staticmethod
    def _log(log_type, crawler, env, message, error=False):
        """Write ``message`` to both the local logger and ``Common.logging``.

        :param error: when True the local logger uses ``error`` level instead of ``info``.
        """
        logger = Common.logger(log_type, crawler)
        if error:
            logger.error(message)
        else:
            logger.info(message)
        Common.logging(log_type, crawler, env, message)

    @staticmethod
    def _headers(referer_build):
        """Build the request headers; only the build number in ``Referer`` varies."""
        return {
            'Host': 'search.weixin.qq.com',
            'Content-Type': 'application/json',
            'X-WX-ClientVersion': '0x33050520',
            'X-WECHAT-UIN': 'b2hfbTQ1WGNjSzQxemdfanpMSml1TEtfbEtsVQ==',
            'Accept': '*/*',
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E217 MicroMessenger/6.8.0(0x16080000) NetType/WIFI Language/en Branch/Br_trunk MiniProgramEnv/Mac',
            'Referer': f'https://servicewechat.com/wxbb9a805eb4f9533c/{referer_build}/page-frame.html',
            'Accept-Language': 'zh-cn'
        }

    @classmethod
    def _request(cls, params, referer_build):
        """Issue the GET request against the feed endpoint and return the response."""
        # Certificate verification is disabled below, so silence the warning it triggers.
        urllib3.disable_warnings()
        return requests.get(url=cls._FEED_URL, headers=cls._headers(referer_build),
                            params=params, proxies=proxies, verify=False)

    @classmethod
    def _backoff_if_unusable(cls, response, log_type, crawler, env):
        """Log and sleep when ``response`` carries no item list.

        :return: True when the response is unusable and the caller should
            restart the crawl, False when ``data.items`` is present.
        """
        if "data" not in response.text:
            # NOTE: the log text says 31-50 seconds, but the actual sleep is 31-40 seconds.
            cls._log(log_type, crawler, env, "获取视频list时,session过期,随机睡眠 31-50 秒")
            time.sleep(random.randint(31, 40))
            return True
        payload = response.json()
        if "items" not in payload["data"]:
            cls._log(log_type, crawler, env, f"get_feeds:{payload},随机睡眠 1-3 分钟")
            time.sleep(random.randint(60, 180))
            return True
        return False

    @classmethod
    def _clean_title(cls, raw_title):
        """Strip ``raw_title`` and apply the fixed replacement table in order."""
        title = raw_title.strip()
        for old, new in cls._TITLE_REPLACEMENTS:
            title = title.replace(old, new)
        return title

    @staticmethod
    def _extract_video_url(item):
        """Return the playback URL of ``item``, or "" when it has no ``videoInfo``.

        Preference order: ``mpInfo`` (third URL when more than two exist,
        otherwise the first), then ``ctnInfo``, then the top-level ``urlInfo``.
        """
        if "videoInfo" not in item:
            return ""
        cdn_info = item["videoInfo"]["videoCdnInfo"]
        if "mpInfo" in cdn_info:
            url_info = cdn_info["mpInfo"]["urlInfo"]
            if len(url_info) > 2:
                return url_info[2]["url"]
            return url_info[0]["url"]
        if "ctnInfo" in cdn_info:
            return cdn_info["ctnInfo"]["urlInfo"][0]["url"]
        return cdn_info["urlInfo"][0]["url"]

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env):
        """Return how many ``crawler_video`` rows already exist for ``video_id``.

        Rows are matched on platform (``crawler`` or ``cls.platform``),
        ``create_time >= '2023-10-09'`` and ``out_video_id``.
        """
        # NOTE(review): video_id comes from the remote API and is interpolated
        # straight into the SQL text. Switch to a parameterized query if
        # MysqlHelper offers one.
        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and create_time>='2023-10-09' and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
        return len(repeat_video)

    @classmethod
    def get_vid(cls, session, log_type, crawler, env, our_uid, rule_dict):
        """Fetch the seed feed for ``session``.

        :return: the ``data.items`` value of the response, or "" when the
            response was unusable. In the unusable case a fresh crawl has
            already been run via ``get_videoList`` after a back-off sleep.
        """
        params = {
            "session": session,
            "offset": 0,
            "count": "10",
            "channelid": "200",
            "scene": '310',
            "subscene": '1074',
            "sharesearchid": '0',
            "nettype": 'wifi',
            "switchprofile": "0",
            "switchnewuser": "0",
            "ad": 0
        }
        response = cls._request(params, referer_build=268)
        if cls._backoff_if_unusable(response, log_type, crawler, env):
            cls.get_videoList(log_type, crawler, our_uid, rule_dict, env)
            # The retry has run a complete crawl; do not re-parse the stale
            # (possibly non-JSON) response. "" tells the caller to stop.
            return ""
        return response.json().get("data", {}).get("items", "")

    @classmethod
    def _process_related_video(cls, item, session, mq, log_type, crawler, env, our_uid, rule_dict):
        """Validate one related video and, if it passes every check, send it to ``mq``."""
        video_title = cls._clean_title(item.get("title", ""))
        publish_time_stamp = item.get("date", 0)
        publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
        video_url = cls._extract_video_url(item)

        # Only suffix a real id; a missing id must stay empty so the
        # invalid-video check below can reject it.
        raw_video_id = item.get("videoId", "")
        video_id = raw_video_id + "feed" if raw_video_id else ""

        s_cnt = int(item.get("shared_cnt", 0))
        p_count = int(item.get("playCount", 0))
        if s_cnt < 200 and p_count < 15000:
            # Report this video's own counters, not those of the seed video.
            cls._log(log_type, crawler, env, f"分享:{s_cnt},播放量:{p_count}\n")
            return

        video_dict = {
            "video_title": video_title,
            "video_id": video_id,
            "play_cnt": item.get("playCount", 0),
            "like_cnt": item.get("liked_cnt", 0),
            "comment_cnt": item.get("comment_cnt", 0),
            "share_cnt": item.get("shared_cnt", 0),
            "duration": item.get("mediaDuration", 0),
            "video_width": item.get("short_video_info", {}).get("width", 0),
            "video_height": item.get("short_video_info", {}).get("height", 0),
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "user_name": item.get("source", "").strip().replace("\n", ""),
            "user_id": item.get("openid", ""),
            "avatar_url": item.get("bizIcon", ""),
            "cover_url": item.get("thumbUrl", ""),
            "video_url": video_url,
            "session": session,
        }
        for k, v in video_dict.items():
            Common.logger(log_type, crawler).info(f"{k}:{v}")
        Common.logging(log_type, crawler, env, f"video_dict:{video_dict}")

        if video_dict["video_id"] == "" or video_dict["video_title"] == "" or video_dict["video_url"] == "":
            cls._log(log_type, crawler, env, "无效视频\n")
        elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict,
                           rule_dict=rule_dict) is False:
            cls._log(log_type, crawler, env, "不满足抓取规则\n")
        elif any(str(word) and str(word) in video_dict["video_title"]
                 for word in get_config_from_mysql(log_type=log_type, source=crawler, env=env,
                                                   text="filter", action="")):
            # Empty filter words never match (same as the original expression).
            cls._log(log_type, crawler, env, '已中过滤词\n')
        elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0:
            cls._log(log_type, crawler, env, '视频已下载\n')
        else:
            video_dict["out_user_id"] = video_dict["user_id"]
            video_dict["platform"] = crawler
            video_dict["strategy"] = log_type
            video_dict["strategy_type"] = "hcm"
            video_dict["out_video_id"] = video_dict["video_id"]
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(rule_dict)
            video_dict["user_id"] = our_uid
            video_dict["publish_time"] = video_dict["publish_time_str"]
            mq.send_msg(video_dict)

    @classmethod
    def get_videoList(cls, log_type, crawler, our_uid, rule_dict, env):
        """Crawl the seed feed, then the related videos of every qualifying seed.

        Each related video that passes validation is sent through an ``MQ``
        created for the topic ``"topic_crawler_etl_" + env``. Exceptions are
        logged rather than raised: a failure on one related video skips that
        video, any other failure ends this crawl. Returns None.
        """
        mq = MQ(topic_name="topic_crawler_etl_" + env)
        try:
            cls._log(log_type, crawler, env, "正在抓取列表页")
            session = Common.get_session(log_type, crawler, env)
            if session is None:
                time.sleep(1)
                cls.get_videoList(log_type, crawler, our_uid, rule_dict, env)
                # The retry did the work; never continue with a None session.
                return

            feeds = cls.get_vid(session, log_type, crawler, env, our_uid, rule_dict)
            if feeds == "":
                cls._log(log_type, crawler, env, f"feeds:{feeds}")
                return

            for seed in feeds:
                vid = seed.get("videoId", "")
                shared_cnt = int(seed.get("shared_cnt", 0))
                play_count = int(seed.get("playCount", 0))
                # A play count of 0 (also the default when the key is missing)
                # must not abort the whole crawl with ZeroDivisionError.
                share_ratio = shared_cnt / play_count if play_count else 0.0
                # Round to two decimals before comparing, as the original did.
                video_percent = '%.2f' % share_ratio
                if float(video_percent) < 0.05 and play_count < 15000:
                    cls._log(log_type, crawler, env,
                             f"分享/播放:{video_percent},播放量:{play_count}\n")
                    continue

                time.sleep(random.randint(4, 10))
                params = {
                    "session": session,
                    "offset": 0,
                    "count": "30",
                    "channelid": "200201",
                    "vid": vid,
                    "scene": "310",
                    "subscene": '1089'
                }
                response = cls._request(params, referer_build=269)
                if cls._backoff_if_unusable(response, log_type, crawler, env):
                    cls.get_videoList(log_type, crawler, our_uid, rule_dict, env)
                    # The retry ran a full crawl; stop instead of parsing the stale response.
                    return

                related = response.json().get("data", {}).get("items", "")
                if related == "":
                    cls._log(log_type, crawler, env, f"feeds:{related}")
                    return

                for item in related:
                    try:
                        cls._process_related_video(item, session, mq, log_type, crawler, env,
                                                   our_uid, rule_dict)
                    except Exception as e:
                        # Best effort: one bad item must not stop the others.
                        cls._log(log_type, crawler, env, f"抓取单条视频异常:{e}\n", error=True)
        except Exception as e:
            cls._log(log_type, crawler, env, f"抓取列表页时异常:{e}\n", error=True)


if __name__ == "__main__":
    KanyikanRecommend.get_videoList(
        log_type="recommend",
        crawler="kanyikan",
        env="prod",
        rule_dict={'share_cnt': {'min': 300, 'max': 0}},
        our_uid=64080779
    )