123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- import configparser
- import re
- import os
- import sys
- import requests
- import json
- import urllib.parse
- from urllib.parse import urlparse
- from common import Oss
- from common.pq_utility import PQ
- from common.url_manage import urlManage
- from common.userAgent import get_random_user_agent
# Make modules located under the current working directory importable.
# NOTE(review): this runs after the `common` imports above, so it cannot
# influence how those particular imports are resolved.
sys.path.append(os.getcwd())

# Location of the crawler's INI settings; replace with the path to your own
# configuration file.
_CONFIG_PATH = '/Users/tzld/Desktop/single_video_crawler/config.ini'

# ConfigParser.read() silently skips files it cannot open, so a wrong path
# only surfaces later as a KeyError when a section is looked up.
config = configparser.ConfigParser()
config.read(_CONFIG_PATH)
class gongzhonghaoVdieo():
    """Fetch a single video referenced by an article page and publish it.

    The pipeline implemented by :meth:`get_videoList` is: resolve the direct
    video link from the article page, download it to a local temp file,
    upload that file through ``Oss``, then register it through ``PQ``.
    The request headers target ``mp.weixin.qq.com`` pages.
    """

    # (connect, read) timeouts in seconds, so a stalled server cannot hang
    # the crawler forever.
    _REQUEST_TIMEOUT = (10, 60)
    # Number of download attempts before giving up.
    _DOWNLOAD_ATTEMPTS = 3
    # Size of the pieces streamed to disk while downloading.
    _CHUNK_SIZE = 1024 * 1024
    _USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36'
    # Extracts the first `url: ('...')` value embedded in the page script.
    _VIDEO_URL_RE = re.compile(r"url: \('(.*?)'\)")

    @classmethod
    def download_video(cls, video_url, video_path_url):
        """Download ``video_url`` into the local file ``video_path_url``.

        Up to three attempts are made. An attempt succeeds only when the
        server answers ``206 Partial Content``, which is the expected reply
        because the request carries a ``Range: bytes=0-`` header.

        Args:
            video_url: Direct URL of the video file.
            video_path_url: Local filesystem path to write the video to.

        Returns:
            ``True`` once the file has been written, ``False`` if every
            attempt failed (bad status code or network error).

        Raises:
            OSError: If the destination file cannot be opened or written.
        """
        # The headers do not change between attempts, so build them once.
        headers = {
            'Accept': '*/*',
            'Accept-Encoding': 'identity;q=1, *;q=0',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Host': urlparse(video_url).netloc,
            'Origin': 'https://mp.weixin.qq.com',
            'Pragma': 'no-cache',
            'Range': 'bytes=0-',
            'Referer': 'https://mp.weixin.qq.com/',
            'User-Agent': cls._USER_AGENT,
        }
        for _ in range(cls._DOWNLOAD_ATTEMPTS):
            try:
                # Stream the body so large videos are never held fully in
                # memory; the context manager releases the connection.
                with requests.request(
                    "GET",
                    video_url,
                    headers=headers,
                    stream=True,
                    timeout=cls._REQUEST_TIMEOUT,
                ) as response:
                    if response.status_code != 206:
                        continue
                    with open(video_path_url, "wb") as file:
                        for chunk in response.iter_content(chunk_size=cls._CHUNK_SIZE):
                            if chunk:
                                file.write(chunk)
                    return True
            except requests.RequestException:
                # A network error should consume one attempt, not abort the
                # whole retry loop.
                continue
        return False

    @classmethod
    def get_url(cls, link):
        """Return the direct video URL embedded in the page at ``link``.

        The page body is searched for the first ``url: ('...')`` occurrence.

        Args:
            link: URL of the page to fetch.

        Returns:
            The captured URL string (still containing any escaped
            characters from the page), or ``None`` when the page is empty
            or no match is found.

        Raises:
            requests.RequestException: On network failure or timeout.
            UnicodeDecodeError: If the body is not valid UTF-8.
        """
        headers = {
            'User-Agent': cls._USER_AGENT,
        }
        response = requests.request(
            "GET", link, headers=headers, timeout=cls._REQUEST_TIMEOUT
        )
        js_code = response.content.decode()
        if not js_code:
            return None
        match = cls._VIDEO_URL_RE.search(js_code)
        return match.group(1) if match else None

    @classmethod
    def get_videoList(cls, vx_message, channel):
        """Download, upload and register the video described by ``vx_message``.

        Args:
            vx_message: Indexable record; element ``1`` is a JSON string
                with ``url`` and ``title`` keys, element ``3`` is forwarded
                unchanged to ``PQ.insert_piaoquantv``.
            channel: Unused; kept so existing callers keep working.

        Returns:
            ``None`` on success, otherwise a human-readable error message
            describing which step failed.
        """
        try:
            data = json.loads(vx_message[1])
            url = data.get('url', '')
            mp4_link = cls.get_url(url)
            # Check for a missing link *before* touching it; calling
            # .replace() on None used to surface as a generic URL error.
            if not mp4_link:
                return "无法获取视频链接"
            # The page escapes '&' as '\x26amp;'; restore the real character.
            mp4_link = mp4_link.replace("\\x26amp;", "&")
            title = data.get('title', '')
            # Random id used both for the local file name and the upload.
            video_id = urlManage.random_id()
            video_path_url = config['PATHS']['VIDEO_OSS_PATH'] + video_id + ".mp4"
            try:
                if not cls.download_video(mp4_link, video_path_url):
                    return "视频下载失败"
                upload_result = Oss.video_sync_upload_oss(video_path_url, video_id)
                if upload_result.get("status") != 200:
                    return "发送OSS失败"
                # Key of the uploaded object, needed to register the video.
                oss_object_key = upload_result.get("oss_object_key")
                inserted = PQ.insert_piaoquantv(oss_object_key, title, vx_message[3])
                # Kept as an equality test: only an explicit False is
                # treated as failure, matching the original contract.
                if inserted == False:  # noqa: E712
                    return "发送账号失败"
                return None
            finally:
                # Remove the temp file on every exit path, not only on
                # success, so failed runs do not leave videos on disk.
                if os.path.isfile(video_path_url):
                    os.remove(video_path_url)
        except Exception:
            return "URL处理失败"
|