123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- import asyncio
- import json
- import os
- import sys
- import orjson
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- from apscheduler.triggers.interval import IntervalTrigger
- from loguru import logger
- logger.add("/app/logs/consumption.log", rotation="10 MB")
- sys.path.append('/app')
- from utils.aliyun_log import AliyunLogger
- from utils.google_ai_analyze import GoogleAI
- from utils.piaoquan import PQ
- from utils.redis import RedisHelper, content_video_trigger_data
- from utils.mysql_db import MysqlHelper
- from utils.aliyun_security import Security
- class ConsumptionRecommend(object):
- @classmethod
- async def run(cls):
- logger.info(f"[处理 - trigger] 开始获取redis数据")
- while True:
- task = RedisHelper().get_client().rpop(name='task:video_trigger_insight')
- if not task:
- logger.info('[处理] 无待执行的扫描任务')
- return
- task = orjson.loads(task)
- logger.info(f"[处理 - trigger] 获取redis数据{task}")
- video_id = task['video_id']
- count_sql = f"""select count(1) from video_triggers_analysis where video_id = {video_id}"""
- count = MysqlHelper.get_values(count_sql)
- if not count or count[0][0] == 0:
- logger.info(f"[处理 - trigger] 视频ID {video_id} 可用")
- # 这里可以继续处理 video_id
- break
- else:
- logger.info(f"[处理 - trigger] 视频ID {video_id} 重复过滤,继续获取下一个任务")
- logger.info(f"[处理 - trigger] 开始获取原视频OSS地址")
- video_title, video_path = PQ.get_pq_oss(video_id)
- if not video_path:
- return
- logger.info(f"[处理 - trigger] 获取原视频OSS地址,视频链接:{video_path}")
- video_url = f"http://rescdn.yishihui.com/{video_path}"
- logger.info(f"[处理 - trigger] 开始分析视频")
- # api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
- # logger.info(f"[处理 - trigger] 使用的API_KEY:{api_key}")
- analysis_data, demand_list = GoogleAI._analyze_content_with_api(video_url)
-
- # 检查API分析是否成功
- if isinstance(analysis_data, str) and "[异常]" in analysis_data:
- logger.error(f"[处理 - trigger] API分析失败: {analysis_data}")
- content_video_trigger_data(json.dumps(task))
- return
-
- if not analysis_data or not demand_list:
- logger.error(f"[处理 - trigger] API分析结果为空")
- content_video_trigger_data(json.dumps(task))
- return
-
- # Parse JSON data for demand list
- try:
- data = json.loads(orjson.dumps(demand_list).decode())
- if not data:
- logger.error(f"[处理 - trigger] 需求列表为空")
- content_video_trigger_data(json.dumps(task))
- return
-
- # Generate SQL insert statement for triggers analysis
- sql = """
- INSERT INTO video_triggers_analysis (
- video_id, video_link, video_title,
- trigger_order, trigger_score, trigger_reason, trigger_text, trigger_question, trigger_category, trigger_time,
- audit_status,audit_desc
- ) VALUES
- """
- # Add values for each entry
- values = []
- link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
- for index, entry in enumerate(data, 1): # Start from 1 to match the original numbering
- try:
- hook_desc = entry.get('钩子文案', '')
- result, msg = Security.security(hook_desc)
- audit_status = 0
- if result:
- audit_status = 1
- else:
- audit_status = 2
- audit_desc = msg
- value = f"""(
- {video_id}, '{link}', '{video_title.replace("'", "''")}',
- '{index}', '{entry.get('评分', '').replace("'", "''")}', '{entry.get('推测出该点需求的原因', '').replace("'", "''")}',
- '{entry.get('钩子文案', '').replace("'", "''")}', '{entry.get('钩子到AI大模型的问题', '').replace("'", "''")}',
- '{entry.get('钩子类型', '').replace("'", "''")}', '{entry.get('钩子出现时间', '').replace("'", "''")}',
- '{audit_status}','{audit_desc.replace("'", "''")}'
- )"""
- values.append(value)
- except Exception as e:
- logger.error(f"[处理 - trigger] 处理需求条目时出错: {e}")
- continue
-
- if not values:
- logger.error(f"[处理 - trigger] 没有有效的需求条目")
- content_video_trigger_data(json.dumps(task))
- return
-
- # Combine SQL statement and values
- sql += ",\n".join(values) + ";"
- # Print SQL statement
- logger.info(f"{sql}")
- MysqlHelper.update_values(sql)
- logger.info(f"[处理 - trigger] 需求列表写入数据库成功")
- # Parse JSON data for analysis summary
- try:
- # Generate SQL insert statement for demand score
- sql = """
- INSERT INTO video_demand_score (
- video_id, video_link, video_title, analysis_summary, analysis_timeline
- ) VALUES
- """
- # Add values for each entry
- values = []
- link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
-
- # 处理视频选题与要点理解
- analysis_summary = json.dumps(analysis_data.get('视频选题与要点理解', {}), ensure_ascii=False)
- analysis_timeline = json.dumps(analysis_data.get('视频分段与时间点分析', {}), ensure_ascii=False)
-
- # 使用参数化查询来避免 SQL 注入和转义问题
- value = f"""(
- {video_id},
- '{link}',
- '{video_title.replace("'", "''")}',
- '{analysis_summary.replace("'", "''")}',
- '{analysis_timeline.replace("'", "''")}'
- )"""
- values.append(value)
- # Combine SQL statement and values
- sql += ",\n".join(values) + ";"
- # Print SQL statement
- logger.info(f"{sql}")
- MysqlHelper.update_values(sql)
- logger.info(f"[处理 - trigger] 视频分析结果写入数据库成功")
-
- except Exception as e:
- logger.error(f"[处理 - trigger] 写入视频分析结果时出错: {e}")
- content_video_trigger_data(json.dumps(task))
- return
-
- except Exception as e:
- logger.error(f"[处理 - trigger] 处理需求列表时出错: {e}")
- content_video_trigger_data(json.dumps(task))
- return
- async def run():
- scheduler = AsyncIOScheduler()
- try:
- logger.info(f"[处理 - trigger] 开始启动")
- scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=10)) # 每5分钟启动一次
- scheduler.start()
- await asyncio.Event().wait()
- except KeyboardInterrupt:
- pass
- except Exception as e:
- logger.error(f"[处理] 启动异常,异常信息:{e}")
- pass
- finally:
- scheduler.shutdown()
- if __name__ == '__main__':
- # asyncio.run(ConsumptionRecommend.run())
- loop = asyncio.get_event_loop()
- loop.run_until_complete(run())
|