123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- import json
- import os
- import time
- import uuid
- from typing import Literal, Optional, Tuple
- import cv2
- import google.generativeai as genai
- import requests
- import schedule
- from google.generativeai.types import (File, GenerateContentResponse,
- HarmBlockThreshold, HarmCategory)
- from loguru import logger
- from common.aliyun_log import AliyunLogger
- from common.common_log import Common
- from common.feishu_data import Material
- from common.redis import SyncRedisHelper
# Deployment environment; 'prod' selects server paths, anything else is local dev.
ENV = os.environ.get('ENV', 'dev')
# Gemini API key; run() refuses to process tasks when it is unset.
API_KEY = os.environ.get('API_KEY')
# Suffix of the Redis queue to consume (run() suggests 'recommend' or 'top').
TASK_TYPE = os.environ.get('TASK_TYPE')
# Local HTTP proxy, exported to the environment only when ENV == 'dev'.
PROXY_ADDR = 'http://localhost:1081'
# Directory in which downloaded videos are stored before being uploaded.
if ENV == 'prod':
    CACHE_DIR = '/app/cache/'
else:
    CACHE_DIR = os.path.expanduser('~/Downloads/')
# Example of the output layout the model is asked to fill in. analyze_video()
# embeds it in the prompt after "输出返回格式样例" (output format example), so
# the keys, their order and the placeholder values are effectively prompt text.
SAMPLE_DATA = {
    # 1. Basic information
    "一、基础信息": {
        "关键维度": "",
        "内容选题": "",
        "视频主题": "",
        "视频关键词": ""
    },
    # 2. Subject and scenes
    "二、主体和场景": {
        "视频主体": "",
        "视频场景": []
    },
    # 3. Emotion and style
    "三、情感与风格": {
        "情感倾向": "",
        "视频风格": ""
    },
    # 4. Spread potential and audience profile
    "四、视频传播性与画像": {
        "片尾引导": {},
        "传播性判断": "",
        "视频用户画像": {}
    },
    # 5. Audio and visual details
    "五、音画细节": {
        "音频细节": {},
        "视频水印": {},
        "视频字幕": {},
        "视频口播": ""
    },
    # 6. Cover image information
    "六、封面信息": {
        "封面主体": "",
        "人物个数": "",
        "文字数量": "",
        "文字关键字": [],
        "封面主题": ""
    },
    # 7. People and setting
    "七、人物与场景": {
        "知名人物": "",
        "人物年龄段": "",
        "场景描述": ""
    },
    # 8. Timeliness and classification. The secondary-category list holds
    # three placeholders of the form "category - , score -".
    "八、时效性与分类": {
        "时效性": "",
        "视频一级分类": "",
        "二级分类": ["品类- 、分数-", "品类- 、分数-", "品类- 、分数-"]
    },
    # 9. Title understanding; intentionally left empty in the sample.
    "九、标题理解": {
    }
}
# In local development, route outbound HTTP and HTTPS traffic through the
# local proxy by exporting both proxy variables for this process.
if ENV == 'dev':
    os.environ.update(http_proxy=PROXY_ADDR, https_proxy=PROXY_ADDR)
def get_redis_task(task_type: Literal['recommend', 'top']) -> Optional[bytes]:
    """Pop one pending task of ``task_type`` from Redis.

    The task is taken from the right end of the list
    ``task:video_ai_<task_type>`` and the outcome is logged.

    Args:
        task_type: Queue suffix, e.g. 'recommend' or 'top'.

    Returns:
        The raw popped payload, or the falsy value ``rpop`` returned when
        nothing was available (presumably None for an empty list).
    """
    client = SyncRedisHelper().get_client()
    payload: bytes = client.rpop(f'task:video_ai_{task_type}')
    if not payload:
        logger.error(f'[+] 未获取到 {task_type} 类型任务')
        return payload
    logger.success(f'[+] 获取到 {task_type} 类型任务: {payload.decode()}')
    return payload
def get_video_duration(video_link: str) -> int:
    """Return a video's duration in whole seconds, or 0 if it is unknown.

    The duration is computed as frame count / FPS reported by OpenCV and
    truncated to an int.

    Args:
        video_link: Anything ``cv2.VideoCapture`` accepts (file path or URL).

    Returns:
        Duration in seconds. Returns 0 when the video cannot be opened or
        reports a non-positive frame rate; callers treat 0 as a failure.
    """
    cap = cv2.VideoCapture(video_link)
    try:
        if not cap.isOpened():
            return 0
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        # Some streams report 0 FPS; dividing by it would raise ZeroDivisionError.
        if fps <= 0:
            return 0
        return int(frame_count / fps)
    finally:
        # Always free the underlying decoder / network handle.
        cap.release()
def download_video(video_link: str) -> Optional[str]:
    """Download a video into CACHE_DIR under a random ``<uuid4>.mp4`` name.

    Up to three attempts are made. The body is streamed to disk in chunks
    rather than buffered in memory. A partially written file from a failed
    attempt is removed.

    Args:
        video_link: HTTP(S) URL of the video.

    Returns:
        The local file path on success, otherwise None.
    """
    try:
        # The cache directory may not exist yet (e.g. a fresh container).
        os.makedirs(CACHE_DIR, exist_ok=True)
    except OSError as e:
        logger.error(f'[+] 创建缓存目录失败: {CACHE_DIR}, {e}')
        return None
    file_path = os.path.join(CACHE_DIR, f'{uuid.uuid4()}.mp4')
    for attempt in range(1, 4):
        try:
            with requests.get(url=video_link, timeout=360, stream=True) as response:
                if response.status_code == 200:
                    with open(file_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=1024 * 1024):
                            f.write(chunk)
                    logger.info(f'[+] 视频链接: {video_link}, 存储地址: {file_path}')
                    return file_path
                logger.warning(f'[+] 视频下载状态码异常 (第 {attempt} 次): {response.status_code}, {video_link}')
        except Exception as e:
            logger.warning(f'[+] 视频下载异常 (第 {attempt} 次): {e}, {video_link}')
            # Do not leave a truncated file behind in the cache directory.
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
            except OSError:
                pass
        time.sleep(1)
    return None
def upload_video(video_path: str, redis_task, timeout: float = 600, poll_interval: float = 1) -> Optional[Tuple[File, str]]:
    """Upload a local video to Gemini and wait until it leaves PROCESSING.

    Args:
        video_path: Local path of the video file.
        redis_task: Decoded task dict. Its 'video_id', 'title', 'video_path',
            'type' and 'partition' fields are used for error reporting.
        timeout: Maximum number of seconds to wait for processing. When it is
            exceeded, the file is returned with its current (PROCESSING) state
            so the caller can treat it as not ACTIVE instead of hanging forever.
        poll_interval: Seconds to sleep between status polls.

    Returns:
        ``(file, state_name)`` once processing finishes or the timeout
        expires. Returns None if the upload or a status poll raised; in that
        case an already-uploaded file is deleted on a best-effort basis,
        because the caller never receives a handle to it.
    """
    file = None
    try:
        file = genai.upload_file(path=video_path)
        deadline = time.monotonic() + timeout
        while file.state.name == 'PROCESSING' and time.monotonic() < deadline:
            time.sleep(poll_interval)
            file = genai.get_file(name=file.name)
        return file, file.state.name
    except Exception as e:
        AliyunLogger.logging( str( redis_task['video_id'] ), redis_task['title'], redis_task['video_path'], '',
                              redis_task['type'], redis_task['partition'], f"[+] 上传视频失败: {e}" )
        logger.error(f'[+] 上传视频失败: {e}')
        if file is not None:
            try:
                genai.delete_file(file)
            except Exception as delete_error:
                logger.error(f'[+] 删除已上传视频失败: {delete_error}')
        return None
def create_model_cache(redis_task) -> Optional[genai.GenerativeModel]:
    """Build the Gemini model used to analyse a single video.

    The model is 'gemini-1.5-flash', told to answer as application/json, with
    the sexually-explicit harm category set to BLOCK_NONE. Despite the name,
    no content cache is created here; only the model object is constructed.

    Args:
        redis_task: Decoded task dict, used only to report failures.

    Returns:
        The configured GenerativeModel, or None if construction raised.
    """
    try:
        json_output = {'response_mime_type': 'application/json'}
        safety = {HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE}
        built = genai.GenerativeModel(model_name='gemini-1.5-flash', generation_config=json_output, safety_settings=safety)
        logger.info('[+] 创建缓存模型成功')
        return built
    except Exception as e:
        reason = f'视频创建缓存内容,并返回生成模型异常信息: {e}'
        AliyunLogger.logging(str(redis_task['video_id']), redis_task['title'], redis_task['video_path'], '',
                             redis_task['type'], redis_task['partition'], f"[+] {reason}")
        logger.error(reason)
        Common.logger('ai').info(reason)
        return
def analyze_video(model: genai.GenerativeModel, google_file: File, prompt: str, redis_task, title) -> Optional[GenerateContentResponse]:
    """Send the uploaded video plus instructions to the model in a new chat.

    The message consists of ``google_file`` and a text part made of
    ``prompt``, an optional title-understanding instruction (only when
    ``title`` is non-empty) and SAMPLE_DATA rendered as JSON as the expected
    output format.

    Args:
        model: Model used to open the chat session.
        google_file: The uploaded (ACTIVE) Gemini file for the video.
        prompt: Analysis instructions.
        redis_task: Decoded task dict, used only to report failures.
        title: Video title; may be empty or None.

    Returns:
        The model response, or None if building or sending the request raised.
    """
    try:
        # Render the sample as real JSON (double quotes, readable CJK) to match
        # the model's application/json response mode; a dict repr is not JSON.
        sample = json.dumps(SAMPLE_DATA, ensure_ascii=False, indent=2)
        if title:
            # Only ask for title understanding when there is a title to analyse.
            text = f'{prompt}\n九、标题理解:请对"{title}"标题进行分析概括3~5个词[]":\n输出返回格式样例:\n{sample}'
        else:
            text = f'{prompt}\n输出返回格式样例:\n{sample}'
        session = model.start_chat(history=[])
        return session.send_message(content={'parts': [google_file, text]})
    except Exception as e:
        AliyunLogger.logging( str( redis_task['video_id'] ), redis_task['title'], redis_task['video_path'], '',
                              redis_task['type'], redis_task['partition'], f"[+] 视频处理请求失败: {e}" )
        logger.error(f'视频处理请求失败: {e}')
        Common.logger('ai').info(f'视频处理请求失败: {e}')
        return None
def _report_task(redis_task, message: str, mark: str = '') -> None:
    """Send one status/result record for ``redis_task`` to the Aliyun log."""
    AliyunLogger.logging(str(redis_task['video_id']), redis_task['title'], redis_task['video_path'], mark,
                         redis_task['type'], redis_task['partition'], message)


def _cleanup_task_files(video_path: Optional[str], google_file) -> None:
    """Best-effort removal of the local download and the uploaded Gemini file."""
    if video_path and os.path.exists(video_path):
        try:
            os.remove(video_path)
            logger.info(f"文件已删除: {video_path}")
        except OSError as e:
            logger.error(f'[+] 删除本地文件失败: {video_path}, {e}')
    if google_file is not None:
        try:
            genai.delete_file(google_file)
        except Exception as e:
            logger.error(f'[+] 删除 Gemini 文件失败: {e}')


def run():
    """Process a single video-analysis task end to end.

    Steps: pop one task from Redis (sleeping 10s if none is available), load
    the prompt and mark from Feishu, skip videos whose duration is unknown or
    at least 600 seconds, then download, upload to Gemini, analyse, and
    report the cleaned model output to the Aliyun log.

    The local download and the uploaded Gemini file are removed on every
    exit path once the download has succeeded, including on exceptions.
    """
    if not API_KEY:
        logger.error('[+] 请在环境变量中新增 API_KEY')
        return
    if not TASK_TYPE:
        logger.error('[+] 请在环境变量中新增 TASK_TYPE, 可选值: recommend | top')
        return
    genai.configure(api_key=API_KEY)
    raw_task = get_redis_task(task_type=TASK_TYPE)
    if not raw_task:
        time.sleep(10)
        return
    try:
        redis_task = json.loads(raw_task)
    except json.JSONDecodeError as e:
        logger.error(f'[+] 任务数据解析失败, 跳过任务: {e}')
        return
    mark, prompt = Material.feishu_list()

    video_duration = get_video_duration(video_link=redis_task['video_path'])
    if not video_duration:
        _report_task(redis_task, "[+] 获取视频时长失败, 跳过任务")
        logger.error('[+] 获取视频时长失败, 跳过任务')
        return
    if video_duration >= 600:
        _report_task(redis_task, "[+] 视频时长超过10分钟, 跳过任务")
        logger.error('[+] 视频时长超过10分钟, 跳过任务')
        return

    video_path = download_video(video_link=redis_task['video_path'])
    if not video_path:
        # download_video returns None on failure, so there is no local file to remove.
        _report_task(redis_task, "[+] 视频下载失败, 跳过任务")
        logger.error('[+] 视频下载失败, 跳过任务')
        return

    google_file = None
    try:
        upload_result = upload_video(video_path=video_path, redis_task=redis_task)
        if not upload_result:
            # upload_video returns None on failure and has already logged it.
            return
        google_file, google_file_state = upload_result
        if google_file_state != 'ACTIVE':
            logger.error('[+] 视频上传状态不为 ACTIVE, 跳过任务')
            return

        model = create_model_cache(redis_task=redis_task)
        if model is None:
            logger.error('[+] 创建模型失败, 跳过任务')
            return

        response = analyze_video(model=model, google_file=google_file, prompt=prompt, redis_task=redis_task, title=redis_task['title'])
        if response is None:
            logger.error('[+] 获取模型响应失败, 跳过任务')
            return
        try:
            # .text raises ValueError when the response has no usable part
            # (e.g. it was blocked by a safety filter).
            text = response.text.strip()
        except ValueError as e:
            logger.error(f'[+] 获取模型响应失败, 跳过任务: {e}')
            return

        # Strip any Markdown code fences the model may wrap around the JSON.
        cleaned_text = text.replace("```json", '').replace("```", '').strip()
        _report_task(redis_task, str(cleaned_text), mark=str(mark))
        logger.info(f'[+] 模型响应结果: {text}')
    finally:
        _cleanup_task_files(video_path, google_file)
if __name__ == '__main__':
    # Never write the full API key to the logs; a short prefix is enough to
    # tell which key is in use.
    masked_api_key = f'{API_KEY[:4]}****' if API_KEY else API_KEY
    logger.info(f'[+] 任务已启动 -> API_KEY: {masked_api_key}, TASK_TYPE: {TASK_TYPE}')
    schedule.every(interval=1).seconds.do(run)
    while True:
        try:
            schedule.run_pending()
            time.sleep(1)
        except KeyboardInterrupt:
            break
        except Exception:
            # schedule does not catch job exceptions; log and keep the worker
            # alive so one bad task does not stop the whole process.
            logger.exception('[+] 任务执行异常')
            time.sleep(1)
    logger.info('[+] 任务已停止')
|