# 读取视频分析报告1.xlsx,调用coze工作流,获取需求列表 import pandas as pd import orjson import os from 视频理解任务 import GoogleVideoAnalyzer from prompt.prompt import HOOK_EXTRACTION_PROMPT_V2 import requests import json from datetime import datetime import re def get_first_row_and_call_coze(): import json import requests from datetime import datetime try: # 读取Excel文件 print(f"[{datetime.now()}] 开始读取Excel文件...") df = pd.read_excel('视频分析报告1.xlsx') # 获取第一行数据 first_row = df.iloc[0] # 获取第6、7、8列的数据(对应F、G、H列) summary = first_row.iloc[5] if pd.notna(first_row.iloc[5]) else "{}" timeline = first_row.iloc[7] if pd.notna(first_row.iloc[7]) else "{}" video_url = first_row.iloc[3] if pd.notna(first_row.iloc[3]) else "" # 准备数据 - 确保数据格式正确 data = { "summary": summary, "timeline": timeline } # 调用Coze工作流 url = "https://api.coze.cn/v1/workflow/run" headers = { "Content-Type": "application/json", "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB" } payload = { "workflow_id": "7507245138873450535", "parameters": data } print(f"[{datetime.now()}] 开始调用Coze工作流...") print(f"请求URL: {url}") print(f"请求头: {headers}") print(f"请求数据: {json.dumps(payload, ensure_ascii=False, indent=2)}") response = requests.post(url, json=payload, headers=headers, timeout=60) # 打印响应状态和头信息 print(f"\n响应状态码: {response.status_code}") print(f"响应头: {dict(response.headers)}") # 尝试获取响应内容 try: response_text = response.text print(f"响应内容: {response_text}") # 检查响应内容是否为空 if not response_text.strip(): print("警告: 响应内容为空") return None # 尝试解析JSON try: result = response.json() print(f"\n[{datetime.now()}] Coze工作流返回结果:") print(json.dumps(result, ensure_ascii=False, indent=2)) return result except json.JSONDecodeError as je: print(f"JSON解析错误: {str(je)}") print(f"原始响应内容: {response_text}") return None except Exception as e: print(f"读取响应内容时出错: {str(e)}") return None except pd.errors.EmptyDataError: print("错误: Excel文件为空") except requests.exceptions.RequestException as e: print(f"请求错误: {str(e)}") if hasattr(e, 'response') and e.response is not None: print(f"错误响应状态码: {e.response.status_code}") print(f"错误响应内容: {e.response.text}") except Exception as e: print(f"发生未知错误: {str(e)}") return None def call_coze_api(summary, timeline): url = "https://api.coze.cn/v1/workflow/run" headers = { "Content-Type": "application/json", "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB" } payload = { "workflow_id": "7511970477477904393", "parameters": { "summary": summary, "timeline": timeline } } response = requests.post(url, json=payload, headers=headers, timeout=600) response.raise_for_status() return response.json() def extract_fields_from_response(resp): """ 从响应中提取字段数据 Args: resp: API响应字典,可能包含多层嵌套的JSON字符串 Returns: tuple: (time_str, query_str, hook_str) 三个字符串,分别包含时间、问题和钩子话术 """ import json import re import logging logger = logging.getLogger(__name__) def extract_json_from_markdown(markdown_str): """从markdown代码块中提取JSON字符串""" # 匹配 ```json 和 ``` 之间的内容 match = re.search(r'```(?:json)?\s*([\s\S]*?)```', markdown_str) if match: return match.group(1).strip() return markdown_str.strip() def parse_nested_json(json_str): """递归解析嵌套的JSON字符串""" try: # 尝试直接解析 return json.loads(json_str) except json.JSONDecodeError: try: # 如果失败,尝试解析转义的JSON字符串 unescaped = json_str.encode().decode('unicode_escape') return json.loads(unescaped) except: # 如果还是失败,返回原始字符串 return json_str try: # 处理新的API响应格式 if isinstance(resp, dict) and 'data' in resp: # 如果data是字符串,尝试解析它 if isinstance(resp['data'], str): try: data = json.loads(resp['data']) if isinstance(data, dict) and 'output' in data: current_data = data['output'] else: current_data = data except json.JSONDecodeError: current_data = resp['data'] else: current_data = resp['data'] else: current_data = resp # 处理多层嵌套的JSON while isinstance(current_data, (str, dict)): if isinstance(current_data, dict): # 如果是字典,尝试获取data或output字段 if 'data' in current_data: current_data = current_data['data'] elif 'output' in current_data: current_data = current_data['output'] else: break elif isinstance(current_data, str): # 如果是字符串,尝试解析JSON try: parsed = parse_nested_json(current_data) if parsed == current_data: # 如果解析结果和输入相同,说明不是JSON break current_data = parsed except: break # 如果最终结果是字符串,尝试从markdown中提取 if isinstance(current_data, str): current_data = extract_json_from_markdown(current_data) current_data = parse_nested_json(current_data) logger.info(f"Final parsed data type: {type(current_data)}") logger.info(f"Final data preview: {str(current_data)[:200]}...") # 确保数据是列表 if not isinstance(current_data, list): logger.warning(f"Converting non-list data to list: {type(current_data)}") current_data = [current_data] # 提取并验证每个对象 time_list = [] query_list = [] hook_list = [] for item in current_data: if not isinstance(item, dict): logger.warning(f"Skipping invalid item: {item}") continue try: # 使用新的中文字段名获取值 time = str(item.get('钩子出现时间', '')).strip() query = str(item.get('钩子到AI大模型的问题', '')).strip() hook = str(item.get('钩子话术', '')).strip() if time or query or hook: # 只添加非空值 time_list.append(time) query_list.append(query) hook_list.append(hook) except Exception as e: logger.warning(f"Error extracting fields from item: {e}") continue # 将列表转换为字符串,用换行符连接 time_str = '\n'.join(time_list) if time_list else '' query_str = '\n'.join(query_list) if query_list else '' hook_str = '\n'.join(hook_list) if hook_list else '' logger.info(f"Extracted - Time: {time_str[:50]}..., Query: {query_str[:50]}..., Hook: {hook_str[:50]}...") return time_str, query_str, hook_str except json.JSONDecodeError as je: logger.error(f"JSON decode error: {str(je)}") logger.error(f"Error position: {je.pos}") logger.error(f"Character at error: {repr(str(current_data)[je.pos:je.pos+10])}") return '', '', '' except Exception as e: logger.error(f"Error processing response: {str(e)}") logger.error(f"Response data: {str(resp)[:200]}...") return '', '', '' def process_excel(input_excel, output_excel=None): """ 处理Excel文件,调用API并更新数据 """ try: df = pd.read_excel(input_excel) # 确保目标列存在且为字符串类型 for col in [8, 9, 10]: # 对应第9、10、11列 if col >= len(df.columns): df.insert(col, f'Column_{col}', '') df.iloc[:, col] = df.iloc[:, col].fillna('').astype(str) for idx, row in df.iterrows(): try: print(f"\n开始处理第{idx+1}行") summary = str(row.iloc[5]) if pd.notna(row.iloc[5]) else "{}" timeline = str(row.iloc[6]) if pd.notna(row.iloc[6]) else "{}" print(f"Summary: {summary[:100]}...") # 只打印前100个字符 print(f"Timeline: {timeline[:100]}...") resp = call_coze_api(summary, timeline) print(f"API Response: {json.dumps(resp, ensure_ascii=False)}") # 打印API响应 time, query, hook = extract_fields_from_response(resp) # 更新数据 - 使用单个值赋值而不是批量赋值 df.at[idx, df.columns[7]] = time df.at[idx, df.columns[8]] = hook df.at[idx, df.columns[9]] = query print(f"第{idx+1}行处理完成") print(f"Time: {time[:100]}...") print(f"Hook: {hook[:100]}...") print(f"Query: {query[:100]}...") except Exception as e: print(f"第{idx+1}行处理失败: {str(e)}") df.at[idx, df.columns[7]] = "error" df.at[idx, df.columns[8]] = "error" df.at[idx, df.columns[9]] = "error" # 每处理一行就保存一次,防止中断丢失 df.to_excel(input_excel, index=False) print(f"已保存到第{idx+1}行") print("全部处理完成,已保存。") except Exception as e: print(f"处理Excel文件时发生错误: {str(e)}") raise if __name__ == "__main__": process_excel("文件/视频分析报告_new.xlsx")