job.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. import json
  2. import os
  3. import time
  4. import uuid
  5. from typing import Literal, Optional, Tuple
  6. import cv2
  7. import google.generativeai as genai
  8. import requests
  9. import schedule
  10. from google.generativeai.types import (File, GenerateContentResponse,
  11. HarmBlockThreshold, HarmCategory)
  12. from loguru import logger
  13. from common.common_log import Common
  14. from common.feishu_data import Material
  15. from common.redis import SyncRedisHelper
  16. ENV = os.getenv('ENV', 'dev')
  17. API_KEY = os.getenv('API_KEY')
  18. TASK_TYPE = os.getenv('TASK_TYPE')
  19. PROXY_ADDR = 'http://localhost:1081'
  20. CACHE_DIR = '/app/cache/' if ENV == 'prod' else os.path.expanduser('~/Downloads/')
  21. SAMPLE_DATA = {
  22. "一、基础信息": {
  23. "视觉/音乐/文字": "",
  24. "内容选题": "",
  25. "视频主题": ""
  26. },
  27. "二、主体和场景": {
  28. "视频主体": "",
  29. "视频场景": []
  30. },
  31. "三、情感与风格": {},
  32. "四、视频传播性与观众": {
  33. "片尾引导": {},
  34. "传播性判断": "",
  35. "观众画像": {}
  36. },
  37. "五、音画细节": {
  38. "音频细节": {},
  39. "视频水印": {},
  40. "视频字幕": {},
  41. "视频口播": ""
  42. },
  43. "六、人物与场景": {
  44. "知名人物": {},
  45. "人物年龄段": "",
  46. "场景描述": []
  47. },
  48. "七、时效性与分类": {
  49. "时效性": {},
  50. "视频一级分类": "",
  51. "二级分类": ["品类- 、分数-", "品类- 、分数-", "品类- 、分数-"]
  52. }
  53. }
  54. if ENV == 'dev':
  55. os.environ['http_proxy'] = PROXY_ADDR
  56. os.environ['https_proxy'] = PROXY_ADDR
  57. def get_redis_task(task_type: Literal['recommend', 'top']) -> Optional[bytes]:
  58. redis_key = f'task:video_ai_{task_type}'
  59. redis_task: bytes = SyncRedisHelper().get_client().rpop(redis_key)
  60. if redis_task:
  61. logger.success(f'[+] 获取到 {task_type} 类型任务: {redis_task.decode()}')
  62. else:
  63. logger.error(f'[+] 未获取到 {task_type} 类型任务')
  64. return redis_task
  65. def get_video_duration(video_link: str) -> int:
  66. cap = cv2.VideoCapture(video_link)
  67. if cap.isOpened():
  68. rate = cap.get(5)
  69. frame_num = cap.get(7)
  70. duration = int(frame_num / rate)
  71. return duration
  72. return 0
  73. def download_video(video_link: str) -> Optional[str]:
  74. file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
  75. for _ in range(3):
  76. try:
  77. response = requests.get(url=video_link)
  78. if response.status_code == 200:
  79. with open(file_path, 'wb') as f:
  80. f.write(response.content)
  81. logger.info(f'[+] 视频链接: {video_link}, 存储地址: {file_path}')
  82. return file_path
  83. except Exception:
  84. time.sleep(1)
  85. continue
  86. return
  87. def upload_video(video_path: str) -> Optional[Tuple[File, str]]:
  88. try:
  89. file = genai.upload_file(path=video_path)
  90. while True:
  91. if file.state.name == 'PROCESSING':
  92. time.sleep(1)
  93. file = genai.get_file(name=file.name)
  94. else:
  95. return file, file.state.name
  96. except Exception as e:
  97. logger.error(f'[+] 上传视频失败: {e}')
  98. return
  99. def create_model_cache() -> Optional[genai.GenerativeModel]:
  100. try:
  101. model = genai.GenerativeModel(
  102. model_name='gemini-1.5-flash',
  103. generation_config={'response_mime_type': 'application/json'},
  104. safety_settings={HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE},
  105. )
  106. logger.info('[+] 创建缓存模型成功')
  107. return model
  108. except Exception as e:
  109. logger.error(f'视频创建缓存内容,并返回生成模型异常信息: {e}')
  110. Common.logger('ai').info(f'视频创建缓存内容,并返回生成模型异常信息: {e}')
  111. return
  112. def analyze_video(model: genai.GenerativeModel, google_file: File, prompt: str) -> Optional[GenerateContentResponse]:
  113. try:
  114. session = model.start_chat(history=[])
  115. content = {
  116. 'parts': [
  117. google_file,
  118. f'{prompt}\n输出返回格式样例:\n{SAMPLE_DATA}',
  119. ],
  120. }
  121. return session.send_message(content=content)
  122. except Exception as e:
  123. logger.error(f'视频处理请求失败: {e}')
  124. Common.logger('ai').info(f'视频处理请求失败: {e}')
  125. return
  126. def run():
  127. if not API_KEY:
  128. logger.error('[+] 请在环境变量中新增 API_KEY')
  129. return
  130. if not TASK_TYPE:
  131. logger.error('[+] 请在环境变量中新增 TASK_TYPE, 可选值: recommend | top')
  132. return
  133. genai.configure(api_key=API_KEY)
  134. redis_task = get_redis_task(task_type=TASK_TYPE)
  135. if not redis_task:
  136. return
  137. redis_task = json.loads(redis_task)
  138. mark, prompt = Material.feishu_list()
  139. video_duration = get_video_duration(video_link=redis_task['video_path'])
  140. if not video_duration:
  141. logger.error('[+] 获取视频时长失败, 跳过任务')
  142. return
  143. elif video_duration >= 600:
  144. logger.error('[+] 视频时长超过10分钟, 跳过任务')
  145. return
  146. video_path = download_video(video_link=redis_task['video_path'])
  147. if not video_path:
  148. logger.error(f'[+] 视频下载失败, 跳过任务')
  149. return
  150. google_file, google_file_state = upload_video(video_path=video_path)
  151. if not google_file_state:
  152. return
  153. elif google_file_state != 'ACTIVE':
  154. logger.error('[+] 视频上传状态不为 ACTIVE, 跳过任务')
  155. return
  156. model = create_model_cache()
  157. if isinstance(model, str):
  158. logger.error('[+] 创建模型失败, 跳过任务')
  159. return
  160. response = analyze_video(model=model, google_file=google_file, prompt=prompt)
  161. if isinstance(response, str):
  162. logger.error('[+] 获取模型响应失败, 跳过任务')
  163. return
  164. usage_info, text = str(response.usage_metadata).replace('\n', ', '), response.text.strip()
  165. logger.info(f'[+] 使用情况: {usage_info}')
  166. logger.info(f'[+] 模型响应结果: {text}')
  167. if __name__ == '__main__':
  168. logger.info(f'[+] 任务已启动 -> API_KEY: {API_KEY}, TASK_TYPE: {TASK_TYPE}')
  169. schedule.every(interval=1).seconds.do(run)
  170. while True:
  171. try:
  172. schedule.run_pending()
  173. time.sleep(1)
  174. except KeyboardInterrupt:
  175. break
  176. logger.info('[+] 任务已停止')