consumption_work.py 4.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import asyncio
  2. import time
  3. from datetime import datetime
  4. import orjson
  5. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  6. from apscheduler.triggers.cron import CronTrigger
  7. from loguru import logger
  8. from utils.feishu_utils import Feishu
  9. from utils.google_ai_studio import GoogleAI
  10. from utils.gpt4o_mimi import GPT4oMini
  11. from utils.piaoquan import PQ
  12. from utils.redis import RedisHelper
  13. class ConsumptionRecommend(object):
  14. @classmethod
  15. async def run(cls):
  16. logger.info(f"[处理] 开始获取redis数据")
  17. task = await RedisHelper().get_client().rpop(name = 'gong_ji_heng_ceng:scan_tasks')
  18. if not task:
  19. logger.info('[处理] 无待执行的扫描任务')
  20. return
  21. task = orjson.loads(task)
  22. logger.info(f"[处理] 获取redis数据{task}")
  23. video_id = task['video_id']
  24. channel = task['channel']
  25. logger.info(f"[处理] 开始获取原视频OSS地址")
  26. video_path, cover_path = PQ.get_pq_oss(video_id)
  27. if not video_path:
  28. return
  29. logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path},封面链接:{cover_path}")
  30. video_url = f"http://rescdn.yishihui.com/{video_path}"
  31. if channel == "快手品类账号":
  32. api_key = 'AIzaSyCTFPsbSfESF0Xybm8_qz7st_SH5E7wsdg'
  33. elif channel == "抖音品类账号":
  34. api_key = 'AIzaSyAJ8kUcEXRu37SuNx2w5qllaowMcUoPhoU'
  35. elif channel == "抖音关键词抓取":
  36. api_key = 'AIzaSyC-2Es4bk1uE-6u3lW5AOQuGqXWNzb92eQ'
  37. elif channel == "快手关键词抓取":
  38. api_key = 'AIzaSyD6R8tIOO11yh6WOXVQMBA2wzSZiREGUrA'
  39. else:
  40. api_key = 'AIzaSyAwGqthDADh5NPVe3BMcOJBQkJaf0HWBuQ'
  41. logger.info(f"[处理] 开始提取口播文案")
  42. text = GoogleAI.run(api_key, video_url)
  43. if not text:
  44. logger.error(f"[处理] 提取口播文案失败")
  45. return
  46. if "视频下载失败" == text:
  47. logger.error(f"[处理] 视频下载失败")
  48. return
  49. logger.info(f"[处理] 提取口播文案完成")
  50. logger.info(f"[处理] 口播文案通过gpt开始生成标题")
  51. new_title = GPT4oMini.get_ai_mini_title(text)
  52. if not new_title:
  53. logger.error(f"[处理] 口播文案通过gpt无法生成标题")
  54. return
  55. logger.info(f"[处理] 口播文案通过gpt生成标题完成,{new_title}")
  56. n_id = "78354423"
  57. logger.info(f"[处理] 开始写入票圈后台")
  58. code_vid = PQ.install_tj_pq(video_id, video_path, new_title, n_id, cover_path)
  59. if not code_vid:
  60. logger.error(f"[处理] 写入票圈后台失败")
  61. return
  62. logger.info(f"[处理] 写入票圈后台成功,视频id:{code_vid}")
  63. tag = f"lev-供给,rol-机器,#str-内容理解优化标题_51,{video_id}"
  64. PQ.video_tag(code_vid,tag)
  65. logger.info(f"[处理] 视频写入标签成功")
  66. current_time = datetime.now()
  67. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  68. logger.info(f"[处理] 开始写入飞书表格")
  69. values = [[video_id,code_vid,new_title,channel,task["time"],task["partition"],formatted_time]]
  70. Feishu.insert_columns("JY4esfYvShLbTkthHEqcTw5qnsh", "qFpmD4", "ROWS", 1, 2)
  71. time.sleep(0.5)
  72. Feishu.update_values("JY4esfYvShLbTkthHEqcTw5qnsh", "qFpmD4", "A2:Z2", values)
  73. logger.info(f"[处理] 写入飞书表格成功")
  74. async def run():
  75. scheduler = AsyncIOScheduler()
  76. try:
  77. scheduler.add_job(ConsumptionRecommend.run, trigger=CronTrigger(minute=3, second=0)) # 每小时获取一次
  78. scheduler.start()
  79. await asyncio.Event().wait()
  80. except KeyboardInterrupt:
  81. pass
  82. except Exception as e:
  83. pass
  84. finally:
  85. scheduler.shutdown()
  86. if __name__ == '__main__':
  87. # asyncio.run(ConsumptionRecommend.run())
  88. loop = asyncio.get_event_loop()
  89. loop.run_until_complete(run())