|
@@ -23,7 +23,7 @@ from utils.aliyun_security import Security
|
|
|
class ConsumptionRecommend(object):
|
|
|
@classmethod
|
|
|
async def run(cls):
|
|
|
- logger.info(f"[处理] 开始获取redis数据")
|
|
|
+ logger.info(f"[处理 - trigger] 开始获取redis数据")
|
|
|
|
|
|
while True:
|
|
|
task = RedisHelper().get_client().rpop(name='task:video_trigger_insight')
|
|
@@ -33,29 +33,29 @@ class ConsumptionRecommend(object):
|
|
|
return
|
|
|
|
|
|
task = orjson.loads(task)
|
|
|
- logger.info(f"[处理] 获取redis数据{task}")
|
|
|
+ logger.info(f"[处理 - trigger] 获取redis数据{task}")
|
|
|
video_id = task['video_id']
|
|
|
|
|
|
count_sql = f"""select count(1) from video_triggers_analysis where video_id = {video_id}"""
|
|
|
count = MysqlHelper.get_values(count_sql)
|
|
|
|
|
|
if not count or count[0][0] == 0:
|
|
|
- logger.info(f"[处理] 视频ID {video_id} 可用")
|
|
|
+ logger.info(f"[处理 - trigger] 视频ID {video_id} 可用")
|
|
|
# 这里可以继续处理 video_id
|
|
|
break
|
|
|
else:
|
|
|
- logger.info(f"[处理] 视频ID {video_id} 重复过滤,继续获取下一个任务")
|
|
|
+ logger.info(f"[处理 - trigger] 视频ID {video_id} 重复过滤,继续获取下一个任务")
|
|
|
|
|
|
- logger.info(f"[处理] 开始获取原视频OSS地址")
|
|
|
+ logger.info(f"[处理 - trigger] 开始获取原视频OSS地址")
|
|
|
video_title, video_path = PQ.get_pq_oss(video_id)
|
|
|
if not video_path:
|
|
|
return
|
|
|
- logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path}")
|
|
|
+ logger.info(f"[处理 - trigger] 获取原视频OSS地址,视频链接:{video_path}")
|
|
|
video_url = f"http://rescdn.yishihui.com/{video_path}"
|
|
|
- logger.info(f"[处理] 开始分析视频")
|
|
|
+ logger.info(f"[处理 - trigger] 开始分析视频")
|
|
|
api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
|
|
|
# api_key = 'AIzaSyBFLCKMLX-Pf1iXoC2e_rMDLbNhNG23vTk'
|
|
|
- logger.info(f"[处理] 使用的API_KEY:{api_key}")
|
|
|
+ logger.info(f"[处理 - trigger] 使用的API_KEY:{api_key}")
|
|
|
analysis_data, demand_list = GoogleAI.run(api_key, video_url)
|
|
|
if "[异常]" in analysis_data:
|
|
|
content_video_trigger_data(json.dumps(task))
|
|
@@ -97,7 +97,7 @@ class ConsumptionRecommend(object):
|
|
|
logger.info(f"{sql}")
|
|
|
MysqlHelper.update_values(sql)
|
|
|
|
|
|
- logger.info(f"[处理] 需求列表写入数据库成功")
|
|
|
+ logger.info(f"[处理 - trigger] 需求列表写入数据库成功")
|
|
|
|
|
|
|
|
|
# Parse JSON data
|
|
@@ -122,13 +122,13 @@ class ConsumptionRecommend(object):
|
|
|
logger.info(f"{sql}")
|
|
|
MysqlHelper.update_values(sql)
|
|
|
|
|
|
- logger.info(f"[处理] 视频分析结果写入数据库成功")
|
|
|
+ logger.info(f"[处理 - trigger] 视频分析结果写入数据库成功")
|
|
|
|
|
|
|
|
|
async def run():
|
|
|
scheduler = AsyncIOScheduler()
|
|
|
try:
|
|
|
- logger.info(f"[处理] 开始启动")
|
|
|
+ logger.info(f"[处理 - trigger] 开始启动")
|
|
|
scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=3)) # 每2分钟启动一次
|
|
|
scheduler.start()
|
|
|
await asyncio.Event().wait()
|