jihuaqiang 1 hónapja
szülő
commit
32ef8c5693

BIN
__pycache__/视频理解任务.cpython-313.pyc


+ 87 - 0
testpy/test.py

@@ -0,0 +1,87 @@
+# 读取 文件/视频分析报告.xlsx文件,逐行将第5列数据的JSON字符串读取出来,提取segments字段并处理
+
+import pandas as pd
+import json
+import logging
+import re
+
+# Set up logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+def clean_json_string(json_str):
+    """清理和格式化JSON字符串"""
+    # 移除BOM和空白字符
+    json_str = json_str.strip().lstrip('\ufeff')
+    
+    # 处理Python风格的字符串(单引号转双引号)
+    json_str = re.sub(r'(?<!\\)"([^"]*?)(?<!\\)"', r'__DOUBLE_QUOTED__\1__END__', json_str)
+    json_str = json_str.replace("'", '"')
+    json_str = re.sub(r'__DOUBLE_QUOTED__([^_]*?)__END__', r'"\1"', json_str)
+    
+    # 确保是有效的JSON数组或对象
+    if not (json_str.startswith('[') or json_str.startswith('{')):
+        json_str = '[' + json_str
+    if not (json_str.endswith(']') or json_str.endswith('}')):
+        json_str = json_str + ']'
+    
+    return json_str
+
+try:
+    # 读取 Excel 文件
+    input_file = '文件/视频分析报告.xlsx'
+    output_file = '文件/视频分析报告_new.xlsx'
+    
+    logger.info(f"Reading file: {input_file}")
+    df = pd.read_excel(input_file)
+    
+    # 获取第5列的列名
+    column_5 = df.columns[4]  # 0-based index for 5th column
+    
+    # 逐行处理数据
+    for index, row in df.iterrows():
+        try:
+            json_str = str(row[column_5])
+            if not json_str or json_str == 'nan':
+                logger.warning(f"Empty or invalid JSON string at row {index + 1}")
+                continue
+            
+            # 清理和格式化JSON字符串
+            json_str = clean_json_string(json_str)
+            
+            # 解析JSON数据
+            try:
+                data = json.loads(json_str)
+                # 如果数据是字典,尝试获取segments字段
+                if isinstance(data, dict):
+                    segments = data.get('segments', [])
+                    remaining_data = {k: v for k, v in data.items() if k != 'segments'}
+                # 如果数据是列表,直接使用
+                elif isinstance(data, list):
+                    segments = data
+                    remaining_data = {}
+                else:
+                    logger.warning(f"Unexpected data type at row {index + 1}: {type(data)}")
+                    continue
+                
+                # 更新数据
+                df.loc[index, '第7列'] = json.dumps(segments, ensure_ascii=False)
+                df.loc[index, '第6列'] = json.dumps(remaining_data, ensure_ascii=False)
+                
+            except json.JSONDecodeError as je:
+                logger.error(f"JSON decode error at row {index + 1}: {str(je)}")
+                logger.error(f"Error position: {je.pos}")
+                logger.error(f"Character at error: {repr(json_str[je.pos:je.pos+10])}")
+                continue
+            
+        except Exception as e:
+            logger.error(f"Error processing row {index + 1}: {str(e)}")
+            continue
+    
+    # 保存到新的Excel文件
+    logger.info(f"Saving to: {output_file}")
+    df.to_excel(output_file, index=False)
+    logger.info("Processing completed successfully")
+    
+except Exception as e:
+    logger.error(f"An error occurred: {str(e)}") 

BIN
文件/~$视频分析报告_new.xlsx


BIN
文件/视频分析报告_new.xlsx


+ 146 - 144
获取trigger任务.py

@@ -99,7 +99,7 @@ def call_coze_api(summary, timeline):
         "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB"
     }
     payload = {
-        "workflow_id": "7507245138873450535",
+        "workflow_id": "7506810742659579904",
         "parameters": {
             "summary": summary,
             "timeline": timeline
@@ -110,165 +110,167 @@ def call_coze_api(summary, timeline):
     return response.json()
 
 def extract_fields_from_response(resp):
+    """
+    从响应中提取字段数据
+    Args:
+        resp: API响应字典,可能包含多层嵌套的JSON字符串
+    Returns:
+        tuple: (time_str, theme_str, trigger_str) 三个字符串,分别包含时间、主题和触发语
+    """
+    import json
     import re
+    import logging
     
-    # Define patterns at the function level
-    JSON_PATTERNS = [
-        r"```json\\n(.*?)```",  # 转义的换行
-        r"```json\n(.*?)```",   # 普通换行
-        r"```(.*?)```",         # 无语言标记
-        r"\{.*\}"               # 直接JSON对象
-    ]
+    logger = logging.getLogger(__name__)
     
-    def try_unescape_json_string(s):
-        # 递归反序列化所有层级的转义JSON字符串
-        for _ in range(3):  # 最多尝试3层
-            if isinstance(s, str):
-                try:
-                    s2 = json.loads(s)
-                    # 如果反序列化后类型有变化,继续递归
-                    if type(s2) != str:
-                        s = s2
-                    else:
-                        break
-                except Exception as e:
-                    print(f"JSON反序列化失败: {str(e)}")
-                    break
-            else:
-                break
-        return s
-
-    def extract_json_from_string(s):
-        """Helper function to extract and parse JSON from a string"""
-        if not isinstance(s, str):
-            return s
-            
-        # First try direct JSON parsing
+    def extract_json_from_markdown(markdown_str):
+        """从markdown代码块中提取JSON字符串"""
+        # 匹配 ```json 和 ``` 之间的内容
+        match = re.search(r'```(?:json)?\s*([\s\S]*?)```', markdown_str)
+        if match:
+            return match.group(1).strip()
+        return markdown_str.strip()
+    
+    def parse_nested_json(json_str):
+        """递归解析嵌套的JSON字符串"""
         try:
-            return json.loads(s)
+            # 尝试直接解析
+            return json.loads(json_str)
         except json.JSONDecodeError:
-            pass
-            
-        # Then try each pattern
-        for pattern in JSON_PATTERNS:
-            json_str = re.search(pattern, s, re.DOTALL)
-            if json_str:
-                try:
-                    content = json_str.group(1)
-                    return json.loads(content)
-                except Exception as e:
-                    print(f"使用模式 {pattern} 解析失败: {str(e)}")
-                    continue
-        return s
-
+            try:
+                # 如果失败,尝试解析转义的JSON字符串
+                unescaped = json_str.encode().decode('unicode_escape')
+                return json.loads(unescaped)
+            except:
+                # 如果还是失败,返回原始字符串
+                return json_str
+    
     try:
-        data = resp.get("data")
-        if not data:
-            print("响应中没有data字段")
-            return ("", "", "")
-            
+        # 处理多层嵌套的JSON
+        current_data = resp
+        while isinstance(current_data, (str, dict)):
+            if isinstance(current_data, dict):
+                # 如果是字典,尝试获取data或output字段
+                if 'data' in current_data:
+                    current_data = current_data['data']
+                elif 'output' in current_data:
+                    current_data = current_data['output']
+                else:
+                    break
+            elif isinstance(current_data, str):
+                # 如果是字符串,尝试解析JSON
+                try:
+                    parsed = parse_nested_json(current_data)
+                    if parsed == current_data:  # 如果解析结果和输入相同,说明不是JSON
+                        break
+                    current_data = parsed
+                except:
+                    break
         
-        # First parse the outer JSON structure
-        try:
-            data = json.loads(data)
-        except json.JSONDecodeError as e:
-            print(f"解析外层data失败: {str(e)}")
-            return ("", "", "")
-            
-        # Then handle the output field
-        output = data.get("output")
-        if not output:
-            print("data中没有output字段")
-            return ("", "", "")
-            
-        print(f"\n原始output字段: {output}")
-        output = extract_json_from_string(output)
+        # 如果最终结果是字符串,尝试从markdown中提取
+        if isinstance(current_data, str):
+            current_data = extract_json_from_markdown(current_data)
+            current_data = parse_nested_json(current_data)
         
-        if isinstance(output, str):
-            print("output解析后仍为字符串")
-            return ("", "", "")
-            
-
-        if isinstance(output, dict):
-            # 按优先级检查不同的字段名
-            if "需求列表" in output:
-                demand_list = output["需求列表"]
-            elif "questions" in output:
-                demand_list = output["questions"]
-            elif "interactive_questions" in output:
-                demand_list = output["interactive_questions"]
-            else:
-                print("output中没有找到需求列表、questions或interactive_questions字段")
-                return ("", "", "")
-        else:
-            demand_list = output
-
-        if not demand_list or not isinstance(demand_list, list):
-            print(f"需求列表无效: {demand_list}")
-            return ("", "", "")
-
-        times = []
-        queries = []
-        hooks = []
+        logger.info(f"Final parsed data type: {type(current_data)}")
+        logger.info(f"Final data preview: {str(current_data)[:200]}...")
         
-        for item in demand_list:
+        # 确保数据是列表
+        if not isinstance(current_data, list):
+            logger.warning(f"Converting non-list data to list: {type(current_data)}")
+            current_data = [current_data]
+        
+        # 提取并验证每个对象
+        time_list = []
+        theme_list = []
+        trigger_list = []
+        
+        for item in current_data:
             if not isinstance(item, dict):
-                print(f"跳过非字典项: {item}")
+                logger.warning(f"Skipping invalid item: {item}")
                 continue
-                
-            time = item.get("需求钩子出现时间", "")
-            query = item.get("需求详细query", "")
-            hook = item.get("需求钩子话术", "")
             
-            if not all([time, query, hook]):
-                print(f"跳过不完整项: {item}")
-                continue
-                
-            # Handle time format
-            if time == "end":
-                time = "视频结束"
-            elif "-" in time:
-                time = time.split("-")[0]  # Take the start time
+            try:
+                # 使用get方法安全地获取值,并提供默认值
+                time = str(item.get('time', '')).strip()
+                theme = str(item.get('theme', '')).strip()
+                trigger = str(item.get('trigger', '')).strip()
                 
-            times.append(time)
-            queries.append(query)
-            hooks.append(hook)
-
-        if not times:
-            print("没有提取到有效的需求项")
-            return ("", "", "")
-
-        return ("\n".join(times), "\n".join(queries), "\n".join(hooks))
-
+                if time or theme or trigger:  # 只添加非空值
+                    time_list.append(time)
+                    theme_list.append(theme)
+                    trigger_list.append(trigger)
+            except Exception as e:
+                logger.warning(f"Error extracting fields from item: {e}")
+                continue
+        
+        # 将列表转换为字符串,用换行符连接
+        time_str = '\n'.join(time_list) if time_list else ''
+        theme_str = '\n'.join(theme_list) if theme_list else ''
+        trigger_str = '\n'.join(trigger_list) if trigger_list else ''
+        
+        logger.info(f"Extracted - Time: {time_str[:50]}..., Theme: {theme_str[:50]}..., Trigger: {trigger_str[:50]}...")
+        return time_str, theme_str, trigger_str
+        
+    except json.JSONDecodeError as je:
+        logger.error(f"JSON decode error: {str(je)}")
+        logger.error(f"Error position: {je.pos}")
+        logger.error(f"Character at error: {repr(str(current_data)[je.pos:je.pos+10])}")
+        return '', '', ''
     except Exception as e:
-        print(f"解析返回数据出错: {str(e)}")
-        print(f"原始响应: {json.dumps(resp, ensure_ascii=False, indent=2)}")
-    return ("", "", "")
+        logger.error(f"Error processing response: {str(e)}")
+        logger.error(f"Response data: {str(resp)[:200]}...")
+        return '', '', ''
 
 def process_excel(input_excel, output_excel=None):
-    df = pd.read_excel(input_excel)
-    for idx, row in df.iterrows():
-        summary = row.iloc[5] if pd.notna(row.iloc[5]) else "{}"
-        timeline = row.iloc[7] if pd.notna(row.iloc[7]) else "{}"
-        try:
-            print(f"开始处理第{idx+1}行")
-            resp = call_coze_api(summary, timeline)
-            time, query, hook = extract_fields_from_response(resp)
-            df.iat[idx, 9] = time    # 第8列
-            df.iat[idx, 10] = query   # 第9列
-            df.iat[idx, 11] = hook    # 第10列
-            print(f"第{idx+1}行处理完成")
-            print(hook)
-        except Exception as e:
-            print(f"第{idx+1}行处理失败: {e}")
-            df.iat[idx, 9] = "error"
-            df.iat[idx, 10] = "error"
-            df.iat[idx, 11] = "error"
-        # 每处理一行就保存一次,防止中断丢失
-        df.to_excel(input_excel, index=False)
-        print(f"已保存到第{idx+1}行")
-    df.to_excel(input_excel, index=False)
-    print("全部处理完成,已保存。")
+    """
+    处理Excel文件,调用API并更新数据
+    """
+    try:
+        df = pd.read_excel(input_excel)
+        
+        # 确保目标列存在且为字符串类型
+        for col in [8, 9, 10]:  # 对应第9、10、11列
+            if col >= len(df.columns):
+                df.insert(col, f'Column_{col}', '')
+            df.iloc[:, col] = df.iloc[:, col].fillna('').astype(str)
+        
+        for idx, row in df.iterrows():
+            try:
+                print(f"\n开始处理第{idx+1}行")
+                summary = str(row.iloc[5]) if pd.notna(row.iloc[5]) else "{}"
+                timeline = str(row.iloc[6]) if pd.notna(row.iloc[6]) else "{}"
+                
+                print(f"Summary: {summary[:100]}...")  # 只打印前100个字符
+                print(f"Timeline: {timeline[:100]}...")
+                
+                resp = call_coze_api(summary, timeline)
+                print(f"API Response: {json.dumps(resp, ensure_ascii=False)[:200]}...")  # 打印API响应
+                
+                time, theme, hook = extract_fields_from_response(resp)
+                
+                # 更新数据 - 使用单个值赋值而不是批量赋值
+                df.at[idx, df.columns[7]] = time
+                df.at[idx, df.columns[8]] = hook
+                
+                print(f"第{idx+1}行处理完成")
+                print(f"Time: {time[:100]}...")
+                print(f"Hook: {hook[:100]}...")
+                
+            except Exception as e:
+                print(f"第{idx+1}行处理失败: {str(e)}")
+                df.at[idx, df.columns[7]] = "error"
+                df.at[idx, df.columns[8]] = "error"
+            
+            # 每处理一行就保存一次,防止中断丢失
+            df.to_excel(input_excel, index=False)
+            print(f"已保存到第{idx+1}行")
+            
+        print("全部处理完成,已保存。")
+        
+    except Exception as e:
+        print(f"处理Excel文件时发生错误: {str(e)}")
+        raise
 
 # 读取视频分析报告1.xlsx,找出第10列为空的行,重新调用coze工作流分析
 def process_empty_rows(input_excel, output_excel=None):