kevin.yang 6 maanden geleden
bovenliggende
commit
f234258254
2 gewijzigde bestanden met toevoegingen van 205 en 2 verwijderingen
  1. 205 0
      job.py
  2. 0 2
      requirements.txt

+ 205 - 0
job.py

@@ -0,0 +1,205 @@
+import json
+import os
+import time
+import uuid
+from typing import Literal, Optional, Tuple
+
+import cv2
+import google.generativeai as genai
+import requests
+import schedule
+from google.generativeai.types import (File, GenerateContentResponse,
+                                       HarmBlockThreshold, HarmCategory)
+from loguru import logger
+
+from common.common_log import Common
+from common.feishu_data import Material
+from common.redis import SyncRedisHelper
+
+ENV = os.getenv('ENV', 'dev')
+API_KEY = os.getenv('API_KEY')
+TASK_TYPE = os.getenv('TASK_TYPE')
+
+PROXY_ADDR = 'http://localhost:1081'
+CACHE_DIR = '/app/cache/' if ENV == 'prod' else os.path.expanduser('~/Downloads/')
+SAMPLE_DATA = {
+    "一、基础信息": {
+        "视觉/音乐/文字": "",
+        "内容选题": "",
+        "视频主题": ""
+    },
+    "二、主体和场景": {
+        "视频主体": "",
+        "视频场景": []
+    },
+    "三、情感与风格": {},
+    "四、视频传播性与观众": {
+        "片尾引导": {},
+        "传播性判断": "",
+        "观众画像": {}
+    },
+    "五、音画细节": {
+        "音频细节": {},
+        "视频水印": {},
+        "视频字幕": {},
+        "视频口播": ""
+    },
+    "六、人物与场景": {
+        "知名人物": {},
+        "人物年龄段": "",
+        "场景描述": []
+    },
+    "七、时效性与分类": {
+        "时效性": {},
+        "视频一级分类": "",
+        "二级分类": ["品类- 、分数-", "品类- 、分数-", "品类- 、分数-"]
+    }
+}
+
+if ENV == 'dev':
+    os.environ['http_proxy'] = PROXY_ADDR
+    os.environ['https_proxy'] = PROXY_ADDR
+
+
+def get_redis_task(task_type: Literal['recommend', 'top']) -> Optional[bytes]:
+    redis_key = f'task:video_ai_{task_type}'
+    redis_task: bytes = SyncRedisHelper().get_client().rpop(redis_key)
+    if redis_task:
+        logger.success(f'[+] 获取到 {task_type} 类型任务: {redis_task.decode()}')
+    else:
+        logger.error(f'[+] 未获取到 {task_type} 类型任务')
+    return redis_task
+
+
+def get_video_duration(video_link: str) -> int:
+    cap = cv2.VideoCapture(video_link)
+    if cap.isOpened():
+        rate = cap.get(5)
+        frame_num = cap.get(7)
+        duration = int(frame_num / rate)
+        return duration
+    return 0
+
+
+def download_video(video_link: str) -> Optional[str]:
+    file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
+    for _ in range(3):
+        try:
+            response = requests.get(url=video_link)
+            if response.status_code == 200:
+                with open(file_path, 'wb') as f:
+                    f.write(response.content)
+                logger.info(f'[+] 视频链接: {video_link}, 存储地址: {file_path}')
+                return file_path
+        except Exception:
+            time.sleep(1)
+            continue
+    return
+
+
+def upload_video(video_path: str) -> Optional[Tuple[File, str]]:
+    try:
+        file = genai.upload_file(path=video_path)
+        while True:
+            if file.state.name == 'PROCESSING':
+                time.sleep(1)
+                file = genai.get_file(name=file.name)
+            else:
+                return file, file.state.name
+    except Exception as e:
+        logger.error(f'[+] 上传视频失败: {e}')
+        return
+
+
+def create_model_cache() -> Optional[genai.GenerativeModel]:
+    try:
+        model = genai.GenerativeModel(
+            model_name='gemini-1.5-flash',
+            generation_config={'response_mime_type': 'application/json'},
+            safety_settings={HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE},
+        )
+        logger.info('[+] 创建缓存模型成功')
+        return model
+    except Exception as e:
+        logger.error(f'视频创建缓存内容,并返回生成模型异常信息: {e}')
+        Common.logger('ai').info(f'视频创建缓存内容,并返回生成模型异常信息: {e}')
+        return
+
+
+def analyze_video(model: genai.GenerativeModel, google_file: File, prompt: str) -> Optional[GenerateContentResponse]:
+    try:
+        session = model.start_chat(history=[])
+        content = {
+            'parts': [
+                google_file,
+                f'{prompt}\n输出返回格式样例:\n{SAMPLE_DATA}',
+            ],
+        }
+        return session.send_message(content=content)
+    except Exception as e:
+        logger.error(f'视频处理请求失败: {e}')
+        Common.logger('ai').info(f'视频处理请求失败: {e}')
+        return
+
+
+def run():
+    if not API_KEY:
+        logger.error('[+] 请在环境变量中新增 API_KEY')
+        return
+    if not TASK_TYPE:
+        logger.error('[+] 请在环境变量中新增 TASK_TYPE, 可选值: recommend | top')
+        return
+    genai.configure(api_key=API_KEY)
+
+    redis_task = get_redis_task(task_type=TASK_TYPE)
+    if not redis_task:
+        return
+    redis_task = json.loads(redis_task)
+
+    mark, prompt = Material.feishu_list()
+
+    video_duration = get_video_duration(video_link=redis_task['video_path'])
+    if not video_duration:
+        logger.error('[+] 获取视频时长失败, 跳过任务')
+        return
+    elif video_duration >= 600:
+        logger.error('[+] 视频时长超过10分钟, 跳过任务')
+        return
+
+    video_path = download_video(video_link=redis_task['video_path'])
+    if not video_path:
+        logger.error(f'[+] 视频下载失败, 跳过任务')
+        return
+
+    google_file, google_file_state = upload_video(video_path=video_path)
+    if not google_file_state:
+        return
+    elif google_file_state != 'ACTIVE':
+        logger.error('[+] 视频上传状态不为 ACTIVE, 跳过任务')
+        return
+
+    model = create_model_cache()
+    if isinstance(model, str):
+        logger.error('[+] 创建模型失败, 跳过任务')
+        return
+
+    response = analyze_video(model=model, google_file=google_file, prompt=prompt)
+    if isinstance(response, str):
+        logger.error('[+] 获取模型响应失败, 跳过任务')
+        return
+
+    usage_info, text = str(response.usage_metadata).replace('\n', ', '), response.text.strip()
+    logger.info(f'[+] 使用情况: {usage_info}')
+    logger.info(f'[+] 模型响应结果: {text}')
+
+
+if __name__ == '__main__':
+    logger.info(f'[+] 任务已启动 -> API_KEY: {API_KEY}, TASK_TYPE: {TASK_TYPE}')
+    schedule.every(interval=1).seconds.do(run)
+    while True:
+        try:
+            schedule.run_pending()
+            time.sleep(1)
+        except KeyboardInterrupt:
+            break
+    logger.info('[+] 任务已停止')

+ 0 - 2
requirements.txt

@@ -1,5 +1,4 @@
 aliyun-log-python-sdk==0.9.12
-fastapi==0.115.3
 google-generativeai==0.8.3
 loguru==0.7.2
 odps==3.5.1
@@ -7,4 +6,3 @@ opencv-python==4.10.0.84
 redis==5.1.1
 requests==2.32.3
 schedule==1.2.2
-uvicorn==0.32.0