import asyncio import json import os import sys import orjson from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from loguru import logger logger.add("/app/logs/consumption.log", rotation="10 MB") sys.path.append('/app') from utils.aliyun_log import AliyunLogger from utils.google_ai_studio import GoogleAI from utils.piaoquan import PQ from utils.redis import RedisHelper, content_video_data from utils.mysql_db import MysqlHelper class ConsumptionRecommend(object): @classmethod async def run(cls): logger.info(f"[处理] 开始获取redis数据") while True: task = RedisHelper().get_client().rpop(name='task:video_insight') if not task: logger.info('[处理] 无待执行的扫描任务') return task = orjson.loads(task) logger.info(f"[处理] 获取redis数据{task}") video_id = task['video_id'] count_sql = f"""select count(1) from video_demand_analysis where video_id = {video_id}""" count = MysqlHelper.get_values(count_sql) if not count or count[0][0] == 0: logger.info(f"[处理] 视频ID {video_id} 可用") # 这里可以继续处理 video_id break else: logger.info(f"[处理] 视频ID {video_id} 重复过滤,继续获取下一个任务") logger.info(f"[处理] 开始获取原视频OSS地址") video_title, video_path = PQ.get_pq_oss(video_id) if not video_path: return logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path}") video_url = f"http://rescdn.yishihui.com/{video_path}" logger.info(f"[处理] 开始分析视频") api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY") # api_key = 'AIzaSyBFLCKMLX-Pf1iXoC2e_rMDLbNhNG23vTk' logger.info(f"[处理] 使用的API_KEY:{api_key}") text = GoogleAI.run(api_key, video_url) if "[异常]" in text: content_video_data(json.dumps(task)) # Parse JSON data data = json.loads(orjson.dumps(text).decode()) # Generate SQL insert statement sql = """ INSERT INTO video_demand_analysis ( video_id, video_link, video_title, content_type, demand_order, demand_score, user_demand, demand_category, demand_reason, product_hook, hook_time, hook_desc, hook_type, landing_desc, landing_type, platform_case, reasoning_process ) VALUES """ # Add values for each entry values = [] link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail""" for entry in data: value = f"""( {video_id}, '{link}', '{video_title}', NULL, {entry.get('需求排序序号', '')}, {entry.get('需求强烈程度分值', '')}, '{entry.get('用户具体的需求描述', '')}', '{entry.get('需求分类', '')}', '{entry.get('推测出该点需求的原因', '')}', '{entry.get('描述出与需求对应的产品钩子', '')}', '{entry.get('产品形式出现到消失的时间点', '')}', '{entry.get('钩子形式描述', '')}', '{entry.get('钩子形式类型', '')}', '{entry.get('点击钩子后的产品落地形态描述', '')}', '{entry.get('产品落地形态分类', '')}', '{entry.get('其他平台案例', '')}, '{entry.get('输出你的推理过程', '')}' )""" values.append(value) # Combine SQL statement and values sql += ",\n".join(values) + ";" # Print SQL statement logger.info(f"{sql}") MysqlHelper.update_values(sql) # AliyunLogger.logging(str(video_id), orjson.dumps(text).decode()) logger.info(f"[处理] 写入数据库成功") async def run(): scheduler = AsyncIOScheduler() try: logger.info(f"[处理] 开始启动") scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=2)) # 每2分钟启动一次 scheduler.start() await asyncio.Event().wait() except KeyboardInterrupt: pass except Exception as e: logger.error(f"[处理] 启动异常,异常信息:{e}") pass finally: scheduler.shutdown() if __name__ == '__main__': # asyncio.run(ConsumptionRecommend.run()) loop = asyncio.get_event_loop() loop.run_until_complete(run())