advertisement_job.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. import json
  2. import os
  3. import time
  4. import uuid
  5. from typing import Literal, Optional, Tuple
  6. import cv2
  7. import google.generativeai as genai
  8. import requests
  9. import schedule
  10. from google.generativeai.types import (File, GenerateContentResponse,
  11. HarmBlockThreshold, HarmCategory)
  12. from loguru import logger
  13. from common.aliyun_log import AliyunLogger
  14. from common.common_log import Common
  15. from common.feishu_data import Material
  16. from common.redis import SyncRedisHelper, ad_in_video_data
  17. ENV = os.getenv('ENV', 'dev')
  18. API_KEY = os.getenv('API_KEY')
  19. TASK_TYPE = os.getenv('TASK_TYPE')
  20. PROXY_ADDR = 'http://localhost:1081'
  21. CACHE_DIR = '/app/cache/' if ENV == 'prod' else os.path.expanduser('~/Downloads/')
  22. SAMPLE_DATA = {
  23. "一、基础信息": {
  24. "视频主题": "",
  25. "视频关键词": "",
  26. "产品卖点":[]
  27. },
  28. "二、主体和场景": {
  29. "视频主体": "",
  30. "人物数量": "",
  31. "人物年龄段": [],
  32. "人物性别": [],
  33. "场景描述": []
  34. },
  35. "三、情感与风格": {
  36. "情感倾向": "",
  37. "视频风格": ""
  38. },
  39. "四、视频传播性与时效": {
  40. "片尾引导": "",
  41. "传播强度": "",
  42. "时效性": "",
  43. "适用时间": ""
  44. },
  45. "五、用户画像": {
  46. "价值类型": "",
  47. "用户价值点": "",
  48. "性别": "",
  49. "年龄": "",
  50. "观众收入": ""
  51. },
  52. "六、音画信息": {
  53. "背景音类型": "",
  54. "背景音风格": "",
  55. "语音类型": "",
  56. "字幕": "",
  57. "颜色": "",
  58. "logo": ""
  59. },
  60. "七、封面信息": {
  61. "封面主体": "",
  62. "人物个数": "",
  63. "文字数量": "",
  64. "文字关键字": [],
  65. "封面主题": ""
  66. },
  67. "八、剪辑信息": {
  68. "视频开场风格": "",
  69. "展现形式": ""
  70. },
  71. "九、类目": {
  72. "视频一级分类": "",
  73. "视频二级分类": ["品类- 、分数-","品类- 、分数-","品类- 、分数-"]
  74. },
  75. "十、视频时长": {
  76. "时长": "",
  77. },
  78. "八、视频宽高": {
  79. "宽高": "",
  80. "宽高比": ""
  81. }
  82. }
  83. if ENV == 'dev':
  84. os.environ['http_proxy'] = PROXY_ADDR
  85. os.environ['https_proxy'] = PROXY_ADDR
  86. def get_redis_task(task_type: Literal['recommend', 'top']) -> Optional[bytes]:
  87. redis_key = f'task:ad_video_recommend'
  88. redis_task: bytes = SyncRedisHelper().get_client().rpop(redis_key)
  89. if redis_task:
  90. logger.success(f'[+] 获取到 {task_type} 类型任务: {redis_task.decode()}')
  91. else:
  92. logger.error(f'[+] 未获取到 {task_type} 类型任务')
  93. return redis_task
  94. def get_video_duration(video_link: str) -> int:
  95. cap = cv2.VideoCapture(video_link)
  96. if cap.isOpened():
  97. rate = cap.get(5)
  98. frame_num = cap.get(7)
  99. duration = int(frame_num / rate)
  100. return duration
  101. return 0
  102. def download_video(video_link: str) -> Optional[str]:
  103. file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
  104. for _ in range(3):
  105. try:
  106. response = requests.get(url=video_link, timeout=60)
  107. if response.status_code == 200:
  108. with open(file_path, 'wb') as f:
  109. f.write(response.content)
  110. logger.info(f'[+] 视频链接: {video_link}, 存储地址: {file_path}')
  111. return file_path
  112. except Exception:
  113. time.sleep(1)
  114. continue
  115. return
  116. def upload_video(video_path: str, redis_task) -> Optional[Tuple[File, str]]:
  117. try:
  118. file = genai.upload_file(path=video_path)
  119. while True:
  120. if file.state.name == 'PROCESSING':
  121. time.sleep(1)
  122. file = genai.get_file(name=file.name)
  123. else:
  124. return file, file.state.name
  125. except Exception as e:
  126. AliyunLogger.ad_logging(str(redis_task['ad_id']),
  127. redis_task['creative_code'],
  128. redis_task['creative_title'],
  129. redis_task['material_address'],
  130. redis_task['click_button_text'],
  131. redis_task['creative_logo_address'],
  132. redis_task['update_time'],
  133. f"[+] 上传视频失败: {e}")
  134. logger.error(f'[+] 上传视频失败: {e}')
  135. ad_in_video_data(redis_task)
  136. return
  137. def create_model_cache(redis_task) -> Optional[genai.GenerativeModel]:
  138. try:
  139. model = genai.GenerativeModel(
  140. model_name='gemini-1.5-flash',
  141. generation_config={'response_mime_type': 'application/json'},
  142. safety_settings={HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE},
  143. )
  144. logger.info('[+] 创建缓存模型成功')
  145. return model
  146. except Exception as e:
  147. AliyunLogger.ad_logging(str(redis_task['ad_id']),
  148. redis_task['creative_code'],
  149. redis_task['creative_title'],
  150. redis_task['material_address'],
  151. redis_task['click_button_text'],
  152. redis_task['creative_logo_address'],
  153. redis_task['update_time'],
  154. f"[+] 视频创建缓存内容,并返回生成模型异常信息: {e}")
  155. logger.error(f'视频创建缓存内容,并返回生成模型异常信息: {e}')
  156. ad_in_video_data(redis_task)
  157. Common.logger('ai').info(f'视频创建缓存内容,并返回生成模型异常信息: {e}')
  158. return
  159. def analyze_video(model: genai.GenerativeModel, google_file: File, prompt: str, redis_task) -> Optional[GenerateContentResponse]:
  160. try:
  161. session = model.start_chat(history=[])
  162. content = {
  163. 'parts': [
  164. google_file,
  165. f'{prompt}\n输出返回格式样例:\n{SAMPLE_DATA}',
  166. ],
  167. }
  168. return session.send_message(content=content)
  169. except Exception as e:
  170. AliyunLogger.ad_logging(str(redis_task['ad_id']),
  171. redis_task['creative_code'],
  172. redis_task['creative_title'],
  173. redis_task['material_address'],
  174. redis_task['click_button_text'],
  175. redis_task['creative_logo_address'],
  176. redis_task['update_time'],
  177. f"[+] 视频处理请求失败: {e}")
  178. ad_in_video_data(redis_task)
  179. logger.error(f'视频处理请求失败: {e}')
  180. Common.logger('ai').info(f'视频处理请求失败: {e}')
  181. return
  182. def run():
  183. video_path = None
  184. try:
  185. if not API_KEY:
  186. logger.error('[+] 请在环境变量中新增 API_KEY')
  187. return
  188. if not TASK_TYPE:
  189. logger.error('[+] 请在环境变量中新增 TASK_TYPE, 可选值: recommend | top')
  190. return
  191. genai.configure(api_key=API_KEY)
  192. redis_task = get_redis_task(task_type=TASK_TYPE)
  193. if not redis_task:
  194. time.sleep(10)
  195. return
  196. redis_task = json.loads(redis_task)
  197. mark, prompt = Material.ad_feishu_list()
  198. # video_duration = get_video_duration(video_link=redis_task[3])
  199. # if not video_duration:
  200. # AliyunLogger.logging( str( redis_task[0] ), redis_task[1], redis_task['video_path'], "",
  201. # redis_task['type'], redis_task['partition'], "[+] 获取视频时长失败, 跳过任务" )
  202. # logger.error('[+] 获取视频时长失败, 跳过任务')
  203. # return
  204. # elif video_duration >= 600:
  205. # AliyunLogger.logging( str( redis_task['video_id'] ), redis_task['title'], redis_task['video_path'], "",
  206. # redis_task['type'], redis_task['partition'], "[+] 视频时长超过10分钟, 跳过任务" )
  207. # logger.error('[+] 视频时长超过10分钟, 跳过任务')
  208. # return
  209. video_path = download_video(video_link=redis_task['material_address'])
  210. if not video_path:
  211. AliyunLogger.ad_logging( str(redis_task['ad_id']),
  212. redis_task['creative_code'],
  213. redis_task['creative_title'],
  214. redis_task['material_address'],
  215. redis_task['click_button_text'],
  216. redis_task['creative_logo_address'],
  217. redis_task['update_time'],
  218. "[+] 视频下载失败, 跳过任务" )
  219. logger.error(f'[+] 视频下载失败, 跳过任务')
  220. if os.path.exists(video_path):
  221. os.remove(video_path)
  222. logger.info(f"文件已删除: {video_path}")
  223. return
  224. google_file, google_file_state = upload_video(video_path=video_path, redis_task=redis_task)
  225. if not google_file_state:
  226. return
  227. elif google_file_state != 'ACTIVE':
  228. logger.error('[+] 视频上传状态不为 ACTIVE, 跳过任务')
  229. genai.delete_file(google_file)
  230. if os.path.exists(video_path):
  231. os.remove(video_path)
  232. logger.info(f"文件已删除: {video_path}")
  233. return
  234. model = create_model_cache(redis_task=redis_task)
  235. if isinstance(model, str):
  236. logger.error('[+] 创建模型失败, 跳过任务')
  237. genai.delete_file(google_file)
  238. if os.path.exists(video_path):
  239. os.remove(video_path)
  240. logger.info(f"文件已删除: {video_path}")
  241. return
  242. response = analyze_video(model=model, google_file=google_file, prompt=prompt, redis_task=redis_task)
  243. if isinstance(response, str):
  244. logger.error('[+] 获取模型响应失败, 跳过任务')
  245. genai.delete_file(google_file)
  246. if os.path.exists(video_path):
  247. os.remove(video_path)
  248. logger.info(f"文件已删除: {video_path}")
  249. return
  250. text = response.text.strip()
  251. cleaned_text = text.replace("```json", '').replace("```", '').strip()
  252. AliyunLogger.ad_logging(str(redis_task['ad_id']),
  253. redis_task['creative_code'],
  254. redis_task['creative_title'],
  255. redis_task['material_address'],
  256. redis_task['click_button_text'],
  257. redis_task['creative_logo_address'],
  258. redis_task['update_time'],
  259. str(cleaned_text))
  260. logger.info(f'[+] 模型响应结果: {text}')
  261. if os.path.exists(video_path):
  262. os.remove(video_path)
  263. logger.info(f"文件已删除: {video_path}")
  264. genai.delete_file(google_file)
  265. except KeyboardInterrupt:
  266. if os.path.exists(video_path):
  267. os.remove(video_path)
  268. if __name__ == '__main__':
  269. logger.info(f'[+] 任务已启动 -> API_KEY: {API_KEY}, TASK_TYPE: {TASK_TYPE}')
  270. schedule.every(interval=1).seconds.do(run)
  271. while True:
  272. try:
  273. schedule.run_pending()
  274. time.sleep(1)
  275. except KeyboardInterrupt:
  276. break
  277. logger.info('[+] 任务已停止')