12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- import asyncio
- import time
- from datetime import datetime
- import orjson
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- from apscheduler.triggers.cron import CronTrigger
- from loguru import logger
- from utils.feishu_utils import Feishu
- from utils.google_ai_studio import GoogleAI
- from utils.gpt4o_mimi import GPT4oMini
- from utils.piaoquan import PQ
- from utils.redis import RedisHelper
- class ConsumptionRecommend(object):
- @classmethod
- async def run(cls):
- logger.info(f"[处理] 开始获取redis数据")
- task = await RedisHelper().get_client().rpop(name = 'gong_ji_heng_ceng:scan_tasks')
- if not task:
- logger.info('[ 改造 ] 无待执行的扫描任务')
- return
- task = orjson.loads(task)
- logger.info(f"[处理] 获取redis数据{task}")
- video_id = task['video_id']
- channel = task['channel']
- logger.info(f"[处理] 开始获取原视频OSS地址")
- video_path, cover_path = PQ.get_pq_oss(video_id)
- if not video_path:
- return
- logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path},封面链接:{cover_path}")
- video_url = f"http://rescdn.yishihui.com/{video_path}"
- if channel == "快手品类账号":
- api_key = 'AIzaSyCTFPsbSfESF0Xybm8_qz7st_SH5E7wsdg'
- elif channel == "抖音品类账号":
- api_key = 'AIzaSyAJ8kUcEXRu37SuNx2w5qllaowMcUoPhoU'
- elif channel == "抖音关键词抓取":
- api_key = 'AIzaSyC-2Es4bk1uE-6u3lW5AOQuGqXWNzb92eQ'
- elif channel == "快手关键词抓取":
- api_key = 'AIzaSyD6R8tIOO11yh6WOXVQMBA2wzSZiREGUrA'
- else:
- api_key = 'AIzaSyAwGqthDADh5NPVe3BMcOJBQkJaf0HWBuQ'
- logger.info(f"[处理] 开始提取口播文案")
- text = GoogleAI.run(api_key, video_url)
- if not text:
- logger.error(f"[处理] 提取口播文案失败")
- return
- if "视频下载失败" == text:
- logger.error(f"[处理] 视频下载失败")
- return
- logger.info(f"[处理] 提取口播文案完成")
- logger.info(f"[处理] 口播文案通过gpt开始生成标题")
- new_title = GPT4oMini.get_ai_mini_title(text)
- if not new_title:
- logger.error(f"[处理] 口播文案通过gpt无法生成标题")
- return
- logger.info(f"[处理] 口播文案通过gpt生成标题完成,{new_title}")
- n_id = "78354423"
- logger.info(f"[处理] 开始写入票圈后台")
- code_vid = PQ.install_tj_pq(video_id, video_path, new_title, n_id, cover_path)
- if not code_vid:
- logger.error(f"[处理] 写入票圈后台失败")
- return
- logger.info(f"[处理] 写入票圈后台成功,视频id:{code_vid}")
- tag = f"lev-供给,rol-机器,#str-内容理解优化标题_51,{video_id}"
- PQ.video_tag(code_vid,tag)
- logger.info(f"[处理] 视频写入标签成功")
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- logger.info(f"[处理] 开始写入飞书表格")
- values = [[video_id,code_vid,new_title,channel,task["time"],task["partition"],formatted_time]]
- Feishu.insert_columns("JY4esfYvShLbTkthHEqcTw5qnsh", "qFpmD4", "ROWS", 1, 2)
- time.sleep(0.5)
- Feishu.update_values("JY4esfYvShLbTkthHEqcTw5qnsh", "qFpmD4", "A2:Z2", values)
- logger.info(f"[处理] 写入飞书表格成功")
- async def run():
- scheduler = AsyncIOScheduler()
- try:
- scheduler.add_job(ConsumptionRecommend.run, trigger=CronTrigger(minute=3, second=0)) # 每小时获取一次
- scheduler.start()
- await asyncio.Event().wait()
- except KeyboardInterrupt:
- pass
- except Exception as e:
- pass
- finally:
- scheduler.shutdown()
- if __name__ == '__main__':
- asyncio.run(ConsumptionRecommend.run())
- # loop = asyncio.get_event_loop()
- # loop.run_until_complete(run())
|