123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- import json
- import os
- import time
- import uuid
- from typing import Literal, Optional, Tuple
- import cv2
- import google.generativeai as genai
- import requests
- import schedule
- from google.generativeai.types import (File, GenerateContentResponse,
- HarmBlockThreshold, HarmCategory)
- from loguru import logger
- from common.aliyun_log import AliyunLogger
- from common.common_log import Common
- from common.feishu_data import Material
- from common.redis import SyncRedisHelper, ad_in_video_data
- ENV = os.getenv('ENV', 'dev')
- API_KEY = os.getenv('API_KEY')
- TASK_TYPE = os.getenv('TASK_TYPE')
- PROXY_ADDR = 'http://localhost:1081'
- CACHE_DIR = '/app/cache/' if ENV == 'prod' else os.path.expanduser('~/Downloads/')
- SAMPLE_DATA = {
- "一、基础信息": {
- "视频主题": "",
- "视频关键词": "",
- "产品卖点":[]
- },
- "二、主体和场景": {
- "视频主体": "",
- "人物数量": "",
- "人物年龄段": [],
- "人物性别": [],
- "场景描述": []
- },
- "三、情感与风格": {
- "情感倾向": "",
- "视频风格": ""
- },
- "四、视频传播性与时效": {
- "片尾引导": "",
- "传播强度": "",
- "时效性": "",
- "适用时间": ""
- },
- "五、用户画像": {
- "价值类型": "",
- "用户价值点": "",
- "性别": "",
- "年龄": "",
- "观众收入": ""
- },
- "六、音画信息": {
- "背景音类型": "",
- "背景音风格": "",
- "语音类型": "",
- "字幕": "",
- "颜色": "",
- "logo": ""
- },
- "七、封面信息": {
- "封面主体": "",
- "人物个数": "",
- "文字数量": "",
- "文字关键字": [],
- "封面主题": ""
- },
- "八、剪辑信息": {
- "视频开场风格": "",
- "展现形式": ""
- },
- "九、类目": {
- "视频一级分类": "",
- "视频二级分类": ["品类- 、分数-","品类- 、分数-","品类- 、分数-"]
- },
- "十、视频时长": {
- "时长": "",
- },
- "八、视频宽高": {
- "宽高": "",
- "宽高比": ""
- }
- }
- if ENV == 'dev':
- os.environ['http_proxy'] = PROXY_ADDR
- os.environ['https_proxy'] = PROXY_ADDR
- def get_redis_task(task_type: Literal['recommend', 'top']) -> Optional[bytes]:
- redis_key = f'task:ad_video_recommend'
- redis_task: bytes = SyncRedisHelper().get_client().rpop(redis_key)
- if redis_task:
- logger.success(f'[+] 获取到 {task_type} 类型任务: {redis_task.decode()}')
- else:
- logger.error(f'[+] 未获取到 {task_type} 类型任务')
- return redis_task
- def get_video_duration(video_link: str) -> int:
- cap = cv2.VideoCapture(video_link)
- if cap.isOpened():
- rate = cap.get(5)
- frame_num = cap.get(7)
- duration = int(frame_num / rate)
- return duration
- return 0
- def download_video(video_link: str) -> Optional[str]:
- file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
- for _ in range(3):
- try:
- response = requests.get(url=video_link, timeout=60)
- if response.status_code == 200:
- with open(file_path, 'wb') as f:
- f.write(response.content)
- logger.info(f'[+] 视频链接: {video_link}, 存储地址: {file_path}')
- return file_path
- except Exception:
- time.sleep(1)
- continue
- return
- def upload_video(video_path: str, redis_task) -> Optional[Tuple[File, str]]:
- try:
- file = genai.upload_file(path=video_path)
- while True:
- if file.state.name == 'PROCESSING':
- time.sleep(1)
- file = genai.get_file(name=file.name)
- else:
- return file, file.state.name
- except Exception as e:
- AliyunLogger.ad_logging(str(redis_task['ad_id']),
- redis_task['creative_code'],
- redis_task['creative_title'],
- redis_task['material_address'],
- redis_task['click_button_text'],
- redis_task['creative_logo_address'],
- redis_task['update_time'],
- f"[+] 上传视频失败: {e}")
- logger.error(f'[+] 上传视频失败: {e}')
- ad_in_video_data(redis_task)
- return
- def create_model_cache(redis_task) -> Optional[genai.GenerativeModel]:
- try:
- model = genai.GenerativeModel(
- model_name='gemini-1.5-flash',
- generation_config={'response_mime_type': 'application/json'},
- safety_settings={HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE},
- )
- logger.info('[+] 创建缓存模型成功')
- return model
- except Exception as e:
- AliyunLogger.ad_logging(str(redis_task['ad_id']),
- redis_task['creative_code'],
- redis_task['creative_title'],
- redis_task['material_address'],
- redis_task['click_button_text'],
- redis_task['creative_logo_address'],
- redis_task['update_time'],
- f"[+] 视频创建缓存内容,并返回生成模型异常信息: {e}")
- logger.error(f'视频创建缓存内容,并返回生成模型异常信息: {e}')
- ad_in_video_data(redis_task)
- Common.logger('ai').info(f'视频创建缓存内容,并返回生成模型异常信息: {e}')
- return
- def analyze_video(model: genai.GenerativeModel, google_file: File, prompt: str, redis_task) -> Optional[GenerateContentResponse]:
- try:
- session = model.start_chat(history=[])
- content = {
- 'parts': [
- google_file,
- f'{prompt}\n输出返回格式样例:\n{SAMPLE_DATA}',
- ],
- }
- return session.send_message(content=content)
- except Exception as e:
- AliyunLogger.ad_logging(str(redis_task['ad_id']),
- redis_task['creative_code'],
- redis_task['creative_title'],
- redis_task['material_address'],
- redis_task['click_button_text'],
- redis_task['creative_logo_address'],
- redis_task['update_time'],
- f"[+] 视频处理请求失败: {e}")
- ad_in_video_data(redis_task)
- logger.error(f'视频处理请求失败: {e}')
- Common.logger('ai').info(f'视频处理请求失败: {e}')
- return
- def run():
- video_path = None
- try:
- if not API_KEY:
- logger.error('[+] 请在环境变量中新增 API_KEY')
- return
- if not TASK_TYPE:
- logger.error('[+] 请在环境变量中新增 TASK_TYPE, 可选值: recommend | top')
- return
- genai.configure(api_key=API_KEY)
- redis_task = get_redis_task(task_type=TASK_TYPE)
- if not redis_task:
- time.sleep(10)
- return
- redis_task = json.loads(redis_task)
- mark, prompt = Material.ad_feishu_list()
- # video_duration = get_video_duration(video_link=redis_task[3])
- # if not video_duration:
- # AliyunLogger.logging( str( redis_task[0] ), redis_task[1], redis_task['video_path'], "",
- # redis_task['type'], redis_task['partition'], "[+] 获取视频时长失败, 跳过任务" )
- # logger.error('[+] 获取视频时长失败, 跳过任务')
- # return
- # elif video_duration >= 600:
- # AliyunLogger.logging( str( redis_task['video_id'] ), redis_task['title'], redis_task['video_path'], "",
- # redis_task['type'], redis_task['partition'], "[+] 视频时长超过10分钟, 跳过任务" )
- # logger.error('[+] 视频时长超过10分钟, 跳过任务')
- # return
- video_path = download_video(video_link=redis_task['material_address'])
- if not video_path:
- AliyunLogger.ad_logging( str(redis_task['ad_id']),
- redis_task['creative_code'],
- redis_task['creative_title'],
- redis_task['material_address'],
- redis_task['click_button_text'],
- redis_task['creative_logo_address'],
- redis_task['update_time'],
- "[+] 视频下载失败, 跳过任务" )
- logger.error(f'[+] 视频下载失败, 跳过任务')
- if os.path.exists(video_path):
- os.remove(video_path)
- logger.info(f"文件已删除: {video_path}")
- return
- google_file, google_file_state = upload_video(video_path=video_path, redis_task=redis_task)
- if not google_file_state:
- return
- elif google_file_state != 'ACTIVE':
- logger.error('[+] 视频上传状态不为 ACTIVE, 跳过任务')
- genai.delete_file(google_file)
- if os.path.exists(video_path):
- os.remove(video_path)
- logger.info(f"文件已删除: {video_path}")
- return
- model = create_model_cache(redis_task=redis_task)
- if isinstance(model, str):
- logger.error('[+] 创建模型失败, 跳过任务')
- genai.delete_file(google_file)
- if os.path.exists(video_path):
- os.remove(video_path)
- logger.info(f"文件已删除: {video_path}")
- return
- response = analyze_video(model=model, google_file=google_file, prompt=prompt, redis_task=redis_task)
- if isinstance(response, str):
- logger.error('[+] 获取模型响应失败, 跳过任务')
- genai.delete_file(google_file)
- if os.path.exists(video_path):
- os.remove(video_path)
- logger.info(f"文件已删除: {video_path}")
- return
- text = response.text.strip()
- cleaned_text = text.replace("```json", '').replace("```", '').strip()
- AliyunLogger.ad_logging(str(redis_task['ad_id']),
- redis_task['creative_code'],
- redis_task['creative_title'],
- redis_task['material_address'],
- redis_task['click_button_text'],
- redis_task['creative_logo_address'],
- redis_task['update_time'],
- str(cleaned_text))
- logger.info(f'[+] 模型响应结果: {text}')
- if os.path.exists(video_path):
- os.remove(video_path)
- logger.info(f"文件已删除: {video_path}")
- genai.delete_file(google_file)
- except KeyboardInterrupt:
- if os.path.exists(video_path):
- os.remove(video_path)
- if __name__ == '__main__':
- logger.info(f'[+] 任务已启动 -> API_KEY: {API_KEY}, TASK_TYPE: {TASK_TYPE}')
- schedule.every(interval=1).seconds.do(run)
- while True:
- try:
- schedule.run_pending()
- time.sleep(1)
- except KeyboardInterrupt:
- break
- logger.info('[+] 任务已停止')
|