import html
import json
import os
import random
import re
import time
import uuid
from datetime import datetime
from urllib.parse import urlparse, parse_qs

import requests
from loguru import logger

from common import Oss, Feishu, AliyunLogger, Material
from common.download_video import DownLoad
from common.ffmpeg import FFmpeg
from common.google_ai_studio import GoogleAI
from common.gpt4o_mini_help import GPT4oMini
from common.redis import in_carry_video_data
from common.sql_help import sqlCollect
from common.tag_video import Tag
from common.tts_help import TTS
from data_channel.piaoquan import PQ


class NrfxCarryViode:
    """Content-understanding "carry" pipeline.

    ``main`` downloads a video by its id, asks an AI service for a transcript,
    generates a spoken ending clip, appends it to the video, uploads the
    result to OSS and registers it through ``insert_pq``.  The two
    ``get_text_*_video`` methods resolve Douyin / Kuaishou share links to a
    direct video URL through a crawler service.
    """

    # Feishu bot webhook that receives every failure notification.
    _FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd"
    # Feishu spreadsheet / sheet that receives one row per published video.
    _FEISHU_SHEET_TOKEN = "R4dLsce8Jhz9oCtDMr9ccpFHnbI"
    _FEISHU_SHEET_ID = '1Ycd37'
    # Account id passed to PQ.insert_piaoquantv and stored with the SQL record.
    _PQ_UID = '50322062'
    # Redis key that failed tasks are pushed back to for a later retry.
    _REDIS_NAME = 'task:carry_redis_by_nrfx'

    _DY_DETAIL_API = "http://8.217.192.46:8889/crawler/dou_yin/detail"
    _KS_DETAIL_API = "http://8.217.192.46:8889/crawler/kuai_shou/detail"

    # Browser-like headers used when following a share link's redirect.
    _BROWSER_HEADERS = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;'
                  'q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Cache-Control': 'no-cache',
        'Pragma': 'no-cache',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/127.0.0.0 Safari/537.36',
    }
    _JSON_HEADERS = {'Content-Type': 'application/json'}

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_nonempty_file(path):
        """Return True when *path* is set and names an existing, non-empty file."""
        # The leading truthiness check keeps os.path.exists from raising
        # TypeError when an upstream helper returned None.
        return bool(path) and os.path.exists(path) and os.path.getsize(path) > 0

    def _notify_failure(self, data, reason):
        """Send the standard failure card for *data* to the Feishu bot."""
        text = (
            f"**渠道**: {data['channel']}\n"
            f"**内容**: {data}\n"
            f"**失败信息**: {reason}\n"
        )
        Feishu.finish_bot(text, self._FEISHU_WEBHOOK, f"【 内容理解-{data['channel']}失败通知 】")

    @staticmethod
    def _log_stage(data, video_id, message, code):
        """Write one pipeline-stage record through AliyunLogger."""
        AliyunLogger.logging(data["type"], "内容分析", data["channel"], video_id, message, code, str(data))

    def _extract_dy_video_id(self, url):
        """Return the Douyin video id for share link *url*.

        The id is read from the ``vid`` / ``modal_id`` query parameter when
        present; otherwise the link is requested without following redirects
        and the id is taken from the ``/video/<digits>`` path of the
        ``Location`` header (or of *url* itself when there is no redirect).

        Raises:
            ValueError: no ``/video/<digits>`` segment could be found.
        """
        if "&vid=" in url:
            return parse_qs(urlparse(url).query).get('vid', [None])[0]
        if "?modal_id=" in url:
            return parse_qs(urlparse(url).query).get('modal_id', [None])[0]

        response = requests.request(url=url, method='GET', headers=self._BROWSER_HEADERS,
                                    allow_redirects=False, timeout=30)
        location = response.headers.get('Location', None)
        match = re.search(r'/video/(\d+)/?', location.split('?')[0] if location else url)
        if match is None:
            raise ValueError("no /video/<id> segment in share link or redirect target")
        return match.group(1)

    # ------------------------------------------------------------------
    # share-link resolvers
    # ------------------------------------------------------------------
    def get_text_dy_video(self, url):
        """Resolve a Douyin share link to a playable video URL.

        Makes up to three attempts, pausing one second after each failed one.

        Returns:
            ``(video_url, title, video_id)`` on success,
            ``("作品不存在", None, None)`` when the crawler reports the post as
            deleted or inaccessible, and ``(None, None, None)`` otherwise.
        """
        max_retries = 3
        # A bounded for-loop: every failed attempt (exception OR unexpected
        # status code) consumes one retry, so the loop always terminates.
        for _ in range(max_retries):
            try:
                # The share link is kept in `url` untouched so that a retry
                # parses the same link again rather than the crawler endpoint.
                video_id = self._extract_dy_video_id(url)
                if not video_id or not video_id.strip():
                    return None, None, None

                payload = json.dumps({"content_id": str(video_id)})
                response = requests.request("POST", self._DY_DETAIL_API, headers=self._JSON_HEADERS,
                                            data=payload, timeout=60).json()
                code = response["code"]
                if code == 0:
                    data = response["data"]["data"]
                    video_url = data["video_url_list"][0]["video_url"]
                    original_title = data["title"]
                    return video_url, original_title, video_id
                if code == 22002 and '抖音内容已被删除或无法访问' in response['msg']:
                    return "作品不存在", None, None
                logger.error(f"[+] 抖音{url}获取视频链接失败,失败信息{response}")
            except Exception as e:
                logger.error(f"[+] 抖音{url}获取视频链接失败,失败信息{e}")
            time.sleep(1)
        return None, None, None

    def get_text_ks_video(self, url):
        """Resolve a Kuaishou share link to a playable video URL.

        Returns:
            ``(video_url, title, video_id)`` on success,
            ``("note", "note", None)`` when the post is a note rather than a
            video, ``("作品不存在", None, None)`` when the crawler reports the
            post as missing or private, and ``(None, None, None)`` otherwise.
            Every path returns a 3-tuple so callers can unpack uniformly.
        """
        try:
            response = requests.request(url=url, method='GET', headers=self._BROWSER_HEADERS,
                                        allow_redirects=False, timeout=30)
            location = response.headers.get('Location', None)
            match = re.search(r'/(f|photo|short-video|long-video)/(.*)/?',
                              location.split('?')[0] if location else url)
            if match is None:
                raise ValueError("no video id segment in share link or redirect target")
            video_id = match.group(2)
            if not video_id or not video_id.strip():
                return None, None, None

            payload = json.dumps({"content_id": str(video_id)})
            # Random 10-30 s pause before hitting the crawler service.
            time.sleep(random.uniform(10, 30))
            response = requests.request("POST", self._KS_DETAIL_API, headers=self._JSON_HEADERS,
                                        data=payload, timeout=30).json()
            code = response["code"]
            if code == 0:
                data = response["data"]["data"]
                if data['content_type'] == 'note':
                    return "note", "note", None
                video_url = data["video_url_list"][0]["video_url"]
                original_title = data["title"]
                return video_url, original_title, video_id
            if code == 27006:
                msg = response['msg']
                # "不存在" also covers the longer "作品不存在" message.
                if "不存在" in msg or "私密作品" in msg:
                    return "作品不存在", None, None
            time.sleep(3)
        except Exception as e:
            logger.error(f"[+] 快手{url}获取视频链接失败,失败信息{e}")
        return None, None, None

    # ------------------------------------------------------------------
    # publishing
    # ------------------------------------------------------------------
    def insert_pq(self, data, oss_object_key, title, cover):
        """Register the uploaded video and record the result.

        Calls ``PQ.insert_piaoquantv``, tags both the new and the source
        video, stores a row through ``sqlCollect`` and prepends a row to the
        Feishu sheet.  A failed insert sends a Feishu failure notification;
        errors while recording are logged and swallowed.

        Args:
            data: task dict; reads ``channel``, ``videoid``, ``name``, ``dt``
                and ``title_category``.
            oss_object_key: OSS key of the processed video.
            title: title to publish the video with.
            cover: accepted for interface compatibility; not used here.
        """
        logger.info("[+] 开始写入票圈")
        code = PQ.insert_piaoquantv(oss_object_key, title, self._PQ_UID)
        if not code:
            logger.error("[+] 写入票圈后台失败")
            self._notify_failure(data, "写入票圈后台失败")
            return
        logger.info(f"[+] 写入票圈成功,返回视频id{code}")

        source_id = data["videoid"]
        tag_status = Tag.video_tag(code, "lev-供给,rol-机器,#str-搬运改造内容理解引导语实验_60")
        Tag.video_tag(source_id, "lev-供给,rol-机器,#str-搬运改造内容理解引导语base_61")
        if tag_status == 0:
            logger.info(f"[+] 写入标签成功,后台视频ID为{code}")

        try:
            formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            sqlCollect.insert_machine_making_data(data["channel"], data["name"], data["name"], source_id,
                                                  source_id, self._PQ_UID, title, code, formatted_time,
                                                  data["title_category"], oss_object_key)
            # Admin-site link to the newly created video.
            pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail'
            values = [[source_id, code, data["channel"], data["dt"], formatted_time, pq_url]]
            # Insert an empty row, then fill it (row 2 of the sheet).
            Feishu.insert_columns(self._FEISHU_SHEET_TOKEN, self._FEISHU_SHEET_ID, "ROWS", 1, 2)
            time.sleep(0.5)
            Feishu.update_values(self._FEISHU_SHEET_TOKEN, self._FEISHU_SHEET_ID, "A2:Z2", values)
            logger.info("[+] 写入飞书成功")
        except Exception as e:
            logger.error(f"[+] 写入飞书失败{e}")

    # ------------------------------------------------------------------
    # pipeline entry point
    # ------------------------------------------------------------------
    def main(self, data, file_path, GEMINI_API_KEY):
        """Run the whole pipeline for one task.

        Each stage that fails sends a Feishu failure notification and ends
        the run; download and final-merge failures additionally push the task
        back onto the Redis queue.

        Args:
            data: task dict; reads ``videoid``, ``type`` and ``channel`` (plus
                the keys ``insert_pq`` needs).  ``transform_rule`` is set on
                it when the ending audio cannot be generated.
            file_path: working directory handed to the download / FFmpeg /
                TTS helpers.
            GEMINI_API_KEY: key passed to ``GoogleAI.run``.
        """
        video_id = data["videoid"]
        self._log_stage(data, video_id, "扫描到一条视频", "2001")
        self._log_stage(data, video_id, "符合规则等待改造", "2004")

        logger.info(f"[+] 获取{video_id}的视频链接")
        video_path, cover_path, old_title = PQ.get_pq_oss_path(video_id)
        if not video_path:
            self._log_stage(data, video_id, "没有获取到视频链接", "3001")
            self._notify_failure(data, "没有获取到视频链接")
            return

        video_url = f"http://rescdn.yishihui.com/{video_path}"
        video_path = DownLoad.download_video(video_url, file_path, '', video_id)
        if not self._is_nonempty_file(video_path):
            in_carry_video_data(self._REDIS_NAME, json.dumps(data, ensure_ascii=False, indent=4))
            logger.error(f"[+] {video_url}下载失败")
            self._log_stage(data, video_id, "视频下载失败等待重新处理", "3002")
            self._notify_failure(data, "视频下载失败")
            return
        logger.info(f"[+] {video_url}视频下载成功")
        # NOTE(review): the next two log lines have no matching processing
        # step in this method; kept so the log stream is unchanged.
        logger.info(f"[+] {video_url}开始处理标题")
        logger.info("[+] 视频更改分辨率处理成功")

        logger.info("[+] 内容分析-开始获取视频口播内容")
        video_text = GoogleAI.run(GEMINI_API_KEY, video_path)
        if not video_text:
            self._log_stage(data, video_id, "内容分析,获取口播文案失败", "3003")
            self._notify_failure(data, "获取口播文案失败")
            return

        logger.info("[+] 内容分析-开始获取AI片尾")
        pw_srt_text = GPT4oMini.get_content_understanding_pw(video_text)
        pw_url = TTS.get_pw_zm(pw_srt_text, 'zhifeng_emo')
        if not pw_url:
            logger.error("[+] 内容分析-片尾获取失败")
            data["transform_rule"] = "仅改造"
            self._log_stage(data, video_id, "内容分析,片尾获取失败", "3003")
            self._notify_failure(data, "片尾获取失败")
            return
        logger.info("[+] 内容分析-片尾获取成功")

        pw_srt = TTS.getSrt(pw_url)
        if not pw_srt:
            self._log_stage(data, video_id, "内容分析,片尾音频获取失败", "3003")
            self._notify_failure(data, "片尾音频获取失败")
            return

        pw_mp3_path = TTS.download_mp3(pw_url, file_path)
        if not pw_mp3_path:
            self._log_stage(data, video_id, "内容分析,片尾音频下载失败", "3003")
            self._notify_failure(data, "片尾音频下载失败")
            return
        logger.info("[+] 内容分析-片尾音频下载成功")
        logger.info("[+] 内容分析-片尾获取最后一帧成功")

        # Per the original comments: take the video's last frame as a jpg,
        # then build the ending clip from frame + audio + subtitles.
        jpg_path = FFmpeg.video_png(video_path, file_path)
        pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt)
        if not self._is_nonempty_file(pw_path):
            logger.error("[+] 内容分析-片尾拼接失败")
            # Same fields as every other stage record (type / channel / id).
            self._log_stage(data, video_id, "内容分析,片尾拼接失败", "3003")
            self._notify_failure(data, "片尾拼接失败")
            return

        logger.info("[+] 内容分析-合并开始拼接")
        video_path = FFmpeg.h_b_video(video_path, pw_path, file_path)
        if not self._is_nonempty_file(video_path):
            in_carry_video_data(self._REDIS_NAME, json.dumps(data, ensure_ascii=False, indent=4))
            logger.error("[+] 内容分析-添加片尾失败")
            self._notify_failure(data, "添加片尾失败")
            return

        logger.info("[+] 内容分析-开始发送oss")
        # Upload the merged video to OSS under a random object name.
        oss_result = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4()))
        status = oss_result.get("status") if oss_result else None
        if status != 200:
            logger.error("[+] 内容分析-发送oss失败")
            self._log_stage(data, video_id, "内容分析,发送oss失败", "3003")
            self._notify_failure(data, "发送oss失败")
            return
        logger.info("[+] 内容分析-发送oss成功")

        oss_object_key = oss_result.get("oss_object_key")
        self.insert_pq(data, oss_object_key, old_title, cover_path)
        return