"""Resolve the off-platform author ID (channel_account_id) for Douyin / Kuaishou
share links, deduplicate it, queue the task into Redis for the rework pipeline,
and record the result in Aliyun logs and a Feishu sheet."""
from datetime import datetime

import html
import json
import random
import re
import time
from urllib.parse import urlparse, parse_qs

import requests
from loguru import logger

from common.aliyun_log import AliyunLogger
from common.feishu_utils import Feishu
from common.redis import in_job_video_data
from common.sql_help import sqlCollect


class Top:
    def get_text_dy_video(self, url):
        """Extract the Douyin video id from `url`, then ask the crawler detail API
        for the author's channel_account_id. Returns the id, the sentinel
        "作品不存在" when the video is gone, or None on failure."""
        max_retries = 3
        retry_count = 0
        while retry_count < max_retries:
            try:
                # Resolve the video id: bare ids pass through, share links carry it
                # in a query parameter, and short links need one manual redirect hop.
                if "http" not in url:
                    video_id = url
                elif "&vid=" in url:
                    params = parse_qs(urlparse(url).query)
                    video_id = params.get('vid', [None])[0]
                elif "?modal_id=" in url:
                    params = parse_qs(urlparse(url).query)
                    video_id = params.get('modal_id', [None])[0]
                else:
                    headers = {
                        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;'
                                  'q=0.8,application/signed-exchange;v=b3;q=0.7',
                        'Accept-Language': 'zh-CN,zh;q=0.9',
                        'Cache-Control': 'no-cache',
                        'Pragma': 'no-cache',
                        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
                                      'Chrome/127.0.0.0 Safari/537.36',
                    }
                    # Do not follow redirects: the canonical /video/<id>/ URL is
                    # read straight from the Location header.
                    response = requests.get(url, headers=headers, allow_redirects=False, timeout=30)
                    location = response.headers.get('Location', None)
                    video_id = re.search(r'/video/(\d+)/?',
                                         location.split('?')[0] if location else url).group(1)
                if not video_id or not video_id.strip():
                    return None
                api_url = "http://8.217.192.46:8889/crawler/dou_yin/detail"
                payload = json.dumps({"content_id": str(video_id)})
                headers = {'Content-Type': 'application/json'}
                time.sleep(random.uniform(5, 10))  # jitter so the API is not hammered
                response = requests.post(api_url, headers=headers, data=payload, timeout=60)
                result = response.json()
                code = result["code"]
                if code == 0:
                    data = result["data"]["data"]
                    return data["channel_account_id"]
                if code == 22002 and '抖音内容已被删除或无法访问' in result['msg']:
                    return "作品不存在"
                # Unhandled response code: count it as a failed attempt so the
                # loop cannot spin forever.
                retry_count += 1
            except Exception as e:
                retry_count += 1
                logger.error(f"[+] 抖音{url}获取视频链接失败,失败信息{e}")
                time.sleep(1)
        return None

    def get_text_ks_video(self, url):
        """Kuaishou counterpart of get_text_dy_video (single attempt)."""
        try:
            if "http" not in url:
                video_id = url
            else:
                headers = {
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;'
                              'q=0.8,application/signed-exchange;v=b3;q=0.7',
                    'Accept-Language': 'zh-CN,zh;q=0.9',
                    'Cache-Control': 'no-cache',
                    'Pragma': 'no-cache',
                    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
                                  'Chrome/127.0.0.0 Safari/537.36',
                }
                response = requests.get(url, headers=headers, allow_redirects=False, timeout=30)
                location = response.headers.get('Location', None)
                video_id = re.search(r'/(f|photo|short-video|long-video)/(.*)/?',
                                     location.split('?')[0] if location else url).group(2)
            if not video_id or not video_id.strip():
                return None
            api_url = "http://8.217.192.46:8889/crawler/kuai_shou/detail"
            payload = json.dumps({"content_id": str(video_id)})
            headers = {'Content-Type': 'application/json'}
            time.sleep(random.uniform(5, 10))
            response = requests.post(api_url, headers=headers, data=payload, timeout=30)
            result = response.json()
            code = result["code"]
            if code == 0:
                data = result["data"]["data"]
                return data['channel_account_id']
            elif code == 27006:
                if "作品不存在" in result['msg'] or "内容不存在" in result['msg'] or "私密作品" in result['msg']:
                    return "作品不存在"
            # Brief pause before giving up on an unhandled response code.
            time.sleep(3)
        except Exception as e:
logger.error(f"[+] 快手{url}获取视频链接失败,失败信息{e}") return None def main(self,data): channel_account_id = None tag_transport_channel = None data = json.loads(data) AliyunLogger.logging(data['channel'], data, "开始获取","fail") channel_id = data['channel'] url_id, data_channel = sqlCollect.get_channle_id(data['videoid']) if not url_id: logger.info(f"[+] 任务{data},没有该视频信息") AliyunLogger.logging(data['channel'], data, "没有该视频信息","fail") return if "&vid=" in url_id or "?modal_id=" in url_id: host = urlparse(url_id).netloc else: msg = html.unescape(url_id).split('?')[0] pattern = re.search(r'https?://[^\s<>"\'\u4e00-\u9fff]+', msg) if not pattern: in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4)) return url_id = pattern.group() host = urlparse(url_id).netloc if host in ['v.douyin.com', 'www.douyin.com', 'www.iesdouyin.com'] or data_channel in "抖音": tag_transport_channel = "抖音" logger.info(f"[+] {url_id}开始获取抖音视频链接") channel_account_id = self.get_text_dy_video(url=url_id) elif host in ['v.kuaishou.com', 'www.kuaishou.com', 'v.m.chenzhongtech.com', 'creater.eozatvmq.com'] or data_channel in "快手": tag_transport_channel = "快手" logger.info(f"[+] {url_id}开始获取快手视频链接") channel_account_id= self.get_text_ks_video(url=url_id) if not channel_account_id or channel_account_id == "作品不存在": in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4)) AliyunLogger.logging(data['channel'], data, f"没有获取到视频用户ID,等待重新获取/{channel_account_id}","fail") return status = sqlCollect.select_pj_video_data(channel_account_id) if status: logger.info(f"[+] 任务{data},该用户站外ID已添加过") AliyunLogger.logging(data['channel'], data, "该用户站外ID已添加过", channel_account_id) return data["channel_account_id"] = channel_account_id if channel_id in ["抖音关键词抓取", "快手关键词抓取"]: data["tag_transport_channel"] = tag_transport_channel redis_data = f"task:top_data_{'dy' if channel_id == '抖音关键词抓取' else 'ks'}_gjc" else: data["tag_transport_channel"] = tag_transport_channel redis_data = f"task:top_data_{'ks' if tag_transport_channel == '快手' else 'dy'}_gz" AliyunLogger.logging(data['channel'], data, "获取成功等待写入改造任务", channel_account_id) in_job_video_data(redis_data, json.dumps(data, ensure_ascii=False, indent=4)) sqlCollect.insert_pj_video_data(channel_account_id, channel_id) logger.info(f"[+] 开始写入飞书表格") current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") values = [ [ data['uid'], data['videoid'], data['return_uv'], data['type'], data['type_owner'], data['channel'], data['channel_owner'], data['title'], data['dt'], channel_account_id, tag_transport_channel, formatted_time ] ] Feishu.insert_columns("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "ROWS", 1, 2) time.sleep(0.5) Feishu.update_values("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "A2:Z2", values) logger.info(f"[+] 成功写入飞书表格") return if __name__ == '__main__': url,channel = sqlCollect.get_channle_id(45843781) print(url,channel) # top = Top() # top.main("task:top_all_data")