import asyncio import time from datetime import datetime import orjson from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from loguru import logger from utils.feishu_utils import Feishu from utils.google_ai_studio import GoogleAI from utils.gpt4o_mimi import GPT4oMini from utils.piaoquan import PQ from utils.redis import RedisHelper class ConsumptionRecommend(object): @classmethod async def run(cls): logger.info(f"[处理] 开始获取redis数据") task = await RedisHelper().get_client().rpop(name = 'gong_ji_heng_ceng:scan_tasks') if not task: logger.info('[处理] 无待执行的扫描任务') return task = orjson.loads(task) logger.info(f"[处理] 获取redis数据{task}") video_id = task['video_id'] channel = task['channel'] logger.info(f"[处理] 开始获取原视频OSS地址") video_path, cover_path = PQ.get_pq_oss(video_id) if not video_path: return logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path},封面链接:{cover_path}") video_url = f"http://rescdn.yishihui.com/{video_path}" if channel == "快手品类账号": api_key = 'AIzaSyCTFPsbSfESF0Xybm8_qz7st_SH5E7wsdg' elif channel == "抖音品类账号": api_key = 'AIzaSyAJ8kUcEXRu37SuNx2w5qllaowMcUoPhoU' elif channel == "抖音关键词抓取": api_key = 'AIzaSyC-2Es4bk1uE-6u3lW5AOQuGqXWNzb92eQ' elif channel == "快手关键词抓取": api_key = 'AIzaSyD6R8tIOO11yh6WOXVQMBA2wzSZiREGUrA' else: api_key = 'AIzaSyAwGqthDADh5NPVe3BMcOJBQkJaf0HWBuQ' logger.info(f"[处理] 开始提取口播文案") text = GoogleAI.run(api_key, video_url) if not text: logger.error(f"[处理] 提取口播文案失败") return if "视频下载失败" == text: logger.error(f"[处理] 视频下载失败") return logger.info(f"[处理] 提取口播文案完成") logger.info(f"[处理] 口播文案通过gpt开始生成标题") new_title = GPT4oMini.get_ai_mini_title(text) if not new_title: logger.error(f"[处理] 口播文案通过gpt无法生成标题") return logger.info(f"[处理] 口播文案通过gpt生成标题完成,{new_title}") n_id = "78354423" logger.info(f"[处理] 开始写入票圈后台") code_vid = PQ.install_tj_pq(video_id, video_path, new_title, n_id, cover_path) if not code_vid: logger.error(f"[处理] 写入票圈后台失败") return logger.info(f"[处理] 写入票圈后台成功,视频id:{code_vid}") tag = f"lev-供给,rol-机器,#str-内容理解优化标题_51,{video_id}" PQ.video_tag(code_vid,tag) logger.info(f"[处理] 视频写入标签成功") current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") logger.info(f"[处理] 开始写入飞书表格") values = [[video_id,code_vid,new_title,channel,task["time"],task["partition"],formatted_time]] Feishu.insert_columns("JY4esfYvShLbTkthHEqcTw5qnsh", "qFpmD4", "ROWS", 1, 2) time.sleep(0.5) Feishu.update_values("JY4esfYvShLbTkthHEqcTw5qnsh", "qFpmD4", "A2:Z2", values) logger.info(f"[处理] 写入飞书表格成功") async def run(): scheduler = AsyncIOScheduler() try: scheduler.add_job(ConsumptionRecommend.run, trigger=CronTrigger(minute=2, second=0)) # 每小时获取一次 scheduler.start() await asyncio.Event().wait() except KeyboardInterrupt: pass except Exception as e: pass finally: scheduler.shutdown() if __name__ == '__main__': # asyncio.run(ConsumptionRecommend.run()) loop = asyncio.get_event_loop() loop.run_until_complete(run())