# Read a video-analysis Excel report, send each row's summary/timeline to a Coze
# workflow, and write the extracted requirement fields (time / theme / hook) back.
import json
import logging
import os
import re
from datetime import datetime

import orjson  # noqa: F401  (unused in this file; kept for compatibility)
import pandas as pd
import requests

from 视频理解任务 import GoogleVideoAnalyzer  # noqa: F401  (unused in this file)
from prompt.prompt import HOOK_EXTRACTION_PROMPT_V2  # noqa: F401  (unused in this file)

logger = logging.getLogger(__name__)

COZE_WORKFLOW_URL = "https://api.coze.cn/v1/workflow/run"

# NOTE(review): a personal access token was committed here in plain text. It should be
# rotated and supplied only through the COZE_API_TOKEN environment variable; the literal
# is kept as a fallback solely so existing runs keep working.
COZE_API_TOKEN = os.environ.get(
    "COZE_API_TOKEN",
    "pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB",
)

# Captures the body of a ``` or ```json fenced block (non-greedy, spans newlines).
_MARKDOWN_JSON_RE = re.compile(r'```(?:json)?\s*([\s\S]*?)```')


def _coze_headers():
    """Return the HTTP headers for a Coze workflow request."""
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {COZE_API_TOKEN}",
    }


def _redacted(headers):
    """Return a copy of ``headers`` that is safe to print (bearer token hidden)."""
    return {**headers, "Authorization": "Bearer ***"}


def get_first_row_and_call_coze():
    """Send the first data row of 视频分析报告1.xlsx to a Coze workflow and print the result.

    The summary is read from column position 5 and the timeline from position 7
    (empty cells become ``"{}"``); both are posted to workflow 7507245138873450535.
    Request and response details are printed along the way.

    Returns:
        The decoded JSON response, or ``None`` when the sheet has no rows, the
        response body is empty or not JSON, or any error occurs. Errors are
        printed, never raised.
    """
    try:
        print(f"[{datetime.now()}] 开始读取Excel文件...")
        df = pd.read_excel('视频分析报告1.xlsx')
        if df.empty:
            # df.iloc[0] would raise IndexError on a header-only sheet.
            print("错误: Excel文件为空")
            return None

        first_row = df.iloc[0]
        # NOTE(review): the old comment spoke of "columns 6/7/8 (F/G/H)", yet positions 5
        # and 7 are read here while process_excel() reads the timeline from position 6 —
        # confirm which layout this workbook uses.
        # Values are stringified, matching process_excel(), so non-text cells
        # (numbers, timestamps) still serialize into the JSON payload.
        summary = str(first_row.iloc[5]) if pd.notna(first_row.iloc[5]) else "{}"
        timeline = str(first_row.iloc[7]) if pd.notna(first_row.iloc[7]) else "{}"

        headers = _coze_headers()
        payload = {
            "workflow_id": "7507245138873450535",
            "parameters": {"summary": summary, "timeline": timeline},
        }

        print(f"[{datetime.now()}] 开始调用Coze工作流...")
        print(f"请求URL: {COZE_WORKFLOW_URL}")
        # Never echo the bearer token to stdout.
        print(f"请求头: {_redacted(headers)}")
        print(f"请求数据: {json.dumps(payload, ensure_ascii=False, indent=2)}")

        response = requests.post(COZE_WORKFLOW_URL, json=payload, headers=headers, timeout=60)

        print(f"\n响应状态码: {response.status_code}")
        print(f"响应头: {dict(response.headers)}")

        try:
            response_text = response.text
            print(f"响应内容: {response_text}")

            if not response_text.strip():
                print("警告: 响应内容为空")
                return None

            try:
                result = response.json()
            except ValueError as je:
                # requests raises its own JSONDecodeError; its json/simplejson base class
                # varies, but it is always a ValueError subclass.
                print(f"JSON解析错误: {str(je)}")
                print(f"原始响应内容: {response_text}")
                return None

            print(f"\n[{datetime.now()}] Coze工作流返回结果:")
            print(json.dumps(result, ensure_ascii=False, indent=2))
            return result
        except Exception as e:
            print(f"读取响应内容时出错: {str(e)}")
            return None

    except pd.errors.EmptyDataError:
        print("错误: Excel文件为空")
    except requests.exceptions.RequestException as e:
        print(f"请求错误: {str(e)}")
        if getattr(e, 'response', None) is not None:
            print(f"错误响应状态码: {e.response.status_code}")
            print(f"错误响应内容: {e.response.text}")
    except Exception as e:
        print(f"发生未知错误: {str(e)}")
    return None


def call_coze_api(summary, timeline):
    """Run Coze workflow 7506810742659579904 on one summary/timeline pair.

    Args:
        summary: Value sent as the workflow's ``summary`` parameter.
        timeline: Value sent as the workflow's ``timeline`` parameter.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: if the response status is not 2xx.
        requests.RequestException: on network failure or the 600 s timeout.
        ValueError: if the response body is not valid JSON.
    """
    payload = {
        "workflow_id": "7506810742659579904",
        "parameters": {"summary": summary, "timeline": timeline},
    }
    response = requests.post(
        COZE_WORKFLOW_URL, json=payload, headers=_coze_headers(), timeout=600
    )
    response.raise_for_status()
    return response.json()


def _extract_json_from_markdown(markdown_str):
    """Return the contents of the first fenced code block, or the stripped text if none."""
    match = _MARKDOWN_JSON_RE.search(markdown_str)
    return match.group(1).strip() if match else markdown_str.strip()


def _parse_nested_json(json_str):
    """Decode ``json_str`` as JSON, retrying once after undoing backslash escaping.

    Returns:
        The decoded object, or ``json_str`` unchanged if neither attempt parses.
    """
    try:
        return json.loads(json_str)
    except ValueError:
        pass
    try:
        # Handles doubly-escaped payloads such as '[{\"time\": ...}]'. Encoding with
        # latin-1 + backslashreplace (instead of UTF-8) turns non-Latin-1 characters into
        # \uXXXX escapes, so Chinese text survives the unicode_escape codec intact
        # instead of coming out as mojibake.
        unescaped = json_str.encode('latin-1', 'backslashreplace').decode('unicode_escape')
        return json.loads(unescaped)
    except ValueError:  # UnicodeDecodeError and JSONDecodeError are both ValueErrors
        return json_str


def _unwrap_response(resp):
    """Peel ``data``/``output`` wrappers and JSON-encoded string layers off ``resp``."""
    current = resp
    while isinstance(current, (str, dict)):
        if isinstance(current, dict):
            if 'data' in current:
                current = current['data']
            elif 'output' in current:
                current = current['output']
            else:
                break
        else:
            parsed = _parse_nested_json(current)
            if parsed == current:  # not JSON; let the markdown fallback below try
                break
            current = parsed

    # The payload may still be text with the JSON inside a markdown code fence.
    if isinstance(current, str):
        current = _parse_nested_json(_extract_json_from_markdown(current))
    return current


def _clean_field(value):
    """Stringify and strip one field value; a missing or null value becomes ''."""
    return '' if value is None else str(value).strip()


def extract_fields_from_response(resp):
    """Extract the time / theme / trigger fields from a Coze workflow response.

    ``resp`` may wrap the payload in ``data``/``output`` keys, in JSON-encoded
    strings (possibly escaped more than once) and/or in a markdown ```json fence.
    The innermost payload is expected to be a list of objects with ``time``,
    ``theme`` and ``trigger`` keys; a single object is accepted as well.

    Args:
        resp: The decoded API response, e.g. as returned by call_coze_api().

    Returns:
        tuple[str, str, str]: ``(time_str, theme_str, trigger_str)``. Each is the
        newline-joined values of that field, index-aligned across items. Items
        whose three fields are all empty are dropped, and non-dict items are
        skipped. Returns ``('', '', '')`` if processing fails; the error is logged.
    """
    try:
        payload = _unwrap_response(resp)
        logger.info("Final parsed data type: %s", type(payload))
        logger.info("Final data preview: %s...", str(payload)[:200])

        if not isinstance(payload, list):
            logger.warning("Converting non-list data to list: %s", type(payload))
            payload = [payload]

        times, themes, triggers = [], [], []
        for item in payload:
            if not isinstance(item, dict):
                logger.warning("Skipping invalid item: %s", item)
                continue
            time_val = _clean_field(item.get('time'))
            theme_val = _clean_field(item.get('theme'))
            trigger_val = _clean_field(item.get('trigger'))
            if time_val or theme_val or trigger_val:  # keep only non-empty items
                times.append(time_val)
                themes.append(theme_val)
                triggers.append(trigger_val)

        time_str = '\n'.join(times)
        theme_str = '\n'.join(themes)
        trigger_str = '\n'.join(triggers)
        logger.info(
            "Extracted - Time: %s..., Theme: %s..., Trigger: %s...",
            time_str[:50], theme_str[:50], trigger_str[:50],
        )
        return time_str, theme_str, trigger_str
    except Exception:
        logger.exception("Error processing response. Response data: %s...", str(resp)[:200])
        return '', '', ''


def _ensure_columns(df, positions, *, as_text=False):
    """Make sure ``df`` has a column at every index in ``positions`` that accepts strings.

    Missing trailing columns are appended as ``Column_<index>`` filled with None.
    With ``as_text=True`` the columns become ``str`` with missing values mapped to
    ''. Otherwise they are cast to ``object``, which keeps NaN (so emptiness checks
    still work) but allows string assignment without pandas dtype warnings.
    """
    while len(df.columns) <= max(positions):
        df[f'Column_{len(df.columns)}'] = None
    for pos in positions:
        name = df.columns[pos]
        column = df[name]
        df[name] = column.fillna('').astype(str) if as_text else column.astype(object)


def process_excel(input_excel, output_excel=None):
    """Call the Coze workflow for every row of ``input_excel`` and store the results.

    The summary is read from column position 5 and the timeline from position 6.
    The extracted time is written to column position 7 and the trigger (hook) to
    position 8; the theme is discarded. Rows that fail get "error" in both cells.
    The workbook is saved after every row, so an interruption loses at most the
    current row.

    Args:
        input_excel: Path of the workbook to read.
        output_excel: Where to save; defaults to overwriting ``input_excel``.

    Raises:
        Any error from reading or saving the workbook, after printing it.
    """
    output_path = output_excel or input_excel
    try:
        df = pd.read_excel(input_excel)

        # NOTE(review): positions 8-10 are padded/coerced to text as before, but results
        # go to positions 7 and 8 — confirm the intended column layout.
        _ensure_columns(df, [8, 9, 10], as_text=True)
        _ensure_columns(df, [7])
        time_col, hook_col = df.columns[7], df.columns[8]

        for idx, row in df.iterrows():
            try:
                print(f"\n开始处理第{idx+1}行")
                summary = str(row.iloc[5]) if pd.notna(row.iloc[5]) else "{}"
                timeline = str(row.iloc[6]) if pd.notna(row.iloc[6]) else "{}"
                print(f"Summary: {summary[:100]}...")
                print(f"Timeline: {timeline[:100]}...")

                resp = call_coze_api(summary, timeline)
                print(f"API Response: {json.dumps(resp, ensure_ascii=False)[:200]}...")

                time_text, _theme, hook = extract_fields_from_response(resp)

                df.at[idx, time_col] = time_text
                df.at[idx, hook_col] = hook

                print(f"第{idx+1}行处理完成")
                print(f"Time: {time_text[:100]}...")
                print(f"Hook: {hook[:100]}...")
            except Exception as e:
                print(f"第{idx+1}行处理失败: {str(e)}")
                df.at[idx, time_col] = "error"
                df.at[idx, hook_col] = "error"

            # Save after every row so an interruption does not lose earlier results.
            df.to_excel(output_path, index=False)
            print(f"已保存到第{idx+1}行")

        # Also covers a sheet with no data rows, where the loop never saves.
        df.to_excel(output_path, index=False)
        print("全部处理完成,已保存。")
    except Exception as e:
        print(f"处理Excel文件时发生错误: {str(e)}")
        raise


def process_empty_rows(input_excel, output_excel=None):
    """Re-run the Coze workflow for rows whose column position 10 is empty.

    The summary is read from column position 5 and the timeline from position 7.
    Time, theme and trigger are written to positions 9, 10 and 11 respectively
    ("error" in all three if the row fails). Missing result columns are created
    empty, so every row is processed in that case. The workbook is saved after
    each row and once more at the end.

    Args:
        input_excel: Path of the workbook to read.
        output_excel: Where to save; defaults to overwriting ``input_excel``.
    """
    output_path = output_excel or input_excel
    df = pd.read_excel(input_excel)
    # Object dtype keeps NaN for the emptiness check below while accepting strings.
    _ensure_columns(df, [9, 10, 11])

    for pos, (idx, row) in enumerate(df.iterrows()):
        if pd.notna(row.iloc[10]):
            continue  # already processed
        summary = str(row.iloc[5]) if pd.notna(row.iloc[5]) else "{}"
        timeline = str(row.iloc[7]) if pd.notna(row.iloc[7]) else "{}"
        try:
            print(f"开始处理第{idx+1}行")
            resp = call_coze_api(summary, timeline)
            time_text, query, hook = extract_fields_from_response(resp)
            df.iat[pos, 9] = time_text
            df.iat[pos, 10] = query
            df.iat[pos, 11] = hook
            print(f"第{idx+1}行处理完成")
            print(hook)
        except Exception as e:
            print(f"第{idx+1}行处理失败: {e}")
            df.iat[pos, 9] = "error"
            df.iat[pos, 10] = "error"
            df.iat[pos, 11] = "error"
        # Save after every row so an interruption does not lose earlier results.
        df.to_excel(output_path, index=False)
        print(f"已保存到第{idx+1}行")

    df.to_excel(output_path, index=False)
    print("全部处理完成,已保存。")


if __name__ == "__main__":
    process_excel("文件/视频分析报告_new.xlsx")