consumption_work.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import asyncio
  2. import os
  3. import time
  4. from datetime import datetime
  5. import orjson
  6. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  7. from apscheduler.triggers.cron import CronTrigger
  8. from loguru import logger
  9. from utils.feishu_utils import Feishu
  10. from utils.google_ai_studio import GoogleAI
  11. from utils.gpt4o_mimi import GPT4oMini
  12. from utils.piaoquan import PQ
  13. from utils.redis import RedisHelper
  14. class ConsumptionRecommend(object):
  15. @classmethod
  16. async def run(cls):
  17. logger.info(f"[处理] 开始获取redis数据")
  18. task = await RedisHelper().get_client().rpop(name = 'gong_ji_heng_ceng:scan_tasks')
  19. if not task:
  20. logger.info('[处理] 无待执行的扫描任务')
  21. return
  22. task = orjson.loads(task)
  23. logger.info(f"[处理] 获取redis数据{task}")
  24. video_id = task['video_id']
  25. logger.info(f"[处理] 开始获取原视频OSS地址")
  26. video_path, cover_path = PQ.get_pq_oss(video_id)
  27. if not video_path:
  28. return
  29. logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path},封面链接:{cover_path}")
  30. video_url = f"http://rescdn.yishihui.com/{video_path}"
  31. logger.info(f"[处理] 开始提取口播文案")
  32. api_key = os.getenv("GEMINI_API_KEY")
  33. logger.info(f"[处理] 使用的API_KEY:{api_key}")
  34. text = GoogleAI.run(api_key, video_url)
  35. if not text:
  36. logger.error(f"[处理] 提取口播文案失败")
  37. return
  38. if "视频下载失败" == text:
  39. logger.error(f"[处理] 视频下载失败")
  40. return
  41. logger.info(f"[处理] 提取口播文案完成")
  42. logger.info(f"[处理] 口播文案通过gpt开始生成标题")
  43. new_title = GPT4oMini.get_ai_mini_title(text)
  44. if not new_title:
  45. logger.error(f"[处理] 口播文案通过gpt无法生成标题")
  46. return
  47. logger.info(f"[处理] 口播文案通过gpt生成标题完成,{new_title}")
  48. n_id = "78354423"
  49. logger.info(f"[处理] 开始写入票圈后台")
  50. code_vid = PQ.install_tj_pq(video_id, video_path, new_title, n_id, cover_path)
  51. if not code_vid:
  52. logger.error(f"[处理] 写入票圈后台失败")
  53. return
  54. logger.info(f"[处理] 写入票圈后台成功,视频id:{code_vid}")
  55. tag = f"lev-供给,rol-机器,#str-内容理解优化标题_51,{video_id}"
  56. PQ.video_tag(code_vid,tag)
  57. logger.info(f"[处理] 视频写入标签成功")
  58. current_time = datetime.now()
  59. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  60. logger.info(f"[处理] 开始写入飞书表格")
  61. values = [[video_id,code_vid,new_title,task['channel'],task["time"],task["partition"],formatted_time]]
  62. Feishu.insert_columns("JY4esfYvShLbTkthHEqcTw5qnsh", "qFpmD4", "ROWS", 1, 2)
  63. time.sleep(0.5)
  64. Feishu.update_values("JY4esfYvShLbTkthHEqcTw5qnsh", "qFpmD4", "A2:Z2", values)
  65. logger.info(f"[处理] 写入飞书表格成功")
  66. async def run():
  67. scheduler = AsyncIOScheduler()
  68. try:
  69. scheduler.add_job(ConsumptionRecommend.run, trigger=CronTrigger(minute=2, second=0)) # 每小时获取一次
  70. scheduler.start()
  71. await asyncio.Event().wait()
  72. except KeyboardInterrupt:
  73. pass
  74. except Exception as e:
  75. pass
  76. finally:
  77. scheduler.shutdown()
  78. if __name__ == '__main__':
  79. # asyncio.run(ConsumptionRecommend.run())
  80. loop = asyncio.get_event_loop()
  81. loop.run_until_complete(run())