import configparser import re import os import sys import requests import json import urllib.parse from urllib.parse import urlparse from common import Oss from common.pq_utility import PQ from common.url_manage import urlManage from common.userAgent import get_random_user_agent sys.path.append(os.getcwd()) config = configparser.ConfigParser() config.read('/root/single_video_crawler/config.ini') # 替换为您的配置文件路径 class gongzhonghaoVdieo(): @classmethod def download_video(cls, video_url, video_path_url): for i in range(3): headers = { 'Accept': '*/*', 'Accept-Encoding': 'identity;q=1, *;q=0', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Host': urlparse(video_url).netloc, 'Origin': 'https://mp.weixin.qq.com', 'Pragma': 'no-cache', 'Range': 'bytes=0-', 'Referer': 'https://mp.weixin.qq.com/', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36', } response = requests.request("GET", video_url, headers=headers) # 检查响应状态码是否为200 if response.status_code == 206: # 以二进制写入模式打开文件 with open(f"{video_path_url}", "wb") as file: # 将响应内容写入文件 file.write(response.content) return True return False @classmethod def get_url(cls, link): payload = {} headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36', } response = requests.request("GET", link, headers=headers, data=payload) js_code = response.content.decode() if js_code: pattern = re.compile(r"url: \('(.*?)'\)") urls = pattern.findall(js_code) if urls: return urls[0] else: return None else: return None @classmethod def get_videoList(cls, vx_message, channel): try: data_link = vx_message[1] data = json.loads(data_link) url = data.get('url', '') mp4_link = cls.get_url(url) mp4_link = mp4_link.replace("\\x26amp;", "&") if mp4_link: title = data.get('title', '') # 随机生成视频oss_id video_id = urlManage.random_id() video_path_url = config['PATHS']['VIDEO_OSS_PATH'] + video_id + ".mp4" status = cls.download_video(mp4_link, video_path_url) if status == False: return "视频下载失败" oss_object_key = Oss.video_sync_upload_oss(video_path_url, video_id) status = oss_object_key.get("status") if status != 200: return "发送OSS失败" # 获取 oss 视频地址 oss_object_key = oss_object_key.get("oss_object_key") piaoquantv = PQ.insert_piaoquantv(oss_object_key, title, vx_message[3]) if piaoquantv == False: return "发送账号失败" if os.path.isfile(video_path_url): os.remove(video_path_url) return else: return "无法获取视频链接" except Exception: return "URL处理失败"