consumption_work.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import asyncio
  2. import json
  3. import os
  4. import sys
  5. import orjson
  6. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  7. from apscheduler.triggers.interval import IntervalTrigger
  8. from loguru import logger
  9. sys.path.append('/app')
  10. from utils.aliyun_log import AliyunLogger
  11. from utils.google_ai_studio import GoogleAI
  12. from utils.piaoquan import PQ
  13. from utils.redis import RedisHelper, content_video_data
  14. class ConsumptionRecommend(object):
  15. @classmethod
  16. async def run(cls):
  17. logger.info(f"[处理] 开始获取redis数据")
  18. task = RedisHelper().get_client().rpop(name = 'task:video_insight')
  19. if not task:
  20. logger.info('[处理] 无待执行的扫描任务')
  21. return
  22. task = orjson.loads(task)
  23. logger.info(f"[处理] 获取redis数据{task}")
  24. video_id = task['video_id']
  25. logger.info(f"[处理] 开始获取原视频OSS地址")
  26. video_path = PQ.get_pq_oss(video_id)
  27. if not video_path:
  28. return
  29. logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path}")
  30. video_url = f"http://rescdn.yishihui.com/{video_path}"
  31. logger.info(f"[处理] 开始分析视频")
  32. api_key = os.getenv("GEMINI_API_KEY")
  33. # api_key = 'AIzaSyBFLCKMLX-Pf1iXoC2e_rMDLbNhNG23vTk'
  34. logger.info(f"[处理] 使用的API_KEY:{api_key}")
  35. text = GoogleAI.run(api_key, video_url)
  36. if "[异常]" in text:
  37. content_video_data(json.dumps(task))
  38. AliyunLogger.logging(str(video_id), orjson.dumps(text).decode())
  39. logger.info(f"[处理] 写入日志成功")
  40. async def run():
  41. scheduler = AsyncIOScheduler()
  42. try:
  43. logger.info(f"[处理] 开始启动")
  44. scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=2)) # 每2分钟启动一次
  45. scheduler.start()
  46. await asyncio.Event().wait()
  47. except KeyboardInterrupt:
  48. pass
  49. except Exception as e:
  50. logger.error(f"[处理] 启动异常,异常信息:{e}")
  51. pass
  52. finally:
  53. scheduler.shutdown()
  54. if __name__ == '__main__':
  55. # asyncio.run(ConsumptionRecommend.run())
  56. loop = asyncio.get_event_loop()
  57. loop.run_until_complete(run())