import asyncio
import json
import os
import sys

import orjson
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger

sys.path.append('/app')

from utils.aliyun_log import AliyunLogger
from utils.google_ai_studio import GoogleAI
from utils.piaoquan import PQ
from utils.redis import RedisHelper, content_video_data
from utils.mysql_db import MysqlHelper


class ConsumptionRecommend(object):

    @classmethod
    async def run(cls):
        logger.info("[Processing] fetching task from redis")
        task = RedisHelper().get_client().rpop(name='task:video_insight')
        if not task:
            logger.info('[Processing] no pending scan task')
            return
        task = orjson.loads(task)
        logger.info(f"[Processing] got redis task: {task}")
        video_id = task['video_id']
        # Skip videos that already have a demand analysis record
        count_sql = f"""select count(1) from video_demand_analysis where video_id = {video_id}"""
        count = MysqlHelper.get_values(count_sql)
        if count and count[0][0] > 0:
            logger.info("[Processing] duplicate video, skipping")
            return
        logger.info("[Processing] fetching original video OSS path")
        video_title, video_path = PQ.get_pq_oss(video_id)
        if not video_path:
            return
        logger.info(f"[Processing] got OSS path, video link: {video_path}")
        video_url = f"http://rescdn.yishihui.com/{video_path}"
        logger.info("[Processing] starting video analysis")
        api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
        logger.info(f"[Processing] using API_KEY: {api_key}")
        text = GoogleAI.run(api_key, video_url)
        if "[异常]" in text:
            # "[异常]" marks an analysis failure: requeue the task and stop
            content_video_data(json.dumps(task))
            return
        # Parse the JSON array returned by the model
        data = orjson.loads(text)
        # Build the SQL insert statement
        sql = """
            INSERT INTO video_demand_analysis (
                video_id, video_link, video_title, content_type,
                demand_order, demand_score, user_demand, demand_category,
                demand_reason, product_hook, hook_time, hook_desc,
                hook_type, landing_desc, landing_type, platform_case
            ) VALUES
        """
        # Add a value tuple for each entry
        values = []
        link = f"https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"
        for entry in data:
            # String columns must be quoted; numeric fields fall back to NULL when missing
            value = f"""(
                {video_id}, '{link}', '{video_title}', NULL,
                {entry.get('需求排序序号', 'NULL')}, {entry.get('需求强烈程度分值', 'NULL')}, '{entry.get('用户具体的需求描述', '')}', '{entry.get('需求分类', '')}',
                '{entry.get('推测出该点需求的原因', '')}', '{entry.get('描述出与需求对应的产品钩子', '')}', '{entry.get('产品形式出现到消失的时间点', '')}', '{entry.get('钩子形式描述', '')}',
                '{entry.get('钩子形式类型', '')}', '{entry.get('点击钩子后的产品落地形态描述', '')}', '{entry.get('产品落地形态分类', '')}', '{entry.get('其他平台案例', '')}'
            )"""
            values.append(value)
        # Combine the statement and the value tuples
        sql += ",\n".join(values) + ";"
        # Print the final SQL statement for debugging
        print(sql)
        MysqlHelper.update_values(sql)
        # AliyunLogger.logging(str(video_id), orjson.dumps(text).decode())
        logger.info("[Processing] analysis results written")
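

# --- Hedged sketch, not part of the original script ---------------------------
# A producer feeding this consumer only needs to push a JSON payload carrying a
# video_id onto the same redis list. enqueue_video_insight_task is a hypothetical
# helper illustrating that queue contract; it assumes RedisHelper().get_client()
# returns a standard redis-py client, as the rpop call above implies.
def enqueue_video_insight_task(video_id):
    # lpush pairs with the rpop in ConsumptionRecommend.run, giving FIFO order
    RedisHelper().get_client().lpush('task:video_insight', orjson.dumps({'video_id': video_id}))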


async def run():
    scheduler = AsyncIOScheduler()
    try:
        logger.info("[Processing] starting scheduler")
        # Trigger a consumption run every 2 minutes
        scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=2))
        scheduler.start()
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.error(f"[Processing] scheduler start failed, error: {e}")
    finally:
        scheduler.shutdown()


if __name__ == '__main__':
    # One-off debug run: asyncio.run(ConsumptionRecommend.run())
    asyncio.run(run())
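

# --- Hedged sketch, not part of the original script ---------------------------
# The f-string INSERT above breaks (and is injectable) as soon as the model output
# contains a single quote. A parameterized variant lets the driver escape every
# value. This sketch talks to MySQL via pymysql with placeholder connection
# settings, since the internals of MysqlHelper are not shown here;
# insert_entries_parameterized is a hypothetical helper, not an existing function.
def insert_entries_parameterized(video_id, link, video_title, data):
    import pymysql  # local import so the sketch does not affect the main script

    insert_sql = """
        INSERT INTO video_demand_analysis (
            video_id, video_link, video_title, content_type,
            demand_order, demand_score, user_demand, demand_category,
            demand_reason, product_hook, hook_time, hook_desc,
            hook_type, landing_desc, landing_type, platform_case
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    # One row per demand entry; bound parameters cannot break the statement
    rows = [
        (
            video_id, link, video_title, None,
            entry.get('需求排序序号'), entry.get('需求强烈程度分值'),
            entry.get('用户具体的需求描述'), entry.get('需求分类'),
            entry.get('推测出该点需求的原因'), entry.get('描述出与需求对应的产品钩子'),
            entry.get('产品形式出现到消失的时间点'), entry.get('钩子形式描述'),
            entry.get('钩子形式类型'), entry.get('点击钩子后的产品落地形态描述'),
            entry.get('产品落地形态分类'), entry.get('其他平台案例'),
        )
        for entry in data
    ]
    conn = pymysql.connect(host='127.0.0.1', user='user', password='pass', database='db')  # placeholder settings
    try:
        with conn.cursor() as cursor:
            cursor.executemany(insert_sql, rows)
        conn.commit()
    finally:
        conn.close()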