# -*- coding: utf-8 -*-
# @Author: zhangyong
# @Time: 2023/12/04
import json
import os
import random
import sys
import time

import requests
from hashlib import md5
from datetime import datetime

from common import get_redirect_url
from common.mq import MQ

sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper
from common.public import get_config_from_mysql, download_rule

proxies = {"http": None, "https": None}


def clean_title(strings):
    # Strip characters that break titles downstream (newlines, path separators, quotes, etc.).
    return (
        strings.strip()
        .replace("\n", "")
        .replace("/", "")
        .replace("\r", "")
        .replace("#", "")
        .replace(".", "。")
        .replace("\\", "")
        .replace("&NBSP", "")
        .replace(":", "")
        .replace("*", "")
        .replace("?", "")
        .replace("?", "")
        .replace('"', "")
        .replace("<", "")
        .replace(">", "")
        .replace("|", "")
        .replace(" ", "")
        .replace('"', "")
        .replace("'", "")
    )


class Jxzfwncdhyspcheduling:
    def __init__(self, log_type, crawler, rule_dict, env, our_uid):
        self.platform = "吉祥祝福为你传递好运"
        self.log_type = log_type
        self.crawler = crawler
        self.rule_dict = rule_dict
        self.env = env
        self.our_uid = our_uid
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.download_count = 0

    def repeat_video(self, video_id):
        # Count rows already stored for this out_video_id (0 means the video has not been crawled yet).
        sql = f""" select * from crawler_video where platform in ("{self.crawler}","{self.platform}") and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(
            self.log_type, self.crawler, sql, self.env
        )
        return len(repeat_video)

    # Fetch the paginated recommend feed and hand each item to process_video_obj
    def get_videoList(self):
        for i in range(1, 10):
            time.sleep(random.randint(5, 10))
            url = "https://api.xiahong.top/index.php?s=mobile/Home/getHomeList&page={}&appid=wx7457ce7bf3cdbdbf&version=1.9.1&env_version=release&scene=1008".format(i)
            headers = {
                'Host': 'api.xiahong.top',
                'Content-Type': 'application/json',
                'Accept-Language': 'zh-cn',
                'Accept': '*/*',
                'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E217 MicroMessenger/6.8.0(0x16080000) NetType/WIFI Language/en Branch/Br_trunk MiniProgramEnv/Mac',
                'Referer': 'https://servicewechat.com/wx49e7ec4c849fb4e2/2/page-frame.html',
                'token': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE3MDI0MzU3NjUsIm5iZiI6MTcwMjQzNTc2NSwiZXhwIjoxNzAyNDQyOTY1LCJkYXRhIjp7InVzZXJfaWQiOiIyNDQ3NzI2NzMifX0.yqz9rShAXYSGEonLtCu5h8geosw7dEO-2unxvmmugZE',
                'ik': 'b326b5062b2f0e69046810717534cb09'
            }
            response = requests.post(url, headers=headers)
            if "data" not in response.text or response.status_code != 200:
                Common.logger(self.log_type, self.crawler).info(
                    f"get_videoList:{response.text}\n"
                )
                Common.logging(
                    self.log_type,
                    self.crawler,
                    self.env,
                    f"get_videoList:{response.text}\n",
                )
                return
            elif len(response.json()["data"]["list"]) == 0:
                Common.logger(self.log_type, self.crawler).info(f"没有更多数据啦~\n")
                Common.logging(self.log_type, self.crawler, self.env, f"没有更多数据啦~\n")
                return
            else:
                data_list = response.json()["data"]["list"]
                for video_obj in data_list:
                    try:
                        self.process_video_obj(video_obj)
                    except Exception as e:
                        Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
                        Common.logging(
                            self.log_type, self.crawler, self.env, f"抓取单条视频异常:{e}\n"
                        )
    def get_video_list(self, video_id):
        # Fetch the detail record (video_info) for a single video id; returns None when unavailable.
        url = "https://api.xiahong.top/index.php?s=mobile/Video/getVideoInfo&vid={}&appid=wx7457ce7bf3cdbdbf&version=1.9.1&scene=1089".format(
            video_id)
        headers = {
            'Host': 'api.xiahong.top',
            'Content-Type': 'application/json',
            'Accept-Language': 'zh-cn',
            'Accept': '*/*',
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E217 MicroMessenger/6.8.0(0x16080000) NetType/WIFI Language/en Branch/Br_trunk MiniProgramEnv/Mac',
            'Referer': 'https://servicewechat.com/wx7457ce7bf3cdbdbf/2/page-frame.html',
            'token': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE3MDE4NDA2MzcsIm5iZiI6MTcwMTg0MDYzNywiZXhwIjoxNzAxODQ3ODM3LCJkYXRhIjp7InVzZXJfaWQiOjIzNjM5MTAxMH19.WYV52kw3Oi4RT-KAblVCFYXWO3RJAQH9x6hB2tWKMKc',
            'ik': 'b326b5062b2f0e69046810717534cb09'
        }
        response = requests.post(url, headers=headers)
        if "data" not in response.text or response.status_code != 200:
            Common.logger(self.log_type, self.crawler).info(
                f"get_video_list:{response.text}\n"
            )
            Common.logging(
                self.log_type,
                self.crawler,
                self.env,
                f"get_video_list:{response.text}\n",
            )
            return
        elif len(response.json()["data"]["video_info"]) == 0:
            Common.logger(self.log_type, self.crawler).info(f"详情页数据为空~\n")
            Common.logging(self.log_type, self.crawler, self.env, f"详情页数据为空~\n")
            return
        else:
            data_list = response.json()["data"]["video_info"]
            return data_list

    def process_video_obj(self, video_obj):
        # Build video_dict from a feed item, run the filters, and push accepted items to MQ.
        video_id = video_obj.get("id", 0)
        get_video_list = self.get_video_list(video_id)
        # get_video_list() returns None when the detail request fails or is empty; skip such items.
        if not get_video_list:
            return
        video_title = clean_title(get_video_list.get("title", "no title"))
        video_time = video_obj.get("v_time", 0)
        publish_time_stamp = int(time.time())
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )
        user_name = ""
        video_dict = {
            "video_title": video_title,
            "video_id": video_id,
            "duration": video_time,
            "play_cnt": video_obj.get("visited", 0),
            "like_cnt": 0,
            "comment_cnt": 0,
            "share_cnt": video_obj.get("shared", 0),
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "video_width": 0,
            "video_height": 0,
            "profile_id": 0,
            "profile_mid": 0,
            # "cover_url": "",
            "session": f"zhufuzanmenzhonglaonianzhishipin-{int(time.time())}",
        }
        for k, v in video_dict.items():
            Common.logger(self.log_type, self.crawler).info(f"{k}:{v}")
        Common.logging(
            self.log_type, self.crawler, self.env, f"{video_dict}"
        )
        # Filter out invalid videos
        if video_title == "" or video_dict["video_id"] == "":
            Common.logger(self.log_type, self.crawler).info("无效视频\n")
            Common.logging(self.log_type, self.crawler, self.env, "无效视频\n")
        # Filter by the basic crawl rules
        elif (
            download_rule(
                log_type=self.log_type,
                crawler=self.crawler,
                video_dict=video_dict,
                rule_dict=self.rule_dict,
            )
            is False
        ):
            Common.logger(self.log_type, self.crawler).info("不满足抓取规则\n")
            Common.logging(
                self.log_type, self.crawler, self.env, "不满足抓取规则\n"
            )
        # Filter by the configured block words
        elif (
            any(
                str(word) if str(word) in video_dict["video_title"] else False
                for word in get_config_from_mysql(
                    log_type=self.log_type,
                    source=self.crawler,
                    env=self.env,
                    text="filter",
                    action="",
                )
            )
            is True
        ):
            Common.logger(self.log_type, self.crawler).info("已中过滤词\n")
            Common.logging(self.log_type, self.crawler, self.env, "已中过滤词\n")
        elif self.repeat_video(video_dict["video_id"]) != 0:
            Common.logger(self.log_type, self.crawler).info("视频已下载\n")
            Common.logging(self.log_type, self.crawler, self.env, "视频已下载\n")
        else:
            video_url = get_video_list['video_url']
            video_url = get_redirect_url(video_url)
            video_dict["out_user_id"] = video_dict["profile_id"]
            video_dict["platform"] = self.crawler
            video_dict["strategy"] = self.log_type
            video_dict["out_video_id"] = str(video_dict["video_id"])
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = self.our_uid
            video_dict["publish_time"] = video_dict["publish_time_str"]
            video_dict["video_url"] = video_url
            video_dict["avatar_url"] = video_obj['images']
            video_dict["cover_url"] = video_obj['images']
            self.download_count += 1
            self.mq.send_msg(video_dict)


if __name__ == "__main__":
    ZL = Jxzfwncdhyspcheduling(
        log_type="recommend",
        crawler="Jxzfwncdhy",
        rule_dict={},
        our_uid="zhangyong",
        env="dev"
    )
    ZL.get_videoList()
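
# --- Optional hardening sketch (not part of the original module) ---
# Both request sites above call requests.post() without a timeout, so a hung
# connection can stall the crawl loop indefinitely. A minimal retry/timeout
# wrapper like the one below could be swapped in for those calls; the helper
# name and the retry/timeout values are illustrative assumptions only.
def post_with_retry(url, headers, retries=3, timeout=10):
    # Retry transient network failures with a simple exponential backoff.
    for attempt in range(retries):
        try:
            return requests.post(url, headers=headers, timeout=timeout, proxies=proxies)
        except requests.RequestException:
            if attempt == retries - 1:
                raise
            time.sleep(2 ** attempt)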