import json
import os
import re

import requests


class CozeHook(object):
    """Run a single Coze workflow over HTTP and extract its list-valued output.

    ``call_coze_api`` posts the ``summary``/``timeline`` parameters to the
    workflow-run endpoint, ``extract_fields_from_response`` digs the list of
    items out of the (possibly nested / fenced) JSON in the reply, and ``run``
    chains the two.
    """

    # SECURITY NOTE(review): a personal access token is committed in source.
    # It is kept only so existing callers keep working; it should be rotated
    # and supplied via the ``token`` argument or the COZE_API_TOKEN
    # environment variable instead.
    _DEFAULT_TOKEN = "pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB"
    _DEFAULT_WORKFLOW_ID = "7513885396187742244"
    _TOKEN_ENV_VAR = "COZE_API_TOKEN"

    # Patterns tried, in order, when the output string is not plain JSON.
    _JSON_PATTERNS = (
        r"```json\\n(.*?)```",  # fenced block whose newline is an escaped "\n"
        r"```json\n(.*?)```",  # fenced block with a real newline
        r"```(.*?)```",  # fenced block without a language tag
        r"\{.*\}",  # bare JSON object embedded in other text
    )

    # Keys that may hold the result list, in priority order.
    _LIST_KEYS = ("需求列表", "questions", "interactive_questions")

    def __init__(self, token=None, workflow_id=None):
        """Set up the endpoint, request headers and workflow id.

        Args:
            token: Bearer token. When omitted, the COZE_API_TOKEN environment
                variable is used if set and non-empty, otherwise the built-in
                default.
            workflow_id: Workflow to run. Defaults to the built-in id.
        """
        if token is None:
            token = os.environ.get(self._TOKEN_ENV_VAR) or self._DEFAULT_TOKEN
        if workflow_id is None:
            workflow_id = self._DEFAULT_WORKFLOW_ID

        self.url = "https://api.coze.cn/v1/workflow/run"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        }
        self.hook_id = workflow_id

    def call_coze_api(self, summary, timeline):
        """POST the workflow parameters and return the decoded JSON reply.

        Args:
            summary: Value sent as the workflow's ``summary`` parameter.
            timeline: Value sent as the workflow's ``timeline`` parameter.

        Returns:
            The response body decoded by ``response.json()``.

        Raises:
            requests.HTTPError: via ``raise_for_status()`` on an error status.
        """
        payload = {
            "workflow_id": self.hook_id,
            "parameters": {
                "summary": summary,
                "timeline": timeline,
            },
        }
        response = requests.post(
            self.url, json=payload, headers=self.headers, timeout=600
        )
        response.raise_for_status()
        return response.json()

    @classmethod
    def _extract_json_from_string(cls, s):
        """Parse JSON out of ``s``; return ``s`` unchanged if nothing parses.

        Non-string input is returned as-is. A string is first parsed directly,
        then each pattern in ``_JSON_PATTERNS`` is tried in order.
        """
        if not isinstance(s, str):
            return s

        try:
            return json.loads(s)
        except json.JSONDecodeError:
            pass

        for pattern in cls._JSON_PATTERNS:
            match = re.search(pattern, s, re.DOTALL)
            if not match:
                continue
            # The bare-object pattern has no capture group; asking for
            # group(1) there raises IndexError, so use the whole match.
            content = match.group(1) if match.re.groups else match.group(0)
            try:
                return json.loads(content)
            except ValueError as e:
                print(f"使用模式 {pattern} 解析失败: {str(e)}")
        return s

    def extract_fields_from_response(self, resp):
        """Extract the list of items from a workflow-run response.

        ``resp["data"]`` is expected to be a JSON string (an already-decoded
        dict is accepted too) whose ``output`` field is either structured data
        or a string containing JSON, optionally inside a Markdown code fence.
        When the output is a dict, the list is taken from the first key found
        among ``需求列表``, ``questions`` and ``interactive_questions``;
        otherwise the output itself is taken as the list.

        Args:
            resp: Decoded response mapping as returned by ``call_coze_api``.

        Returns:
            The non-empty list that was found, or ``[]`` on any failure
            (missing fields, unparseable JSON, or a value that is not a
            non-empty list). Failures are reported with ``print`` rather than
            raised.
        """
        try:
            data = resp.get("data")
            if not data:
                print("响应中没有data字段")
                return []

            # The outer payload normally arrives as a JSON string.
            if not isinstance(data, dict):
                try:
                    data = json.loads(data)
                except json.JSONDecodeError as e:
                    print(f"解析外层data失败: {str(e)}")
                    return []

            output = data.get("output")
            if not output:
                print("data中没有output字段")
                return []

            print(f"\n原始output字段: {output}")
            output = self._extract_json_from_string(output)

            if isinstance(output, str):
                print("output解析后仍为字符串")
                return []

            if isinstance(output, dict):
                for key in self._LIST_KEYS:
                    if key in output:
                        demand_list = output[key]
                        break
                else:
                    print("output中没有找到需求列表、questions或interactive_questions字段")
                    return []
            else:
                demand_list = output

            if not demand_list or not isinstance(demand_list, list):
                print(f"需求列表无效: {demand_list}")
                return []

            return demand_list

        except Exception as e:
            # Deliberate best-effort boundary: any unexpected response shape
            # is reported and turned into an empty result.
            print(f"解析返回数据出错: {str(e)}")
            try:
                dumped = json.dumps(resp, ensure_ascii=False, indent=2)
            except (TypeError, ValueError):
                # Keep the diagnostic path itself from raising.
                dumped = repr(resp)
            print(f"原始响应: {dumped}")
            return []

    def run(self, summary, timeline):
        """Call the workflow and return the extracted list (``[]`` on failure)."""
        resp = self.call_coze_api(summary, timeline)
        return self.extract_fields_from_response(resp)