123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- # 读取视频分析报告1.xlsx,调用coze工作流,获取需求列表
- import pandas as pd
- import orjson
- import os
- from 视频理解任务 import GoogleVideoAnalyzer
- from prompt.prompt import HOOK_EXTRACTION_PROMPT_V2
- import requests
- import json
- from datetime import datetime
- import re
def get_first_row_and_call_coze():
    """Send the first data row of ``视频分析报告1.xlsx`` to a Coze workflow.

    The cells at positional columns 5 and 7 of the first row are posted as the
    ``summary`` and ``timeline`` workflow parameters (``"{}"`` is sent for a
    missing cell). Progress, the request and the raw response are printed to
    stdout; the bearer token is masked in that output.

    Returns:
        The decoded JSON response, or ``None`` when the sheet has no rows, the
        request fails, the response body is blank, or it is not valid JSON.
    """
    import json
    import requests
    from datetime import datetime

    try:
        print(f"[{datetime.now()}] 开始读取Excel文件...")
        df = pd.read_excel('视频分析报告1.xlsx')

        # A sheet without data rows yields an empty frame here, so check for
        # it explicitly instead of letting df.iloc[0] fail with IndexError.
        if df.empty:
            print("错误: Excel文件为空")
            return None

        first_row = df.iloc[0]

        # Positional columns: 5 -> summary, 7 -> timeline.
        # NOTE(review): process_excel reads the timeline from column 6;
        # confirm which layout this workbook actually uses.
        summary = first_row.iloc[5] if pd.notna(first_row.iloc[5]) else "{}"
        timeline = first_row.iloc[7] if pd.notna(first_row.iloc[7]) else "{}"

        url = "https://api.coze.cn/v1/workflow/run"
        # NOTE(review): the access token is committed in source; consider
        # loading it from configuration instead.
        headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB",
        }
        payload = {
            "workflow_id": "7507245138873450535",
            "parameters": {
                "summary": summary,
                "timeline": timeline,
            },
        }

        # Never echo the credential: log a copy with the token masked.
        logged_headers = {**headers, "Authorization": "Bearer ***"}

        print(f"[{datetime.now()}] 开始调用Coze工作流...")
        print(f"请求URL: {url}")
        print(f"请求头: {logged_headers}")
        print(f"请求数据: {json.dumps(payload, ensure_ascii=False, indent=2)}")

        response = requests.post(url, json=payload, headers=headers, timeout=60)

        print(f"\n响应状态码: {response.status_code}")
        print(f"响应头: {dict(response.headers)}")

        try:
            response_text = response.text
            print(f"响应内容: {response_text}")

            if not response_text.strip():
                print("警告: 响应内容为空")
                return None

            try:
                result = response.json()
            except json.JSONDecodeError as je:
                print(f"JSON解析错误: {str(je)}")
                print(f"原始响应内容: {response_text}")
                return None

            print(f"\n[{datetime.now()}] Coze工作流返回结果:")
            print(json.dumps(result, ensure_ascii=False, indent=2))
            return result

        except Exception as e:
            print(f"读取响应内容时出错: {str(e)}")
            return None

    except pd.errors.EmptyDataError:
        print("错误: Excel文件为空")
    except requests.exceptions.RequestException as e:
        print(f"请求错误: {str(e)}")
        if hasattr(e, 'response') and e.response is not None:
            print(f"错误响应状态码: {e.response.status_code}")
            print(f"错误响应内容: {e.response.text}")
    except Exception as e:
        # Best-effort script helper: report anything else and fall through.
        print(f"发生未知错误: {str(e)}")
    return None
def call_coze_api(summary, timeline, *, workflow_id="7511970477477904393", timeout=600):
    """Run a Coze workflow with ``summary`` and ``timeline`` and return its reply.

    Args:
        summary: Value sent as the workflow's ``summary`` parameter.
        timeline: Value sent as the workflow's ``timeline`` parameter.
        workflow_id: ID of the workflow to run. Defaults to the workflow this
            function has always targeted, so existing callers are unaffected.
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status (raised by ``raise_for_status``).
        requests.exceptions.RequestException: For other request failures
            reported by ``requests``.
    """
    url = "https://api.coze.cn/v1/workflow/run"
    # NOTE(review): the access token is committed in source; consider loading
    # it from configuration instead.
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB",
    }
    payload = {
        "workflow_id": workflow_id,
        "parameters": {
            "summary": summary,
            "timeline": timeline,
        },
    }
    response = requests.post(url, json=payload, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()
def extract_fields_from_response(resp):
    """Extract hook time, question and script text from a workflow response.

    The payload may be wrapped in several layers: dicts keyed by ``data`` or
    ``output``, JSON encoded as a string (possibly backslash-escaped), and a
    markdown code fence. The layers are peeled off until a list of objects (or
    a single object) remains, then three fields are read from each object.

    Args:
        resp: The API response, normally a dict; any other value is treated
            as the payload itself.

    Returns:
        tuple: ``(time_str, query_str, hook_str)``. Each string joins the
        values of one field across all objects with newlines. All three are
        empty when nothing could be extracted or an error occurred.
    """
    import json
    import re
    import logging

    logger = logging.getLogger(__name__)

    def extract_json_from_markdown(markdown_str):
        """Return the body of the first ``` or ```json fence, else the stripped input."""
        match = re.search(r'```(?:json)?\s*([\s\S]*?)```', markdown_str)
        if match:
            return match.group(1).strip()
        return markdown_str.strip()

    def parse_nested_json(json_str):
        """Parse JSON text, retrying once with backslash escapes undone.

        Returns the input string unchanged when neither attempt succeeds.
        """
        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            pass
        try:
            # Encoding with latin-1 + backslashreplace rewrites every character
            # outside Latin-1 as a \u escape, so unicode_escape restores it
            # intact. Encoding as UTF-8 first would turn such text (e.g. the
            # Chinese field names) into mojibake.
            unescaped = json_str.encode('latin-1', 'backslashreplace').decode('unicode_escape')
            return json.loads(unescaped)
        except ValueError:
            # Covers both UnicodeDecodeError and json.JSONDecodeError.
            return json_str

    def clean(value):
        """Render a field value as stripped text; a missing/null value becomes ''."""
        return '' if value is None else str(value).strip()

    try:
        # Top-level envelope: {"data": ...}, where data may be a JSON string
        # that itself wraps the payload under "output".
        if isinstance(resp, dict) and 'data' in resp:
            current_data = resp['data']
            if isinstance(current_data, str):
                try:
                    data = json.loads(current_data)
                except json.JSONDecodeError:
                    pass  # keep the raw string; the loop below retries it
                else:
                    if isinstance(data, dict) and 'output' in data:
                        current_data = data['output']
                    else:
                        current_data = data
        else:
            current_data = resp

        # Peel remaining layers: descend through data/output keys and decode
        # JSON strings until neither step makes progress.
        while isinstance(current_data, (str, dict)):
            if isinstance(current_data, dict):
                if 'data' in current_data:
                    current_data = current_data['data']
                elif 'output' in current_data:
                    current_data = current_data['output']
                else:
                    break
            else:
                parsed = parse_nested_json(current_data)
                if parsed == current_data:  # not JSON: stop unwrapping
                    break
                current_data = parsed

        # A leftover string may be JSON inside a markdown code fence.
        if isinstance(current_data, str):
            current_data = parse_nested_json(extract_json_from_markdown(current_data))

        logger.info("Final parsed data type: %s", type(current_data))
        logger.info("Final data preview: %s...", str(current_data)[:200])

        if not isinstance(current_data, list):
            logger.warning("Converting non-list data to list: %s", type(current_data))
            current_data = [current_data]

        time_list = []
        query_list = []
        hook_list = []

        for item in current_data:
            if not isinstance(item, dict):
                logger.warning("Skipping invalid item: %s", item)
                continue

            hook_time = clean(item.get('钩子出现时间'))
            query = clean(item.get('钩子到AI大模型的问题'))
            hook = clean(item.get('钩子话术'))

            # Keep the three lists aligned: add a row only if it has content.
            if hook_time or query or hook:
                time_list.append(hook_time)
                query_list.append(query)
                hook_list.append(hook)

        time_str = '\n'.join(time_list)
        query_str = '\n'.join(query_list)
        hook_str = '\n'.join(hook_list)

        logger.info(
            "Extracted - Time: %s..., Query: %s..., Hook: %s...",
            time_str[:50], query_str[:50], hook_str[:50],
        )
        return time_str, query_str, hook_str

    except Exception as e:
        # Best-effort extraction: callers expect empty strings, not a crash.
        logger.error("Error processing response: %s", e)
        logger.error("Response data: %s...", str(resp)[:200])
        return '', '', ''
def process_excel(input_excel, output_excel=None):
    """Run the Coze workflow for every row of a workbook and store the results.

    For each row, the cells at positional columns 5 (summary) and 6 (timeline)
    are sent to ``call_coze_api``. The time, hook and query strings returned
    by ``extract_fields_from_response`` are written to positional columns 7,
    8 and 9 respectively. A row that fails gets ``"error"`` in all three
    columns. The workbook is saved after every row so an interrupted run
    keeps its progress.

    Args:
        input_excel: Path of the workbook to read.
        output_excel: Path to save to. When omitted, results are written back
            to ``input_excel``.

    Raises:
        ValueError: If the sheet has fewer than 7 columns, i.e. the summary
            and timeline input columns are not both present.
        Exception: Any other error raised while loading or preparing the
            workbook is printed and re-raised.
    """
    summary_col, timeline_col = 5, 6
    result_cols = (7, 8, 9)  # time, hook, query
    save_path = output_excel or input_excel

    try:
        df = pd.read_excel(input_excel)

        if len(df.columns) <= timeline_col:
            raise ValueError(
                f"输入文件列数不足: 需要至少{timeline_col + 1}列, 实际为{len(df.columns)}列"
            )

        # Append any missing result column at the end of the sheet; inserting
        # one position at a time keeps df.insert within bounds.
        while len(df.columns) <= max(result_cols):
            position = len(df.columns)
            df.insert(position, f'Column_{position}', '')

        time_col, hook_col, query_col = (df.columns[i] for i in result_cols)

        # Make exactly the columns that are written below string-typed, so
        # text can be stored in them whatever dtype pandas inferred on load.
        for name in (time_col, hook_col, query_col):
            df[name] = df[name].fillna('').astype(str)

        for idx, row in df.iterrows():
            try:
                print(f"\n开始处理第{idx+1}行")
                summary = str(row.iloc[summary_col]) if pd.notna(row.iloc[summary_col]) else "{}"
                timeline = str(row.iloc[timeline_col]) if pd.notna(row.iloc[timeline_col]) else "{}"

                # Only a preview is printed; cells can be long.
                print(f"Summary: {summary[:100]}...")
                print(f"Timeline: {timeline[:100]}...")

                resp = call_coze_api(summary, timeline)
                print(f"API Response: {json.dumps(resp, ensure_ascii=False)}")

                hook_time, query, hook = extract_fields_from_response(resp)

                df.at[idx, time_col] = hook_time
                df.at[idx, hook_col] = hook
                df.at[idx, query_col] = query

                print(f"第{idx+1}行处理完成")
                print(f"Time: {hook_time[:100]}...")
                print(f"Hook: {hook[:100]}...")
                print(f"Query: {query[:100]}...")
            except Exception as e:
                # One bad row must not abort the batch: mark it and move on.
                print(f"第{idx+1}行处理失败: {str(e)}")
                df.at[idx, time_col] = "error"
                df.at[idx, hook_col] = "error"
                df.at[idx, query_col] = "error"

            # Save after every row so an interruption loses at most one row.
            df.to_excel(save_path, index=False)
            print(f"已保存到第{idx+1}行")

        print("全部处理完成,已保存。")

    except Exception as e:
        print(f"处理Excel文件时发生错误: {str(e)}")
        raise
if __name__ == "__main__":
    # Script entry point: process the default report workbook.
    process_excel(input_excel="文件/视频分析报告_new.xlsx")
|