consumption_work.py 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import asyncio
  2. import os
  3. import sys
  4. import time
  5. from datetime import datetime
  6. import orjson
  7. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  8. from apscheduler.triggers.cron import CronTrigger
  9. from loguru import logger
  10. sys.path.append('/app')
  11. from utils.feishu_utils import Feishu
  12. from utils.google_ai_studio import GoogleAI
  13. from utils.gpt4o_mimi import GPT4oMini
  14. from utils.piaoquan import PQ
  15. from utils.redis import RedisHelper
  16. class ConsumptionRecommend(object):
  17. @classmethod
  18. async def run(cls):
  19. logger.info(f"[处理] 开始获取redis数据")
  20. task = await RedisHelper().get_client().rpop(name = 'gong_ji_heng_ceng:scan_tasks')
  21. if not task:
  22. logger.info('[处理] 无待执行的扫描任务')
  23. return
  24. task = orjson.loads(task)
  25. logger.info(f"[处理] 获取redis数据{task}")
  26. video_id = task['video_id']
  27. logger.info(f"[处理] 开始获取原视频OSS地址")
  28. video_path, cover_path = PQ.get_pq_oss(video_id)
  29. if not video_path:
  30. return
  31. logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path},封面链接:{cover_path}")
  32. video_url = f"http://rescdn.yishihui.com/{video_path}"
  33. logger.info(f"[处理] 开始提取口播文案")
  34. api_key = os.getenv("GEMINI_API_KEY")
  35. logger.info(f"[处理] 使用的API_KEY:{api_key}")
  36. text = GoogleAI.run(api_key, video_url)
  37. if not text:
  38. logger.error(f"[处理] 提取口播文案失败")
  39. return
  40. if "视频下载失败" == text:
  41. logger.error(f"[处理] 视频下载失败")
  42. return
  43. logger.info(f"[处理] 提取口播文案完成")
  44. logger.info(f"[处理] 口播文案通过gpt开始生成标题")
  45. new_title = GPT4oMini.get_ai_mini_title(text)
  46. if not new_title:
  47. logger.error(f"[处理] 口播文案通过gpt无法生成标题")
  48. return
  49. logger.info(f"[处理] 口播文案通过gpt生成标题完成,{new_title}")
  50. n_id = "78354423"
  51. logger.info(f"[处理] 开始写入票圈后台")
  52. code_vid = PQ.install_tj_pq(video_id, video_path, new_title, n_id, cover_path)
  53. if not code_vid:
  54. logger.error(f"[处理] 写入票圈后台失败")
  55. return
  56. logger.info(f"[处理] 写入票圈后台成功,视频id:{code_vid}")
  57. tag = f"lev-供给,rol-机器,#str-内容理解优化标题_51,{video_id}"
  58. PQ.video_tag(code_vid,tag)
  59. logger.info(f"[处理] 视频写入标签成功")
  60. current_time = datetime.now()
  61. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  62. logger.info(f"[处理] 开始写入飞书表格")
  63. values = [[video_id,code_vid,new_title,task['channel'],task["time"],task["partition"],formatted_time]]
  64. Feishu.insert_columns("JY4esfYvShLbTkthHEqcTw5qnsh", "qFpmD4", "ROWS", 1, 2)
  65. time.sleep(0.5)
  66. Feishu.update_values("JY4esfYvShLbTkthHEqcTw5qnsh", "qFpmD4", "A2:Z2", values)
  67. logger.info(f"[处理] 写入飞书表格成功")
  68. async def run():
  69. scheduler = AsyncIOScheduler()
  70. try:
  71. scheduler.add_job(ConsumptionRecommend.run, trigger=CronTrigger(minute=2, second=0)) # 每小时获取一次
  72. scheduler.start()
  73. await asyncio.Event().wait()
  74. except KeyboardInterrupt:
  75. pass
  76. except Exception as e:
  77. pass
  78. finally:
  79. scheduler.shutdown()
  80. if __name__ == '__main__':
  81. # asyncio.run(ConsumptionRecommend.run())
  82. loop = asyncio.get_event_loop()
  83. loop.run_until_complete(run())