import asyncio
import json
import os
import sys

import orjson
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger

logger.add("/app/logs/consumption.log", rotation="10 MB")

sys.path.append('/app')
from utils.aliyun_log import AliyunLogger
from utils.google_ai_analyze import GoogleAI
from utils.piaoquan import PQ
from utils.redis import RedisHelper, content_video_trigger_data
from utils.mysql_db import MysqlHelper
from utils.aliyun_security import Security


class ConsumptionRecommend(object):

    @classmethod
    async def run(cls):
        logger.info("[Processing - trigger] Start fetching redis data")
        while True:
            task = RedisHelper().get_client().rpop(name='task:video_trigger_insight')
            if not task:
                logger.info('[Processing] No pending scan tasks')
                return
            task = orjson.loads(task)
            logger.info(f"[Processing - trigger] Fetched redis data: {task}")
            video_id = task['video_id']
            # video_id comes from the internal task queue, so it is interpolated directly here
            count_sql = f"""select count(1) from video_triggers_analysis where video_id = {video_id}"""
            count = MysqlHelper.get_values(count_sql)
            if not count or count[0][0] == 0:
                logger.info(f"[Processing - trigger] Video ID {video_id} is available")
                # video_id has not been analyzed yet; continue processing it
                break
            else:
                logger.info(f"[Processing - trigger] Video ID {video_id} filtered as duplicate, fetching next task")

        logger.info("[Processing - trigger] Start fetching the original video OSS path")
        video_title, video_path, transed_video_path = PQ.get_pq_oss(video_id)
        # Prefer transed_video_path; fall back to video_path when it is missing
        if not transed_video_path:
            transed_video_path = video_path
        if not transed_video_path:
            logger.error(f"[Processing - trigger] No OSS path available for video ID {video_id}")
            return
        logger.info(f"[Processing - trigger] Fetched original video OSS path, video link: {transed_video_path}")
        video_url = f"http://rescdn.yishihui.com/{transed_video_path}"

        logger.info("[Processing - trigger] Start analyzing video")
        api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
        # NOTE: logging credentials is risky; consider masking the key in production
        logger.info(f"[Processing - trigger] Using API_KEY: {api_key}")
        analysis_data, demand_list = GoogleAI.run(api_key, video_url)

        # Check whether the API analysis succeeded ("[异常]" is the error marker in the API's response)
        if isinstance(analysis_data, str) and "[异常]" in analysis_data:
            logger.error(f"[Processing - trigger] API analysis failed: {analysis_data}")
            content_video_trigger_data(json.dumps(task))
            return
        if not analysis_data or not demand_list:
            logger.error("[Processing - trigger] API analysis result is empty")
            content_video_trigger_data(json.dumps(task))
            return

        # Parse JSON data for the demand list
        try:
            # Round-trip through JSON to normalize the structure
            data = json.loads(orjson.dumps(demand_list).decode())
            if not data:
                logger.error("[Processing - trigger] Demand list is empty")
                content_video_trigger_data(json.dumps(task))
                return

            # Generate SQL insert statement for triggers analysis
            sql = """
                INSERT INTO video_triggers_analysis (
                    video_id, video_link, video_title,
                    trigger_order, trigger_score, trigger_reason, trigger_text,
                    trigger_question, trigger_category, trigger_time,
                    audit_status, audit_desc
                ) VALUES
            """
            # Add values for each entry
            values = []
            link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
            for index, entry in enumerate(data, 1):  # Start from 1 to match the original numbering
                try:
                    hook_desc = entry.get('钩子文案', '')
                    result, msg = Security.security(hook_desc)
                    audit_status = 1 if result else 2
                    audit_desc = msg
                    # Single quotes are escaped by hand because the helper takes a rendered
                    # SQL string; see the parameterized sketch below the class for a safer alternative
                    value = f"""(
                        {video_id}, '{link}', '{video_title.replace("'", "''")}', '{index}',
                        '{entry.get('评分', '').replace("'", "''")}',
                        '{entry.get('推测出该点需求的原因', '').replace("'", "''")}',
                        '{entry.get('钩子文案', '').replace("'", "''")}',
                        '{entry.get('钩子到AI大模型的问题', '').replace("'", "''")}',
                        '{entry.get('钩子类型', '').replace("'", "''")}',
                        '{entry.get('钩子出现时间', '').replace("'", "''")}',
                        '{audit_status}', '{audit_desc.replace("'", "''")}'
                    )"""
                    values.append(value)
                except Exception as e:
                    logger.error(f"[Processing - trigger] Error while processing demand entry: {e}")
                    continue
            if not values:
                logger.error("[Processing - trigger] No valid demand entries")
                content_video_trigger_data(json.dumps(task))
                return

            # Combine the SQL statement and values
            sql += ",\n".join(values) + ";"
            # Log the full SQL statement
            logger.info(f"{sql}")
            MysqlHelper.update_values(sql)
            logger.info("[Processing - trigger] Demand list written to database successfully")

            # Parse JSON data for the analysis summary
            try:
                # Generate SQL insert statement for the demand score
                sql = """
                    INSERT INTO video_demand_score (
                        video_id, video_link, video_title, analysis_summary, analysis_timeline
                    ) VALUES
                """
                values = []
                link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
                # Extract the topic summary and the segment/timeline analysis
                analysis_summary = json.dumps(analysis_data.get('视频选题与要点理解', {}), ensure_ascii=False)
                analysis_timeline = json.dumps(analysis_data.get('视频分段与时间点分析', {}), ensure_ascii=False)
                # Single quotes are escaped inline here as well; a parameterized query
                # would avoid both injection and escaping issues (see the sketch below the class)
                value = f"""(
                    {video_id}, '{link}', '{video_title.replace("'", "''")}',
                    '{analysis_summary.replace("'", "''")}', '{analysis_timeline.replace("'", "''")}'
                )"""
                values.append(value)
                # Combine the SQL statement and values; upsert on duplicate video_id
                sql += ",\n".join(values) + """
                    ON DUPLICATE KEY UPDATE
                        video_link = VALUES(video_link),
                        video_title = VALUES(video_title),
                        analysis_summary = VALUES(analysis_summary),
                        analysis_timeline = VALUES(analysis_timeline)
                ;"""
                # logger.info(f"{sql}")
                MysqlHelper.update_values(sql)
                # logger.info("[Processing - trigger] Video analysis result written to database successfully")
            except Exception as e:
                logger.error(f"[Processing - trigger] Error while writing video analysis result: {e}")
                content_video_trigger_data(json.dumps(task))
                return
        except Exception as e:
            logger.error(f"[Processing - trigger] Error while processing demand list: {e}")
            content_video_trigger_data(json.dumps(task))
            return
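
# A minimal sketch, not wired into the pipeline: a parameterized alternative to
# the hand-escaped inserts above, assuming direct pymysql access. MysqlHelper's
# internals are not shown in this file, so the connection settings and the
# function name insert_triggers_parameterized are illustrative, not part of this repo.
def insert_triggers_parameterized(rows):
    """rows: list of 12-tuples matching the column order used in ConsumptionRecommend.run."""
    import pymysql  # imported lazily so this sketch does not affect module import

    # Placeholder connection settings (assumption; replace with real config)
    conn = pymysql.connect(host="127.0.0.1", user="user", password="pwd",
                           database="db", charset="utf8mb4")
    try:
        with conn.cursor() as cursor:
            cursor.executemany(
                "INSERT INTO video_triggers_analysis "
                "(video_id, video_link, video_title, trigger_order, trigger_score, "
                "trigger_reason, trigger_text, trigger_question, trigger_category, "
                "trigger_time, audit_status, audit_desc) "
                "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
                rows,  # the driver handles quoting and escaping
            )
        conn.commit()
    finally:
        conn.close()
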
async def run():
    scheduler = AsyncIOScheduler()
    try:
        logger.info("[Processing - trigger] Starting up")
        scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=4))  # run every 4 minutes
        scheduler.start()
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.error(f"[Processing] Startup exception: {e}")
    finally:
        scheduler.shutdown()


if __name__ == '__main__':
    # asyncio.run(ConsumptionRecommend.run())
    asyncio.run(run())
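
# Example of the task payload consumed by ConsumptionRecommend.run, assuming the
# producer pushes JSON objects onto the 'task:video_trigger_insight' list. Only
# 'video_id' is read here; the ID below is illustrative, not a real record:
#
#   RedisHelper().get_client().lpush(
#       'task:video_trigger_insight',
#       json.dumps({"video_id": 12345}),
#   )
#
# lpush on the producer side pairs with the rpop above, so tasks are consumed FIFO.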