# 读取视频分析报告1.xlsx,调用coze工作流,获取需求列表 import pandas as pd import orjson import os from analyze_prompt import GoogleVideoAnalyzer from prompt.prompt import HOOK_EXTRACTION_PROMPT_V2 import requests import json from datetime import datetime import re def get_first_row_and_call_coze(): import json import requests from datetime import datetime try: # 读取Excel文件 print(f"[{datetime.now()}] 开始读取Excel文件...") df = pd.read_excel('视频分析报告1.xlsx') # 获取第一行数据 first_row = df.iloc[0] # 获取第6、7、8列的数据(对应F、G、H列) summary = first_row.iloc[5] if pd.notna(first_row.iloc[5]) else "{}" timeline = first_row.iloc[7] if pd.notna(first_row.iloc[7]) else "{}" video_url = first_row.iloc[3] if pd.notna(first_row.iloc[3]) else "" # 准备数据 - 确保数据格式正确 data = { "summary": summary, "timeline": timeline } # 调用Coze工作流 url = "https://api.coze.cn/v1/workflow/run" headers = { "Content-Type": "application/json", "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB" } payload = { "workflow_id": "7507245138873450535", "parameters": data } print(f"[{datetime.now()}] 开始调用Coze工作流...") print(f"请求URL: {url}") print(f"请求头: {headers}") print(f"请求数据: {json.dumps(payload, ensure_ascii=False, indent=2)}") response = requests.post(url, json=payload, headers=headers, timeout=60) # 打印响应状态和头信息 print(f"\n响应状态码: {response.status_code}") print(f"响应头: {dict(response.headers)}") # 尝试获取响应内容 try: response_text = response.text print(f"响应内容: {response_text}") # 检查响应内容是否为空 if not response_text.strip(): print("警告: 响应内容为空") return None # 尝试解析JSON try: result = response.json() print(f"\n[{datetime.now()}] Coze工作流返回结果:") print(json.dumps(result, ensure_ascii=False, indent=2)) return result except json.JSONDecodeError as je: print(f"JSON解析错误: {str(je)}") print(f"原始响应内容: {response_text}") return None except Exception as e: print(f"读取响应内容时出错: {str(e)}") return None except pd.errors.EmptyDataError: print("错误: Excel文件为空") except requests.exceptions.RequestException as e: print(f"请求错误: {str(e)}") if hasattr(e, 'response') and e.response is not None: print(f"错误响应状态码: {e.response.status_code}") print(f"错误响应内容: {e.response.text}") except Exception as e: print(f"发生未知错误: {str(e)}") return None def call_coze_api(summary, timeline): url = "https://api.coze.cn/v1/workflow/run" headers = { "Content-Type": "application/json", "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB" } payload = { "workflow_id": "7507245138873450535", "parameters": { "summary": summary, "timeline": timeline } } response = requests.post(url, json=payload, headers=headers, timeout=600) response.raise_for_status() return response.json() def extract_fields_from_response(resp): import re # Define patterns at the function level JSON_PATTERNS = [ r"```json\\n(.*?)```", # 转义的换行 r"```json\n(.*?)```", # 普通换行 r"```(.*?)```", # 无语言标记 r"\{.*\}" # 直接JSON对象 ] def try_unescape_json_string(s): # 递归反序列化所有层级的转义JSON字符串 for _ in range(3): # 最多尝试3层 if isinstance(s, str): try: s2 = json.loads(s) # 如果反序列化后类型有变化,继续递归 if type(s2) != str: s = s2 else: break except Exception as e: print(f"JSON反序列化失败: {str(e)}") break else: break return s def extract_json_from_string(s): """Helper function to extract and parse JSON from a string""" if not isinstance(s, str): return s # First try direct JSON parsing try: return json.loads(s) except json.JSONDecodeError: pass # Then try each pattern for pattern in JSON_PATTERNS: json_str = re.search(pattern, s, re.DOTALL) if json_str: try: content = json_str.group(1) return json.loads(content) except Exception as e: print(f"使用模式 {pattern} 解析失败: {str(e)}") continue return s try: data = resp.get("data") if not data: print("响应中没有data字段") return ("", "", "") # First parse the outer JSON structure try: data = json.loads(data) except json.JSONDecodeError as e: print(f"解析外层data失败: {str(e)}") return ("", "", "") # Then handle the output field output = data.get("output") if not output: print("data中没有output字段") return ("", "", "") print(f"\n原始output字段: {output}") output = extract_json_from_string(output) if isinstance(output, str): print("output解析后仍为字符串") return ("", "", "") if isinstance(output, dict): # 按优先级检查不同的字段名 if "需求列表" in output: demand_list = output["需求列表"] elif "questions" in output: demand_list = output["questions"] elif "interactive_questions" in output: demand_list = output["interactive_questions"] else: print("output中没有找到需求列表、questions或interactive_questions字段") return ("", "", "") else: demand_list = output if not demand_list or not isinstance(demand_list, list): print(f"需求列表无效: {demand_list}") return ("", "", "") times = [] queries = [] hooks = [] for item in demand_list: if not isinstance(item, dict): print(f"跳过非字典项: {item}") continue time = item.get("需求钩子出现时间", "") query = item.get("需求详细query", "") hook = item.get("需求钩子话术", "") if not all([time, query, hook]): print(f"跳过不完整项: {item}") continue # Handle time format if time == "end": time = "视频结束" elif "-" in time: time = time.split("-")[0] # Take the start time times.append(time) queries.append(query) hooks.append(hook) if not times: print("没有提取到有效的需求项") return ("", "", "") return ("\n".join(times), "\n".join(queries), "\n".join(hooks)) except Exception as e: print(f"解析返回数据出错: {str(e)}") print(f"原始响应: {json.dumps(resp, ensure_ascii=False, indent=2)}") return ("", "", "") def process_excel(input_excel, output_excel=None): df = pd.read_excel(input_excel) for idx, row in df.iterrows(): summary = row.iloc[5] if pd.notna(row.iloc[5]) else "{}" timeline = row.iloc[7] if pd.notna(row.iloc[7]) else "{}" try: print(f"开始处理第{idx+1}行") resp = call_coze_api(summary, timeline) time, query, hook = extract_fields_from_response(resp) df.iat[idx, 9] = time # 第8列 df.iat[idx, 10] = query # 第9列 df.iat[idx, 11] = hook # 第10列 print(f"第{idx+1}行处理完成") print(hook) except Exception as e: print(f"第{idx+1}行处理失败: {e}") df.iat[idx, 9] = "error" df.iat[idx, 10] = "error" df.iat[idx, 11] = "error" # 每处理一行就保存一次,防止中断丢失 df.to_excel(input_excel, index=False) print(f"已保存到第{idx+1}行") df.to_excel(input_excel, index=False) print("全部处理完成,已保存。") # 读取视频分析报告1.xlsx,找出第10列为空的行,重新调用coze工作流分析 def process_empty_rows(input_excel, output_excel=None): df = pd.read_excel(input_excel) for idx, row in df.iterrows(): if pd.notna(row.iloc[10]): continue summary = row.iloc[5] if pd.notna(row.iloc[5]) else "{}" timeline = row.iloc[7] if pd.notna(row.iloc[7]) else "{}" try: print(f"开始处理第{idx+1}行") resp = call_coze_api(summary, timeline) time, query, hook = extract_fields_from_response(resp) df.iat[idx, 9] = time # 第8列 df.iat[idx, 10] = query # 第9列 df.iat[idx, 11] = hook # 第10列 print(f"第{idx+1}行处理完成") print(hook) except Exception as e: print(f"第{idx+1}行处理失败: {e}") df.iat[idx, 9] = "error" df.iat[idx, 10] = "error" df.iat[idx, 11] = "error" # 每处理一行就保存一次,防止中断丢失 df.to_excel(input_excel, index=False) print(f"已保存到第{idx+1}行") df.to_excel(input_excel, index=False) print("全部处理完成,已保存。") if __name__ == "__main__": # process_empty_rows("视频分析报告1_拆分钩子.xlsx") # 直接处理原文件 process_excel("视频分析报告1_拆分钩子.xlsx")