123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- import configparser
- import re
- import os
- import sys
- import requests
- import json
- from urllib.parse import urlparse
- sys.path.append(os.getcwd())
- from common import Oss, Common
- from common.pq_utility import PQ
- from common.url_manage import urlManage
- config = configparser.ConfigParser()
- config.read('/root/single_video_crawler/config.ini') # Replace with the path to your own config file
- # config.read('/Users/tzld/Desktop/single_video_crawler/config.ini')
class gongzhonghaoVdieo():
    """Pull the video out of a WeChat official-account article and publish it.

    The pipeline driven by :meth:`get_videoList` is: fetch the article page,
    locate a downloadable video link, download the file locally, upload it
    through ``Oss.video_sync_upload_oss`` and register it through
    ``PQ.insert_piaoquantv``.
    """

    # Timeout (seconds) applied to every outbound HTTP call; without one,
    # requests can block forever on a stalled connection.
    REQUEST_TIMEOUT = 30
    # How many times a page fetch / download is attempted before giving up.
    MAX_ATTEMPTS = 3
    # Size of the pieces written to disk while streaming the video.
    DOWNLOAD_CHUNK_SIZE = 1024 * 1024

    # Patterns used to dig the video reference out of the article source.
    _VIDEO_ID_RE = re.compile(r"video_id:\s*'([^']*)'")
    _INLINE_URL_RE = re.compile(r"url: \('(.*?)'\)")
    _TARGET_URL_RE = re.compile(r'target_url\s*:\s*"(.*?)"')

    @classmethod
    def download_video(cls, video_url, video_path_url):
        """Download ``video_url`` into the local file ``video_path_url``.

        The body is streamed to disk in chunks instead of being held in
        memory. Up to ``MAX_ATTEMPTS`` attempts are made; a network error on
        one attempt is logged and the next attempt is started.

        Args:
            video_url: Direct URL of the video file.
            video_path_url: Local path the video is written to.

        Returns:
            True once a 206 response has been completely written to disk,
            False when every attempt failed.
        """
        headers = {
            'Accept': '*/*',
            'Accept-Encoding': 'identity;q=1, *;q=0',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Host': urlparse(video_url).netloc,
            'Origin': 'https://mp.weixin.qq.com',
            'Pragma': 'no-cache',
            'Range': 'bytes=0-',
            'Referer': 'https://mp.weixin.qq.com/',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
        }
        for attempt in range(1, cls.MAX_ATTEMPTS + 1):
            try:
                with requests.get(
                    video_url,
                    headers=headers,
                    stream=True,
                    timeout=cls.REQUEST_TIMEOUT,
                ) as response:
                    # The request carries "Range: bytes=0-", so the success
                    # status is 206 (Partial Content), not 200.
                    if response.status_code != 206:
                        continue
                    with open(video_path_url, "wb") as file:
                        for chunk in response.iter_content(
                            chunk_size=cls.DOWNLOAD_CHUNK_SIZE
                        ):
                            file.write(chunk)
                return True
            except requests.RequestException as exc:
                Common.logger().info(
                    f'video download attempt {attempt} failed: {exc}'
                )
                # Do not leave a truncated file behind for the next attempt
                # or for the caller.
                if os.path.isfile(video_path_url):
                    os.remove(video_path_url)
        return False

    @classmethod
    def get_link(cls, video_id):
        """Resolve a Tencent Video ``video_id`` to a playable file URL.

        Queries ``h5vv.video.qq.com/getinfo`` and builds the URL from the
        first entry of the returned video list.

        Args:
            video_id: The ``vid`` value sent to the getinfo endpoint.

        Returns:
            ``<folder url><file name>?vkey=<key>`` as a string.

        Raises:
            KeyError, IndexError, json.JSONDecodeError: when the response
                does not have the expected structure.
            requests.RequestException: on network failure or timeout.
        """
        # Passing the query as params lets requests URL-encode the vid.
        params = {
            "vid": video_id,
            "platform": "101001",
            "charge": "0",
            "otype": "json",
            "defn": "shd",
        }
        headers = {
            "Host": "h5vv.video.qq.com",
            "xweb_xhr": "1",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817",
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "*/*",
            "Sec-Fetch-Site": "cross-site",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Dest": "empty",
            "Referer": "https://servicewechat.com/wx5fcd817f3f80aece/3/page-frame.html",
            "Accept-Language": "en",
        }
        response = requests.get(
            "https://h5vv.video.qq.com/getinfo",
            params=params,
            headers=headers,
            timeout=cls.REQUEST_TIMEOUT,
        )
        # The endpoint answers with "QZOutputJson={...};". Strip the wrapper
        # explicitly so a missing ';' or trailing whitespace cannot chop off
        # the closing brace of the JSON document.
        body = response.text.strip()
        prefix = "QZOutputJson="
        if body.startswith(prefix):
            body = body[len(prefix):]
        result = json.loads(body.rstrip(";"))
        vl = result["vl"]["vi"][0]
        key = vl["fvkey"]
        name = vl["fn"]
        folder = vl["ul"]["ui"][0]["url"]
        return folder + name + "?vkey=" + key

    @classmethod
    def get_js(cls, link):
        """Fetch ``link`` and return the response body decoded as text.

        Args:
            link: URL of the article page.

        Returns:
            The page source as a string (decoded with the default codec).
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
        }
        response = requests.get(link, headers=headers, timeout=cls.REQUEST_TIMEOUT)
        return response.content.decode()

    @classmethod
    def get_url(cls, js_code):
        """Extract a video URL embedded directly in the page source.

        Looks first for ``url: ('...')`` and, failing that, for
        ``target_url: "..."``.

        Args:
            js_code: Page source as returned by :meth:`get_js`.

        Returns:
            The first captured URL, or None when neither pattern matches.
        """
        match = cls._INLINE_URL_RE.search(js_code)
        if match is None:
            match = cls._TARGET_URL_RE.search(js_code)
        return match.group(1) if match else None

    @classmethod
    def _resolve_video(cls, article_url):
        """Return ``(mp4_link, video_id)`` for the article, or ``('', '')``.

        The page is fetched up to ``MAX_ATTEMPTS`` times. When the page
        exposes a ``video_id`` the link comes from :meth:`get_link`;
        otherwise the link is taken from the page itself and an id is
        generated with ``urlManage.random_id()``.
        """
        for _ in range(cls.MAX_ATTEMPTS):
            js_code = cls.get_js(article_url)
            match = cls._VIDEO_ID_RE.search(js_code)
            video_id = match.group(1) if match else None
            if video_id:
                mp4_link = cls.get_link(video_id)
                if mp4_link:
                    return mp4_link, video_id
                continue
            inline_url = cls.get_url(js_code)
            # get_url returns None when nothing matches; only post-process
            # (and allocate an id for) a link that actually exists.
            if inline_url:
                return inline_url.replace("\\x26amp;", "&"), urlManage.random_id()
        return '', ''

    @classmethod
    def get_videoList(cls, vx_message, channel):
        """Process one message: find, download, upload and register its video.

        Args:
            vx_message: Indexable message record. Element 1 is a JSON string
                providing ``url`` and ``title``; element 3 is forwarded
                unchanged to ``PQ.insert_piaoquantv``.
            channel: Not used by this method.

        Returns:
            None on success, otherwise a string describing the failure.
        """
        try:
            data = json.loads(vx_message[1])
            url = data.get('url', '')
            title = data.get('title', '')

            mp4_link, video_id = cls._resolve_video(url)
            if not mp4_link:
                return "无法获取到视频下载链接"

            video_path_url = config['PATHS']['VIDEO_OSS_PATH'] + video_id + ".mp4"
            try:
                if not cls.download_video(mp4_link, video_path_url):
                    return "视频下载失败"
                upload_result = Oss.video_sync_upload_oss(video_path_url, video_id)
                # Object key of the uploaded video in OSS.
                # NOTE(review): the upload result also carries a "status"
                # entry that was read and discarded before; confirm whether
                # it should gate the next step.
                oss_object_key = upload_result.get("oss_object_key")
                Common.logger().info(f'准备发送站内参数:{oss_object_key},{title},{vx_message[3]}')
                piaoquantv = PQ.insert_piaoquantv(oss_object_key, title, vx_message[3])
                # The return type of insert_piaoquantv is not visible here;
                # keep the exact equality test so that a None result is not
                # reinterpreted as a failure.
                if piaoquantv == False:  # noqa: E712
                    return "视频发送到站内失败"
            finally:
                # Always drop the local copy, including on the failure
                # returns above and when upload/insert raises.
                if os.path.isfile(video_path_url):
                    os.remove(video_path_url)
        except Exception as e:
            Common.logger().info(f'报错信息:{e}')
            return f"处理报错,报错信息{e}"
|