Browse Source

feat: 多trigger版本视频理解

jihuaqiang 3 weeks ago
parent
commit
30066e7f5a

+ 1 - 1
docker-compose.yml

@@ -22,7 +22,7 @@ services:
     networks:
       - video_insight_net
     deploy:
-      replicas: 10
+      replicas: 6
     entrypoint: sh /app/start.sh
 networks:
   video_insight_net:

+ 2 - 2
start.sh

@@ -6,6 +6,6 @@ export CONTAINER_INDEX="$(echo "$CONTAINER_INFO" | jq '.Name' | sed 's/^"\(.*\)"
 echo "export VIDEO_INSIGHT_GEMINI_API_KEY=$(eval echo \$"VIDEO_INSIGHT_GEMINI_API_KEY_${CONTAINER_INDEX}")" >> /root/.bashrc
 . /root/.bashrc
 
-python /app/workers/video_insight_consumption_work.py
-# python /app/workers/video_insight_trigger_work.py
+# python /app/workers/video_insight_consumption_work.py
+python /app/workers/video_insight_trigger_work.py
 

+ 76 - 10
utils/google_ai_analyze.py

@@ -10,7 +10,7 @@ from google.generativeai.types import (HarmBlockThreshold, HarmCategory)
 from loguru import logger
 from utils.coze_hook import CozeHook
 
-from utils.google_ai_prompt import VIDEO_TOPIC_ANALYSIS_PROMPT, VIDEO_SEGMENT_ANALYSIS_PROMPT
+from utils.google_ai_prompt import VIDEO_TOPIC_ANALYSIS_PROMPT, VIDEO_SEGMENT_ANALYSIS_PROMPT, VIDEO_ANALYSIS_PROMPT
 # from utils.feishu_data import Material
 
 CACHE_DIR = os.path.join(os.getcwd(), 'video_cache')
@@ -25,13 +25,17 @@ def load_prompts():
         print("\n[初始化] 从prompt.py加载Prompt")
         
         prompts = [
+            # {
+            #     "name": "视频选题与要点理解",
+            #     "content": VIDEO_TOPIC_ANALYSIS_PROMPT
+            # },
+            # {
+            #     "name": "视频分段与时间点分析",
+            #     "content": VIDEO_SEGMENT_ANALYSIS_PROMPT
+            # }
             {
-                "name": "视频选题与要点理解",
-                "content": VIDEO_TOPIC_ANALYSIS_PROMPT
-            },
-            {
-                "name": "视频分段与时间点分析",
-                "content": VIDEO_SEGMENT_ANALYSIS_PROMPT
+                "name": "视频内容分析",
+                "content": VIDEO_ANALYSIS_PROMPT
             }
         ]
             
@@ -154,7 +158,15 @@ class GoogleAI(object):
                 print(f"[分析] 正在执行: {prompt['name']}")
                 try:
                     result = cls._analyze_content(video, prompt['content'])
-                    analysis_data[prompt['name']] = result
+                    # 提取 result 中的 "内容分段" 和 "视频简介"
+                    analysis_data['视频选题与要点理解'] = {
+                        "视频简介": result.get('视频简介', ''),
+                        "视频内容类型": result.get('视频内容类型', ''),
+                        "段落类型相似度": result.get('段落类型相似度', 1)
+                    }
+                    analysis_data['视频分段与时间点分析'] = {
+                        "内容分段": result.get('内容分段', [])
+                    }
                 except Exception as e:
                     analysis_data[prompt['name']] = {
                         "error": str(e),
@@ -173,10 +185,64 @@ class GoogleAI(object):
             logger.error(f"[内容分析] 处理异常,异常信息{e}")
             os.remove(video_path)
             return f"[异常] {e}",""
+        
+    @classmethod
+    def _analyze_content_with_api(cls, video_url):
+        """使用API分析视频内容"""
+        try:
+            # 使用API分析视频内容
+            response = requests.post(
+                'http://ai-api.piaoquantv.com/aigc-server/gemini/generateContent',
+                json={
+                    "mediaUrl": video_url, 
+                    "type": 2, 
+                    "prompt": VIDEO_ANALYSIS_PROMPT, 
+                    "model":"gemini-2.0-flash",
+                    "temperature":"0.3"
+                }
+            )
+            response.raise_for_status()
+            result = response.json()
+            print(f"[内容分析] API分析完成, 结果: {result}")
+            
+            if not result or result.get('code') != 0:
+                raise Exception("API分析结果异常")
+                
+            # 解析返回的JSON字符串
+            analysis_result = orjson.loads(result['data']['result'])
+            
+            # 构建analysis_data
+            analysis_data = {
+                '视频选题与要点理解': {
+                    "视频简介": analysis_result.get('视频简介', ''),
+                    "视频内容类型": analysis_result.get('视频内容类型', ''),
+                    "段落类型相似度": analysis_result.get('段落类型相似度', 1)
+                },
+                '视频分段与时间点分析': {
+                    "内容分段": analysis_result.get('内容分段', [])
+                }
+            }
+            
+            # 使用coze_hook处理数据
+            coze_hook = CozeHook()
+            demand_list = coze_hook.run(
+                analysis_data["视频选题与要点理解"], 
+                analysis_data["视频分段与时间点分析"]
+            )
 
+            print(f"[内容分析] API分析完成, 结果: {analysis_data}, {demand_list}")
+            
+            return analysis_data, demand_list
+            
+        except Exception as e:
+            logger.error(f"[内容分析] API分析失败,异常信息{e}")
+            return f"[异常] {e}", ""
+        
+    
 
 if __name__ == '__main__':
     ai = GoogleAI()
-    ai.run("AIzaSyBFLCKMLX-Pf1iXoC2e_rMDLbNhNG23vTk",
-                 "http://rescdn.yishihui.com/jq_oss/video/2025012215472528213")
+    # ai.run("AIzaSyAHt9h0ScYki7NmgOXa1jj-UEimCa6JEOs",
+    #              "http://rescdn.yishihui.com/jq_oss/video/2025012215472528213")
+    ai._analyze_content_with_api("http://rescdn.yishihui.com/jq_oss/video/2025012215472528213")
 

+ 46 - 0
utils/google_ai_prompt.py

@@ -1,4 +1,50 @@
 # 视频选题与要点理解
+VIDEO_ANALYSIS_PROMPT = '''
+你是一名专业的视频内容分析助手,请对以下视频内容进行结构化分析。请严格按照以下要求输出标准JSON格式(## 使用英文双引号,不要使用单引号 ##),并且仅输出JSON,不要添加任何说明。
+
+*任务要求*:
+1. 总结视频表达的主旨内容(简洁明确,反映视频核心意图);
+2. 对视频进行内容分类(如:健康科普、娱乐搞笑、历史故事、时政评论、群友祝福、罕见画面、音乐分享、知识科普、节日祝福、国家力量、生活经验分享等);
+3. 将视频按逻辑内容分段(每段应保持内容逻辑的完整性),忽略视频中“分享”,“转发”相关的诱导性内容,只提取有效内容分析;
+4. 记录每段的起始时间和结束时间(格式:00:00:00);
+5. 总结每段发生的事件、出现的人物、画面与内容进行概括描述;
+6. 解释每段分段的原因,以及为何从此处分段;
+7. 分析该段内容与视频主旨的关联性,并进行打分,关联性越高,得分越高(1-10分);
+8. 标注每段的内容分类类型(参考整体分类维度),如果该段落内容是 引导用户分享、转发、点赞、评论、收藏、关注等,则标记该段落分类为“引导分享”;
+9. 分析各段落之间在主旨或分类上的差异性,输出一个“段落一致性评分”:基础为10分,每出现一段与其他段明显不同的主旨或分类则扣1分;
+
+请根据以上格式和分析要求输出结果。
+
+*注意事项*
+1. 在任何情况下都不得将“点赞”“评论”“分享”“转发”等引导性的内容当作独立语句或段落,要彻底忽略并剔除,如果无法避免,则概括为“引导分享”类型。
+
+输出格式如下:
+{
+  "视频简介": "视频的主旨总结内容",
+  "视频内容类型": "视频整体分类",
+  "内容分段": [
+    {
+      "开始时间": "00:00:00",
+      "结束时间": "00:00:30",
+      "段落主题": "该段的主要内容",
+      "分段原因": "该段分段的原因,以及为何从此处分段",
+      "段落类型": "该段的内容分类",
+      "关联性评分": 9,
+    },
+    {
+      "开始时间": "00:00:31",
+      "结束时间": "00:01:00",
+      "段落主题": "该段的主要内容",
+      "分段原因": "该段分段的原因,以及为何从此处分段",
+      "段落类型": "该段的内容分类",
+      "关联性评分": 8,
+    }
+    // 可继续追加更多段落
+  ],
+  "段落类型相似度": 9
+}
+请根据以上格式和分析要求输出结果。
+'''
 
 VIDEO_TOPIC_ANALYSIS_PROMPT = '''# 任务说明:
 你是一位短视频结构化分析专家。请严格按以下要求处理输入视频:

+ 1 - 1
workers/video_insight_select_work.py

@@ -20,7 +20,7 @@ def requirement_insight():
         logger.info(f"视频需求点洞察")
         # redis_task = "task:video_insight"
         redis_trigger_task = "task:video_trigger_insight"
-        sql =f'select clickobjectid as video_id from loghubods.user_share_log where dt = {dt} and topic = "click" group by clickobjectid order by count(distinct machinecode) desc limit 1000'
+        sql =f'select clickobjectid as video_id from loghubods.user_share_log where dt = {dt} and topic = "click" group by clickobjectid order by count(distinct machinecode) desc limit 300'
         data = OdpsDataCount.main(sql)
         if not data:
             return

+ 7 - 8
workers/video_insight_trigger_work.py

@@ -53,10 +53,9 @@ class ConsumptionRecommend(object):
         logger.info(f"[处理 - trigger] 获取原视频OSS地址,视频链接:{video_path}")
         video_url = f"http://rescdn.yishihui.com/{video_path}"
         logger.info(f"[处理 - trigger] 开始分析视频")
-        api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
-        # api_key = 'AIzaSyBFLCKMLX-Pf1iXoC2e_rMDLbNhNG23vTk'
-        logger.info(f"[处理 - trigger] 使用的API_KEY:{api_key}")
-        analysis_data, demand_list = GoogleAI.run(api_key, video_url)
+        # api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
+        # logger.info(f"[处理 - trigger] 使用的API_KEY:{api_key}")
+        analysis_data, demand_list = GoogleAI._analyze_content_with_api(video_url)
         if "[异常]" in analysis_data:
             content_video_trigger_data(json.dumps(task))
         # Parse JSON data
@@ -75,7 +74,7 @@ class ConsumptionRecommend(object):
         link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
         for index, entry in enumerate(data, 1):  # Start from 1 to match the original numbering
 
-            hook_desc = entry.get('需求钩子话术', '')
+            hook_desc = entry.get('钩子文案', '')
             result,msg = Security.security(hook_desc)
             audit_status = 0
             if result :
@@ -86,8 +85,8 @@ class ConsumptionRecommend(object):
 
             value = f"""(
                 {video_id}, '{link}', '{video_title}', NULL,
-                '{index}', '{entry.get('需求强烈程度分值', '')}', '{entry.get('用户具体的需求描述', '')}', '{entry.get('需求分类', '')}', '{entry.get('需求类型', '')}', '{entry.get('需求钩子出现时间', '')}',
-                '{entry.get('推测出该点需求的原因', '')}', '{entry.get('需求详细query', '')}', '', '{entry.get('需求钩子话术', '')}',
+                '{index}', '{entry.get('评分', '')}', '{entry.get('推测出该点需求的原因', '')}', '{entry.get('需求分类', '')}', '{entry.get('钩子类型', '')}', '{entry.get('钩子出现时间', '')}',
+                '{entry.get('推测出该点需求的原因', '')}', '{entry.get('钩子到AI大模型的问题', '')}', '', '{entry.get('钩子文案', '')}',
                 '', '{entry.get('落地方案形态描述', '')}', '{entry.get('落地方案类型', '')}', '', '','{audit_status}','{audit_desc}'
             )"""
             values.append(value)
@@ -129,7 +128,7 @@ async def run():
     scheduler = AsyncIOScheduler()
     try:
         logger.info(f"[处理 - trigger] 开始启动")
-        scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=3))  # 每2分钟启动一次
+        scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=4))  # 每4分钟启动一次
         scheduler.start()
         await asyncio.Event().wait()
     except KeyboardInterrupt: