123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
- # 读取视频分析报告1.xlsx,调用coze工作流,获取需求列表
- import pandas as pd
- import orjson
- import os
- from analyze_prompt import GoogleVideoAnalyzer
- from prompt.prompt import HOOK_EXTRACTION_PROMPT_V2
- import requests
- import json
- from datetime import datetime
- import re
- def get_first_row_and_call_coze():
- import json
- import requests
- from datetime import datetime
-
- try:
- # 读取Excel文件
- print(f"[{datetime.now()}] 开始读取Excel文件...")
- df = pd.read_excel('视频分析报告1.xlsx')
-
- # 获取第一行数据
- first_row = df.iloc[0]
-
- # 获取第6、7、8列的数据(对应F、G、H列)
- summary = first_row.iloc[5] if pd.notna(first_row.iloc[5]) else "{}"
- timeline = first_row.iloc[7] if pd.notna(first_row.iloc[7]) else "{}"
- video_url = first_row.iloc[3] if pd.notna(first_row.iloc[3]) else ""
-
- # 准备数据 - 确保数据格式正确
- data = {
- "summary": summary,
- "timeline": timeline
- }
-
- # 调用Coze工作流
- url = "https://api.coze.cn/v1/workflow/run"
- headers = {
- "Content-Type": "application/json",
- "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB"
- }
-
- payload = {
- "workflow_id": "7507245138873450535",
- "parameters": data
- }
-
- print(f"[{datetime.now()}] 开始调用Coze工作流...")
- print(f"请求URL: {url}")
- print(f"请求头: {headers}")
- print(f"请求数据: {json.dumps(payload, ensure_ascii=False, indent=2)}")
-
- response = requests.post(url, json=payload, headers=headers, timeout=60)
-
- # 打印响应状态和头信息
- print(f"\n响应状态码: {response.status_code}")
- print(f"响应头: {dict(response.headers)}")
-
- # 尝试获取响应内容
- try:
- response_text = response.text
- print(f"响应内容: {response_text}")
-
- # 检查响应内容是否为空
- if not response_text.strip():
- print("警告: 响应内容为空")
- return None
-
- # 尝试解析JSON
- try:
- result = response.json()
- print(f"\n[{datetime.now()}] Coze工作流返回结果:")
- print(json.dumps(result, ensure_ascii=False, indent=2))
- return result
- except json.JSONDecodeError as je:
- print(f"JSON解析错误: {str(je)}")
- print(f"原始响应内容: {response_text}")
- return None
-
- except Exception as e:
- print(f"读取响应内容时出错: {str(e)}")
- return None
-
- except pd.errors.EmptyDataError:
- print("错误: Excel文件为空")
- except requests.exceptions.RequestException as e:
- print(f"请求错误: {str(e)}")
- if hasattr(e, 'response') and e.response is not None:
- print(f"错误响应状态码: {e.response.status_code}")
- print(f"错误响应内容: {e.response.text}")
- except Exception as e:
- print(f"发生未知错误: {str(e)}")
- return None
- def call_coze_api(summary, timeline):
- url = "https://api.coze.cn/v1/workflow/run"
- headers = {
- "Content-Type": "application/json",
- "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB"
- }
- payload = {
- "workflow_id": "7507245138873450535",
- "parameters": {
- "summary": summary,
- "timeline": timeline
- }
- }
- response = requests.post(url, json=payload, headers=headers, timeout=600)
- response.raise_for_status()
- return response.json()
- def extract_fields_from_response(resp):
- import re
-
- # Define patterns at the function level
- JSON_PATTERNS = [
- r"```json\\n(.*?)```", # 转义的换行
- r"```json\n(.*?)```", # 普通换行
- r"```(.*?)```", # 无语言标记
- r"\{.*\}" # 直接JSON对象
- ]
-
- def try_unescape_json_string(s):
- # 递归反序列化所有层级的转义JSON字符串
- for _ in range(3): # 最多尝试3层
- if isinstance(s, str):
- try:
- s2 = json.loads(s)
- # 如果反序列化后类型有变化,继续递归
- if type(s2) != str:
- s = s2
- else:
- break
- except Exception as e:
- print(f"JSON反序列化失败: {str(e)}")
- break
- else:
- break
- return s
- def extract_json_from_string(s):
- """Helper function to extract and parse JSON from a string"""
- if not isinstance(s, str):
- return s
-
- # First try direct JSON parsing
- try:
- return json.loads(s)
- except json.JSONDecodeError:
- pass
-
- # Then try each pattern
- for pattern in JSON_PATTERNS:
- json_str = re.search(pattern, s, re.DOTALL)
- if json_str:
- try:
- content = json_str.group(1)
- return json.loads(content)
- except Exception as e:
- print(f"使用模式 {pattern} 解析失败: {str(e)}")
- continue
- return s
- try:
- data = resp.get("data")
- if not data:
- print("响应中没有data字段")
- return ("", "", "")
-
-
- # First parse the outer JSON structure
- try:
- data = json.loads(data)
- except json.JSONDecodeError as e:
- print(f"解析外层data失败: {str(e)}")
- return ("", "", "")
-
- # Then handle the output field
- output = data.get("output")
- if not output:
- print("data中没有output字段")
- return ("", "", "")
-
- print(f"\n原始output字段: {output}")
- output = extract_json_from_string(output)
-
- if isinstance(output, str):
- print("output解析后仍为字符串")
- return ("", "", "")
-
- if isinstance(output, dict):
- # 按优先级检查不同的字段名
- if "需求列表" in output:
- demand_list = output["需求列表"]
- elif "questions" in output:
- demand_list = output["questions"]
- elif "interactive_questions" in output:
- demand_list = output["interactive_questions"]
- else:
- print("output中没有找到需求列表、questions或interactive_questions字段")
- return ("", "", "")
- else:
- demand_list = output
- if not demand_list or not isinstance(demand_list, list):
- print(f"需求列表无效: {demand_list}")
- return ("", "", "")
- times = []
- queries = []
- hooks = []
-
- for item in demand_list:
- if not isinstance(item, dict):
- print(f"跳过非字典项: {item}")
- continue
-
- time = item.get("需求钩子出现时间", "")
- query = item.get("需求详细query", "")
- hook = item.get("需求钩子话术", "")
-
- if not all([time, query, hook]):
- print(f"跳过不完整项: {item}")
- continue
-
- # Handle time format
- if time == "end":
- time = "视频结束"
- elif "-" in time:
- time = time.split("-")[0] # Take the start time
-
- times.append(time)
- queries.append(query)
- hooks.append(hook)
- if not times:
- print("没有提取到有效的需求项")
- return ("", "", "")
- return ("\n".join(times), "\n".join(queries), "\n".join(hooks))
- except Exception as e:
- print(f"解析返回数据出错: {str(e)}")
- print(f"原始响应: {json.dumps(resp, ensure_ascii=False, indent=2)}")
- return ("", "", "")
- def process_excel(input_excel, output_excel=None):
- df = pd.read_excel(input_excel)
- for idx, row in df.iterrows():
- summary = row.iloc[5] if pd.notna(row.iloc[5]) else "{}"
- timeline = row.iloc[7] if pd.notna(row.iloc[7]) else "{}"
- try:
- print(f"开始处理第{idx+1}行")
- resp = call_coze_api(summary, timeline)
- time, query, hook = extract_fields_from_response(resp)
- df.iat[idx, 9] = time # 第8列
- df.iat[idx, 10] = query # 第9列
- df.iat[idx, 11] = hook # 第10列
- print(f"第{idx+1}行处理完成")
- print(hook)
- except Exception as e:
- print(f"第{idx+1}行处理失败: {e}")
- df.iat[idx, 9] = "error"
- df.iat[idx, 10] = "error"
- df.iat[idx, 11] = "error"
- # 每处理一行就保存一次,防止中断丢失
- df.to_excel(input_excel, index=False)
- print(f"已保存到第{idx+1}行")
- df.to_excel(input_excel, index=False)
- print("全部处理完成,已保存。")
- # 读取视频分析报告1.xlsx,找出第10列为空的行,重新调用coze工作流分析
- def process_empty_rows(input_excel, output_excel=None):
- df = pd.read_excel(input_excel)
- for idx, row in df.iterrows():
- if pd.notna(row.iloc[10]):
- continue
- summary = row.iloc[5] if pd.notna(row.iloc[5]) else "{}"
- timeline = row.iloc[7] if pd.notna(row.iloc[7]) else "{}"
- try:
- print(f"开始处理第{idx+1}行")
- resp = call_coze_api(summary, timeline)
- time, query, hook = extract_fields_from_response(resp)
- df.iat[idx, 9] = time # 第8列
- df.iat[idx, 10] = query # 第9列
- df.iat[idx, 11] = hook # 第10列
- print(f"第{idx+1}行处理完成")
- print(hook)
- except Exception as e:
- print(f"第{idx+1}行处理失败: {e}")
- df.iat[idx, 9] = "error"
- df.iat[idx, 10] = "error"
- df.iat[idx, 11] = "error"
- # 每处理一行就保存一次,防止中断丢失
- df.to_excel(input_excel, index=False)
- print(f"已保存到第{idx+1}行")
- df.to_excel(input_excel, index=False)
- print("全部处理完成,已保存。")
- if __name__ == "__main__":
- # process_empty_rows("视频分析报告1_拆分钩子.xlsx") # 直接处理原文件
- process_excel("视频分析报告1_拆分钩子.xlsx")
|