123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- import html
- import json
- import os
- import random
- import re
- import time
- import uuid
- import requests
- from datetime import datetime
- from urllib.parse import urlparse, parse_qs
- from loguru import logger
- from utils.aliyun_log import AliyunLogger
- from utils.feishu_utils import Feishu
class Dy_KS:
    """Resolve Douyin / Kuaishou / Haokan share links into playable video URLs.

    Every ``get_text_*_video`` classmethod returns a 3-tuple
    ``(video_url, original_title, video_id)`` with these special cases:

    * ``("作品不存在", None, None)`` - the platform reports the work as
      deleted, private or otherwise unavailable.
    * ``("note", "note", "note")`` - the content is an image note rather
      than a video (Kuaishou / Haokan only).
    * ``(None, None, None)`` - the id could not be extracted or the crawler
      service did not return a usable answer.
    """

    # Prefix shared by all crawler detail endpoints; the platform segment and
    # ``/detail`` are appended per request.
    _CRAWLER_BASE_URL = "http://8.217.192.46:8889/crawler"

    # Browser-like headers used when probing a share link for its redirect.
    _BROWSER_HEADERS = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;'
                  'q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Cache-Control': 'no-cache',
        'Pragma': 'no-cache',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/127.0.0.0 Safari/537.36',
    }

    _JSON_HEADERS = {
        'Content-Type': 'application/json'
    }

    # Hosts recognised by get_video_url for each platform.
    _DY_HOSTS = ('v.douyin.com', 'www.douyin.com', 'www.iesdouyin.com')
    _KS_HOSTS = ('v.kuaishou.com', 'www.kuaishou.com', 'v.m.chenzhongtech.com',
                 'creater.eozatvmq.com', 'live.kuaishou.com')
    _HKSP_HOSTS = ('haokan.baidu.com',)

    # Substrings of the Kuaishou API message that mean "work is unavailable".
    _KS_UNAVAILABLE_MARKERS = ("作品不存在", "内容不存在", "私密作品", "该作品仅允许关注者查看")

    @classmethod
    def _redirect_location(cls, url):
        """GET ``url`` without following redirects and return its ``Location`` header (or None)."""
        response = requests.request(url=url, method='GET', headers=cls._BROWSER_HEADERS,
                                    allow_redirects=False, timeout=30)
        return response.headers.get('Location', None)

    @classmethod
    def _fetch_detail(cls, platform, video_id, timeout):
        """POST ``video_id`` to the crawler detail endpoint of ``platform`` and return the decoded JSON."""
        response = requests.request(
            "POST",
            f"{cls._CRAWLER_BASE_URL}/{platform}/detail",
            headers=cls._JSON_HEADERS,
            data=json.dumps({"content_id": str(video_id)}),
            timeout=timeout,
        )
        return response.json()

    @classmethod
    def _extract_dy_video_id(cls, url):
        """Return the Douyin video id found in ``url`` (or in its redirect target), else None."""
        if "&vid=" in url:
            return parse_qs(urlparse(url).query).get('vid', [None])[0]
        if "?modal_id=" in url:
            return parse_qs(urlparse(url).query).get('modal_id', [None])[0]

        # Short links carry no id themselves; the id lives in the redirect target.
        location = cls._redirect_location(url)
        match = re.search(r'/video/(\d+)/?', location.split('?')[0] if location else url)
        if match:
            return match.group(1)
        # Guard against a missing Location header before searching it.
        if location and "&vid=" in location:
            vid_match = re.search(r'vid=(\d+)', location)
            if vid_match:
                return vid_match.group(1)
        return None

    @classmethod
    def get_text_dy_video(cls, url):
        """Resolve a Douyin link to ``(video_url, original_title, video_id)``.

        Makes up to three attempts, sleeping one second after each failed
        one. An attempt fails when a request raises or when the crawler
        answers with a code that is neither success (0) nor a recognised
        "content removed" answer (22002).

        :param url: Douyin share or page URL.
        :return: see the class docstring for the tuple contract.
        """
        max_retries = 3
        # The original share link is kept in ``url`` for every attempt; the
        # crawler endpoint is built separately so retries never hit it by mistake.
        for attempt in range(1, max_retries + 1):
            try:
                video_id = cls._extract_dy_video_id(url)
                if not video_id or not video_id.strip():
                    return None, None, None

                response = cls._fetch_detail("dou_yin", video_id, timeout=60)
                code = response["code"]
                if code == 0:
                    data = response["data"]["data"]
                    video_url = data["video_url_list"][0]["video_url"]
                    original_title = data["title"]
                    return video_url, original_title, video_id
                if code == 22002 and '抖音内容已被删除或无法访问' in (response.get('msg') or ''):
                    return "作品不存在", None, None
                # Any other answer counts as a failed attempt so the loop is bounded.
                logger.warning(f"[+] 抖音{url}接口返回 code 为 {code},正在进行第 {attempt} 次重试...")
            except Exception as e:
                logger.error(f"[+] 抖音{url}获取视频链接失败,失败信息{e}")
            time.sleep(1)
        return None, None, None

    @classmethod
    def get_text_ks_video(cls, url):
        """Resolve a Kuaishou link to ``(video_url, original_title, video_id)``.

        The share link is probed for its redirect target, the id is taken
        from the target path (falling back to the last path segment of
        ``url``), and the crawler service is queried once after a random
        10-50 second delay.

        :param url: Kuaishou share or page URL.
        :return: see the class docstring for the tuple contract.
        """
        try:
            location = cls._redirect_location(url)
            # A redirect to the bare home page is treated as "work does not exist".
            if location == "https://kuaishou.com/":
                return "作品不存在", None, None

            match = re.search(r'/(f|photo|short-video|long-video)/(.*)/?',
                              location.split('?')[0] if location else url)
            if match:
                # ``.*`` is greedy and swallows a trailing slash; drop it so the
                # content id sent to the crawler is clean.
                video_id = match.group(2).rstrip('/')
            else:
                video_id = url.rstrip('/').split('/')[-1]
            logger.info(f"[+]提取到的视频ID=={video_id}")
            if not video_id or not video_id.strip():
                return None, None, None

            # Randomised delay before hitting the crawler (kept from the original flow).
            time.sleep(random.uniform(10, 50))
            response = cls._fetch_detail("kuai_shou", video_id, timeout=30)
            code = response["code"]
            if code == 0:
                data = response["data"]["data"]
                if data['content_type'] == 'note':
                    # Three values, matching every other return path and the
                    # three-way unpacking done in get_video_url.
                    return "note", "note", "note"
                video_url = data["video_url_list"][0]["video_url"]
                original_title = data["title"]
                return video_url, original_title, video_id
            if code == 27006:
                msg = response.get('msg') or ''
                if any(marker in msg for marker in cls._KS_UNAVAILABLE_MARKERS):
                    return "作品不存在", None, None
            # Short pause before reporting failure (kept from the original flow).
            time.sleep(3)
        except Exception as e:
            logger.error(f"[+] 快手{url}获取视频链接失败,失败信息{e}")
        return None, None, None

    @classmethod
    def get_text_hksp_video(cls, url):
        """Resolve a Haokan link to ``(video_url, original_title, video_id)``.

        The id is read from the ``vid`` query parameter; the crawler service
        is queried up to three times.

        :param url: Haokan video URL carrying a ``vid`` query parameter.
        :return: see the class docstring for the tuple contract.
        """
        try:
            # Default to [None] so a link without ``vid`` yields a clean
            # "no id" result instead of a TypeError.
            video_id = parse_qs(urlparse(url).query).get('vid', [None])[0]
            if not video_id or not video_id.strip():
                return None, None, None

            max_retries = 3
            for attempt in range(1, max_retries + 1):
                try:
                    response = cls._fetch_detail("hao_kan_shi_pin", video_id, timeout=30)
                    code = response["code"]
                    if code == 0:
                        data = response["data"]["data"]
                        if data['content_type'] == 'note':
                            return "note", "note", "note"
                        video_url = data["video_url_list"][0]["video_url"]
                        original_title = data["title"]
                        return video_url, original_title, video_id
                    logger.warning(f"[+] 好看视频 {url} 请求返回 code 为 {code},正在进行第 {attempt} 次重试...")
                except Exception as e:
                    logger.warning(f"[+] 好看视频 {url} 请求接口异常({e}),正在进行第 {attempt} 次重试...")
        except Exception as e:
            logger.error(f"[+] 好看视频{url}获取视频信息失败,失败信息{e}")
        return None, None, None

    @classmethod
    def get_video_url(cls, data, principal):
        """Dispatch ``data['video_url']`` to the matching platform resolver.

        :param data: mapping with at least ``video_url``; ``name`` is also
            read when the link belongs to an unsupported host.
        :param principal: passed through to ``AliyunLogger.logging`` for
            unsupported links.
        :return: ``(video_url, original_title, video_id, channel)`` on
            success, where the first three values come straight from the
            platform resolver (so they may be ``None`` or ``"note"``).
            ``("作品不存在", None, None, None)`` when the work is gone,
            ``("链接不是抖/快/好看", None, None, None)`` for unsupported
            hosts, and ``("重新处理", None, None, None)`` when the URL cannot
            be extracted or an unexpected error occurs.
        """
        try:
            url = data['video_url']
            logger.info(f"[+] url=={url}")
            if "&vid=" in url or "?modal_id=" in url or "?vid=" in url:
                host = urlparse(url).netloc
                logger.info(f"[+] host=={host}")
            else:
                # The field may be free text with an embedded, HTML-escaped link.
                msg = html.unescape(url)
                pattern = re.search(r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+(/[-\w._~:/#[\]@!$&()*+,;=]*)', msg)
                logger.info(f"[+] pattern == {pattern}")
                if pattern is None:
                    logger.error(f"[+] {url} 提取 url失败")
                    return "重新处理", None, None, None
                url = pattern.group()
                host = urlparse(url).netloc
                logger.info(f"[+] url == {url} host=={host}")

            if host in cls._DY_HOSTS:
                tag_transport_channel = "抖音"
                logger.info(f"[+] {url}开始获取抖音视频链接")
                url, original_title, video_id = cls.get_text_dy_video(url=url)
            elif host in cls._KS_HOSTS:
                tag_transport_channel = "快手"
                logger.info(f"[+] {url}开始获取快手视频链接")
                url, original_title, video_id = cls.get_text_ks_video(url=url)
            elif host in cls._HKSP_HOSTS:
                tag_transport_channel = "好看视频"
                logger.info(f"[+] {url}开始获取好看视频链接")
                url, original_title, video_id = cls.get_text_hksp_video(url=url)
            else:
                logger.error(f"[+] {url}该链接不是抖/快/好看视频 不做处理")
                AliyunLogger.logging(data["name"], principal, "", data["video_url"],
                                     "不是抖/快/好看视频 不做处理", "1001", str(data))
                return "链接不是抖/快/好看", None, None, None

            if url == "作品不存在":
                return "作品不存在", None, None, None
            return url, original_title, video_id, tag_transport_channel
        except Exception as e:
            logger.error(f"[+] 获取视频链接异常{e}")
            return "重新处理", None, None, None
|