jihuaqiang há 1 mês atrás
pai
commit
c3506f4e21
1 ficheiros alterados com 106 adições e 75 exclusões
  1. 106 75
      workers/video_insight_trigger_work.py

+ 106 - 75
workers/video_insight_trigger_work.py

@@ -56,82 +56,113 @@ class ConsumptionRecommend(object):
         # api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
         # logger.info(f"[处理 - trigger] 使用的API_KEY:{api_key}")
         analysis_data, demand_list = GoogleAI._analyze_content_with_api(video_url)
-        if "[异常]" in analysis_data:
-            content_video_trigger_data(json.dumps(task))
-        # Parse JSON data
-        data = json.loads(orjson.dumps(demand_list).decode())
-        # Generate SQL insert statement
-        sql = """
-        INSERT INTO video_triggers_analysis (
-            video_id, video_link, video_title, content_type,
-            demand_order, demand_score, user_demand, demand_category, demand_type, demand_show_time,
-            demand_reason, product_hook, hook_time, hook_desc,
-            hook_type, landing_desc, landing_type, platform_case, reasoning_process,audit_status,audit_desc
-        ) VALUES
-        """
-        # Add values for each entry
-        values = []
-        link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
-        for index, entry in enumerate(data, 1):  # Start from 1 to match the original numbering
-
-            hook_desc = entry.get('钩子文案', '')
-            result,msg = Security.security(hook_desc)
-            audit_status = 0
-            if result :
-                audit_status = 1
-            else :
-                audit_status = 2
-            audit_desc = msg
-
-            value = f"""(
-                {video_id}, '{link}', '{video_title}', NULL,
-                '{index}', '{entry.get('评分', '')}', '{entry.get('推测出该点需求的原因', '')}', '{entry.get('需求分类', '')}', '{entry.get('钩子类型', '')}', '{entry.get('钩子出现时间', '')}',
-                '{entry.get('推测出该点需求的原因', '')}', '{entry.get('钩子到AI大模型的问题', '')}', '', '{entry.get('钩子文案', '')}',
-                '', '{entry.get('落地方案形态描述', '')}', '{entry.get('落地方案类型', '')}', '', '','{audit_status}','{audit_desc}'
-            )"""
-            values.append(value)
-        # Combine SQL statement and values
-        sql += ",\n".join(values) + ";"
-        # Print SQL statement
-        logger.info(f"{sql}")
-        MysqlHelper.update_values(sql)
-
-        logger.info(f"[处理 - trigger] 需求列表写入数据库成功")
-
-
-        # Parse JSON data
-        data = json.loads(orjson.dumps(analysis_data).decode())
-        # Generate SQL insert statement
-        sql = """
-        INSERT INTO video_demand_score (
-            video_id, video_link, video_title, analysis_summary, analysis_timeline
-        ) VALUES
-        """
-        # Add values for each entry
-        values = []
-        link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
-        entry = data
-        
-        # 处理视频选题与要点理解
-        analysis_summary = json.dumps(entry.get('视频选题与要点理解', {}), ensure_ascii=False)
-        analysis_timeline = json.dumps(entry.get('视频分段与时间点分析', {}), ensure_ascii=False)
         
-        # 使用参数化查询来避免 SQL 注入和转义问题
-        value = f"""(
-            {video_id}, 
-            '{link}', 
-            '{video_title.replace("'", "''")}', 
-            '{analysis_summary.replace("'", "''")}', 
-            '{analysis_timeline.replace("'", "''")}'
-        )"""
-        values.append(value)
-        # Combine SQL statement and values
-        sql += ",\n".join(values) + ";"
-        # Print SQL statement
-        logger.info(f"{sql}")
-        MysqlHelper.update_values(sql)
-
-        logger.info(f"[处理 - trigger] 视频分析结果写入数据库成功")
+        # 检查API分析是否成功
+        if isinstance(analysis_data, str) and "[异常]" in analysis_data:
+            logger.error(f"[处理 - trigger] API分析失败: {analysis_data}")
+            content_video_trigger_data(json.dumps(task))
+            return
+            
+        if not analysis_data or not demand_list:
+            logger.error(f"[处理 - trigger] API分析结果为空")
+            content_video_trigger_data(json.dumps(task))
+            return
+            
+        # Parse JSON data for demand list
+        try:
+            data = json.loads(orjson.dumps(demand_list).decode())
+            if not data:
+                logger.error(f"[处理 - trigger] 需求列表为空")
+                content_video_trigger_data(json.dumps(task))
+                return
+                
+            # Generate SQL insert statement for triggers analysis
+            sql = """
+            INSERT INTO video_triggers_analysis (
+                video_id, video_link, video_title, content_type,
+                demand_order, demand_score, user_demand, demand_category, demand_type, demand_show_time,
+                demand_reason, product_hook, hook_time, hook_desc,
+                hook_type, landing_desc, landing_type, platform_case, reasoning_process,audit_status,audit_desc
+            ) VALUES
+            """
+            # Add values for each entry
+            values = []
+            link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
+            for index, entry in enumerate(data, 1):  # Start from 1 to match the original numbering
+                try:
+                    hook_desc = entry.get('钩子文案', '')
+                    result, msg = Security.security(hook_desc)
+                    audit_status = 0
+                    if result:
+                        audit_status = 1
+                    else:
+                        audit_status = 2
+                    audit_desc = msg
+
+                    value = f"""(
+                        {video_id}, '{link}', '{video_title.replace("'", "''")}', NULL,
+                        '{index}', '{entry.get('评分', '').replace("'", "''")}', '{entry.get('推测出该点需求的原因', '').replace("'", "''")}', '{entry.get('需求分类', '').replace("'", "''")}', '{entry.get('钩子类型', '').replace("'", "''")}', '{entry.get('钩子出现时间', '').replace("'", "''")}',
+                        '{entry.get('推测出该点需求的原因', '').replace("'", "''")}', '{entry.get('钩子到AI大模型的问题', '').replace("'", "''")}', '', '{entry.get('钩子文案', '').replace("'", "''")}',
+                        '', '{entry.get('落地方案形态描述', '').replace("'", "''")}', '{entry.get('落地方案类型', '').replace("'", "''")}', '', '','{audit_status}','{audit_desc.replace("'", "''")}'
+                    )"""
+                    values.append(value)
+                except Exception as e:
+                    logger.error(f"[处理 - trigger] 处理需求条目时出错: {e}")
+                    continue
+                    
+            if not values:
+                logger.error(f"[处理 - trigger] 没有有效的需求条目")
+                content_video_trigger_data(json.dumps(task))
+                return
+                
+            # Combine SQL statement and values
+            sql += ",\n".join(values) + ";"
+            # Print SQL statement
+            logger.info(f"{sql}")
+            MysqlHelper.update_values(sql)
+            logger.info(f"[处理 - trigger] 需求列表写入数据库成功")
+
+            # Parse JSON data for analysis summary
+            try:
+                # Generate SQL insert statement for demand score
+                sql = """
+                INSERT INTO video_demand_score (
+                    video_id, video_link, video_title, analysis_summary, analysis_timeline
+                ) VALUES
+                """
+                # Add values for each entry
+                values = []
+                link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
+                
+                # 处理视频选题与要点理解
+                analysis_summary = json.dumps(analysis_data.get('视频选题与要点理解', {}), ensure_ascii=False)
+                analysis_timeline = json.dumps(analysis_data.get('视频分段与时间点分析', {}), ensure_ascii=False)
+                
+                # 使用参数化查询来避免 SQL 注入和转义问题
+                value = f"""(
+                    {video_id}, 
+                    '{link}', 
+                    '{video_title.replace("'", "''")}', 
+                    '{analysis_summary.replace("'", "''")}', 
+                    '{analysis_timeline.replace("'", "''")}'
+                )"""
+                values.append(value)
+                # Combine SQL statement and values
+                sql += ",\n".join(values) + ";"
+                # Print SQL statement
+                logger.info(f"{sql}")
+                MysqlHelper.update_values(sql)
+                logger.info(f"[处理 - trigger] 视频分析结果写入数据库成功")
+                
+            except Exception as e:
+                logger.error(f"[处理 - trigger] 写入视频分析结果时出错: {e}")
+                content_video_trigger_data(json.dumps(task))
+                return
+                
+        except Exception as e:
+            logger.error(f"[处理 - trigger] 处理需求列表时出错: {e}")
+            content_video_trigger_data(json.dumps(task))
+            return
 
 
 async def run():