# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/10/10
import json
import os
import random
import sys
import time

import requests

from common.mq import MQ

sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper
from common.public import get_config_from_mysql, download_rule

proxies = {"http": None, "https": None}


def clean_title(strings):
    """Sanitize a raw video title.

    Trims surrounding whitespace and removes characters that are unsafe for
    filenames / downstream storage. Note that '.' is mapped to the full-width
    '。' rather than dropped.
    """
    return (
        strings.strip()
        .replace("\n", "")
        .replace("/", "")
        .replace("\r", "")
        .replace("#", "")
        .replace(".", "。")
        .replace("\\", "")
        .replace("&NBSP", "")
        .replace(":", "")
        .replace("*", "")
        .replace("?", "")
        .replace("?", "")
        .replace('"', "")
        .replace("<", "")
        .replace(">", "")
        .replace("|", "")
        .replace(" ", "")
        .replace('"', "")
        .replace("'", "")
    )


class ZLNYLScheduling:
    """Scheduled crawler for the 中老年娱乐 (zlnyl) WeChat mini-program feed.

    Fetches paginated video lists, filters items against crawl rules /
    filter words / the dedup table, and resolves each surviving item's
    playable URL.
    """

    def __init__(self, log_type, crawler, category, rule_dict, env):
        self.platform = "中老年娱乐"
        self.log_type = log_type
        self.crawler = crawler
        # Feed category id sent verbatim to the list API.
        self.category = category
        # Download/filter rules, evaluated by common.public.download_rule.
        self.rule_dict = rule_dict
        self.env = env
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)

    def repeat_video(self, video_id):
        """Return how many rows already exist for this out_video_id.

        A non-zero count means the video has been crawled before.
        """
        # NOTE(review): video_id comes from an external API and is
        # interpolated straight into SQL — injection-prone. Switch to a
        # parameterized query if MysqlHelper supports one.
        sql = f""" select * from crawler_video where platform in ("{self.crawler}","{self.platform}") and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(
            self.log_type, self.crawler, sql, self.env
        )
        return len(repeat_video)

    # Fetch the list of video ids for one feed page.
    def get_videoList(self, page_id):
        """Crawl one page (1-based ``page_id``) of the recommend feed.

        For each item: build a metadata dict, drop it if it is invalid,
        fails the download rules, matches a filter word, or is a duplicate;
        otherwise resolve its playable URL and emit it.
        """
        url = "https://kkj.xinhuachuanmeijs.com/app/index.php?i=299&t=0&m=jyt_txvideo&v=1.0&from=wxapp&c=entry&a=wxapp&do=videolist&"
        headers = {
            "Host": "kkj.xinhuachuanmeijs.com",
            "accept": "*/*",
            "content-type": "application/x-www-form-urlencoded",
            "accept-language": "zh-cn",
            "user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 11_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E217 MicroMessenger/6.8.0(0x16080000) NetType/WIFI Language/en Branch/Br_trunk MiniProgramEnv/Mac",
            "referer": "https://servicewechat.com/wx546222d9b2fe5fc0/3/page-frame.html",
            "Cookie": "PHPSESSID=ef4e78382296a0db2021ecd6e35c614f",
        }
        payload = "category={}&page={}&israndom=1&type=4&noauth=true".format(
            self.category, page_id
        )
        # timeout added so a stalled server cannot hang the scheduler.
        response = requests.request(
            "POST", url, headers=headers, data=payload, timeout=30
        )
        if response.status_code != 200 or "data" not in response.text:
            Common.logger(self.log_type, self.crawler).info(
                f"get_videoList:{response.text}\n"
            )
            Common.logging(
                self.log_type,
                self.crawler,
                self.env,
                f"get_videoList:{response.text}\n",
            )
            return
        # Parse the body once instead of re-calling response.json().
        data_list = response.json()["data"]
        if len(data_list) == 0:
            Common.logger(self.log_type, self.crawler).info(f"没有更多数据啦~\n")
            Common.logging(self.log_type, self.crawler, self.env, f"没有更多数据啦~\n")
            return
        for video_obj in data_list:
            try:
                # Default to "" (not 0): clean_title would crash on an int,
                # and the empty-id guard below compares against "".
                video_id = video_obj.get("vid", "")
                video_title = clean_title(video_obj.get("vtitle", ""))
                video_time = video_obj.get("v_time", 0)
                publish_time_stamp = int(time.time())
                publish_time_str = time.strftime(
                    "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
                )
                user_name = ""
                video_dict = {
                    "video_title": video_title,
                    "video_id": video_id,
                    "duration": video_time,
                    "play_cnt": 0,
                    "like_cnt": 0,
                    "comment_cnt": 0,
                    "share_cnt": 0,
                    "user_name": user_name,
                    "publish_time_stamp": publish_time_stamp,
                    "publish_time_str": publish_time_str,
                    "video_width": 0,
                    "video_height": 0,
                    "profile_id": 0,
                    "profile_mid": 0,
                    "cover_url": "",
                    "session": f"zhonglaonianyule-{int(time.time())}",
                }
                for k, v in video_dict.items():
                    Common.logger(self.log_type, self.crawler).info(f"{k}:{v}")
                Common.logging(
                    self.log_type, self.crawler, self.env, f"{video_dict}"
                )
                # Drop items with no title or no id.
                if video_title == "" or video_dict["video_id"] == "":
                    Common.logger(self.log_type, self.crawler).info("无效视频\n")
                    Common.logging(self.log_type, self.crawler, self.env, "无效视频\n")
                # Baseline crawl-rule filtering.
                elif (
                    download_rule(
                        log_type=self.log_type,
                        crawler=self.crawler,
                        video_dict=video_dict,
                        rule_dict=self.rule_dict,
                    )
                    is False
                ):
                    Common.logger(self.log_type, self.crawler).info("不满足抓取规则\n")
                    Common.logging(
                        self.log_type, self.crawler, self.env, "不满足抓取规则\n"
                    )
                # Title matches a configured (non-empty) filter word.
                elif any(
                    str(word) and str(word) in video_dict["video_title"]
                    for word in get_config_from_mysql(
                        log_type=self.log_type,
                        source=self.crawler,
                        env=self.env,
                        text="filter",
                        action="",
                    )
                ):
                    Common.logger(self.log_type, self.crawler).info("已中过滤词\n")
                    Common.logging(self.log_type, self.crawler, self.env, "已中过滤词\n")
                elif self.repeat_video(video_dict["video_id"]) != 0:
                    Common.logger(self.log_type, self.crawler).info("视频已下载\n")
                    Common.logging(self.log_type, self.crawler, self.env, "视频已下载\n")
                else:
                    # Enrich with the fields the ETL pipeline expects.
                    video_dict["out_user_id"] = video_dict["profile_id"]
                    video_dict["platform"] = self.crawler
                    video_dict["strategy"] = self.log_type
                    video_dict["out_video_id"] = video_dict["video_id"]
                    video_dict["width"] = video_dict["video_width"]
                    video_dict["height"] = video_dict["video_height"]
                    video_dict["crawler_rule"] = json.dumps(self.rule_dict)
                    video_dict["user_id"] = ""
                    video_dict["publish_time"] = video_dict["publish_time_str"]
                    d_obj = self.find_video_url(video_id)
                    video_dict["video_url"] = d_obj["url"]
                    video_dict["avatar_url"] = d_obj["cover"]
                    print(json.dumps(video_dict, ensure_ascii=False, indent=4))
                    # MQ publish intentionally disabled (dev/debug mode);
                    # re-enable to feed the ETL topic.
                    # self.mq.send_msg(video_dict)
            except Exception as e:
                Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
                Common.logging(
                    self.log_type, self.crawler, self.env, f"抓取单条视频异常:{e}\n"
                )

    def find_video_url(self, video_id):
        """Resolve the playable URL and cover image for one video id.

        Returns ``{"url": <mp4 url>, "cover": <cover image url>}`` and
        sleeps 3-5 s afterwards to throttle requests against the API.
        """
        url = "https://kkj.xinhuachuanmeijs.com/app/index.php?i=299&t=0&m=jyt_txvideo&v=1.0&from=wxapp&c=entry&a=wxapp&do=videoinfo&state=we7sid-f0008a08276fc324921185dc74427c56&sign=fa36387242169f01aa747a80d49c8670&vid={}&version=1.0.3".format(
            video_id
        )
        headers = {
            "Host": "kkj.xinhuachuanmeijs.com",
            "xweb_xhr": "1",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817",
            "content-type": "application/x-www-form-urlencoded",
            "accept": "*/*",
            "sec-fetch-site": "cross-site",
            "sec-fetch-mode": "cors",
            "sec-fetch-dest": "empty",
            "referer": "https://servicewechat.com/wx546222d9b2fe5fc0/3/page-frame.html",
            "accept-language": "en",
        }
        # timeout added so a stalled server cannot hang the scheduler.
        response = requests.get(url, headers=headers, timeout=30).json()
        video_url = response["data"]["res"]
        video_cover = response["data"]["cover"]
        Common.logger(self.log_type, self.crawler).info(
            "{}成功抓取视频链接\n".format(response["data"]["vtitle"])
        )
        Common.logging(
            self.log_type,
            self.crawler,
            self.env,
            "{}成功抓取视频链接\n".format(response["data"]["vtitle"]),
        )
        # Polite crawl delay between detail requests.
        time.sleep(random.randint(3, 5))
        return {"url": video_url, "cover": video_cover}


if __name__ == "__main__":
    ZL = ZLNYLScheduling(
        log_type="recommend", crawler="zlnyl", category=3615, rule_dict={}, env="dev"
    )
    for i in range(4):
        ZL.get_videoList(i + 1)