获取trigger任务.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
# 读取视频分析报告1.xlsx,调用coze工作流,获取需求列表
import json
import os
import re
from datetime import datetime

import orjson
import pandas as pd
import requests

from prompt.prompt import HOOK_EXTRACTION_PROMPT_V2
from 视频理解任务 import GoogleVideoAnalyzer
  11. def get_first_row_and_call_coze():
  12. import json
  13. import requests
  14. from datetime import datetime
  15. try:
  16. # 读取Excel文件
  17. print(f"[{datetime.now()}] 开始读取Excel文件...")
  18. df = pd.read_excel('视频分析报告1.xlsx')
  19. # 获取第一行数据
  20. first_row = df.iloc[0]
  21. # 获取第6、7、8列的数据(对应F、G、H列)
  22. summary = first_row.iloc[5] if pd.notna(first_row.iloc[5]) else "{}"
  23. timeline = first_row.iloc[7] if pd.notna(first_row.iloc[7]) else "{}"
  24. video_url = first_row.iloc[3] if pd.notna(first_row.iloc[3]) else ""
  25. # 准备数据 - 确保数据格式正确
  26. data = {
  27. "summary": summary,
  28. "timeline": timeline
  29. }
  30. # 调用Coze工作流
  31. url = "https://api.coze.cn/v1/workflow/run"
  32. headers = {
  33. "Content-Type": "application/json",
  34. "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB"
  35. }
  36. payload = {
  37. "workflow_id": "7507245138873450535",
  38. "parameters": data
  39. }
  40. print(f"[{datetime.now()}] 开始调用Coze工作流...")
  41. print(f"请求URL: {url}")
  42. print(f"请求头: {headers}")
  43. print(f"请求数据: {json.dumps(payload, ensure_ascii=False, indent=2)}")
  44. response = requests.post(url, json=payload, headers=headers, timeout=60)
  45. # 打印响应状态和头信息
  46. print(f"\n响应状态码: {response.status_code}")
  47. print(f"响应头: {dict(response.headers)}")
  48. # 尝试获取响应内容
  49. try:
  50. response_text = response.text
  51. print(f"响应内容: {response_text}")
  52. # 检查响应内容是否为空
  53. if not response_text.strip():
  54. print("警告: 响应内容为空")
  55. return None
  56. # 尝试解析JSON
  57. try:
  58. result = response.json()
  59. print(f"\n[{datetime.now()}] Coze工作流返回结果:")
  60. print(json.dumps(result, ensure_ascii=False, indent=2))
  61. return result
  62. except json.JSONDecodeError as je:
  63. print(f"JSON解析错误: {str(je)}")
  64. print(f"原始响应内容: {response_text}")
  65. return None
  66. except Exception as e:
  67. print(f"读取响应内容时出错: {str(e)}")
  68. return None
  69. except pd.errors.EmptyDataError:
  70. print("错误: Excel文件为空")
  71. except requests.exceptions.RequestException as e:
  72. print(f"请求错误: {str(e)}")
  73. if hasattr(e, 'response') and e.response is not None:
  74. print(f"错误响应状态码: {e.response.status_code}")
  75. print(f"错误响应内容: {e.response.text}")
  76. except Exception as e:
  77. print(f"发生未知错误: {str(e)}")
  78. return None
  79. def call_coze_api(summary, timeline):
  80. url = "https://api.coze.cn/v1/workflow/run"
  81. headers = {
  82. "Content-Type": "application/json",
  83. "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB"
  84. }
  85. payload = {
  86. "workflow_id": "7507245138873450535",
  87. "parameters": {
  88. "summary": summary,
  89. "timeline": timeline
  90. }
  91. }
  92. response = requests.post(url, json=payload, headers=headers, timeout=600)
  93. response.raise_for_status()
  94. return response.json()
  95. def extract_fields_from_response(resp):
  96. import re
  97. # Define patterns at the function level
  98. JSON_PATTERNS = [
  99. r"```json\\n(.*?)```", # 转义的换行
  100. r"```json\n(.*?)```", # 普通换行
  101. r"```(.*?)```", # 无语言标记
  102. r"\{.*\}" # 直接JSON对象
  103. ]
  104. def try_unescape_json_string(s):
  105. # 递归反序列化所有层级的转义JSON字符串
  106. for _ in range(3): # 最多尝试3层
  107. if isinstance(s, str):
  108. try:
  109. s2 = json.loads(s)
  110. # 如果反序列化后类型有变化,继续递归
  111. if type(s2) != str:
  112. s = s2
  113. else:
  114. break
  115. except Exception as e:
  116. print(f"JSON反序列化失败: {str(e)}")
  117. break
  118. else:
  119. break
  120. return s
  121. def extract_json_from_string(s):
  122. """Helper function to extract and parse JSON from a string"""
  123. if not isinstance(s, str):
  124. return s
  125. # First try direct JSON parsing
  126. try:
  127. return json.loads(s)
  128. except json.JSONDecodeError:
  129. pass
  130. # Then try each pattern
  131. for pattern in JSON_PATTERNS:
  132. json_str = re.search(pattern, s, re.DOTALL)
  133. if json_str:
  134. try:
  135. content = json_str.group(1)
  136. return json.loads(content)
  137. except Exception as e:
  138. print(f"使用模式 {pattern} 解析失败: {str(e)}")
  139. continue
  140. return s
  141. try:
  142. data = resp.get("data")
  143. if not data:
  144. print("响应中没有data字段")
  145. return ("", "", "")
  146. # First parse the outer JSON structure
  147. try:
  148. data = json.loads(data)
  149. except json.JSONDecodeError as e:
  150. print(f"解析外层data失败: {str(e)}")
  151. return ("", "", "")
  152. # Then handle the output field
  153. output = data.get("output")
  154. if not output:
  155. print("data中没有output字段")
  156. return ("", "", "")
  157. print(f"\n原始output字段: {output}")
  158. output = extract_json_from_string(output)
  159. if isinstance(output, str):
  160. print("output解析后仍为字符串")
  161. return ("", "", "")
  162. if isinstance(output, dict):
  163. # 按优先级检查不同的字段名
  164. if "需求列表" in output:
  165. demand_list = output["需求列表"]
  166. elif "questions" in output:
  167. demand_list = output["questions"]
  168. elif "interactive_questions" in output:
  169. demand_list = output["interactive_questions"]
  170. else:
  171. print("output中没有找到需求列表、questions或interactive_questions字段")
  172. return ("", "", "")
  173. else:
  174. demand_list = output
  175. if not demand_list or not isinstance(demand_list, list):
  176. print(f"需求列表无效: {demand_list}")
  177. return ("", "", "")
  178. times = []
  179. queries = []
  180. hooks = []
  181. for item in demand_list:
  182. if not isinstance(item, dict):
  183. print(f"跳过非字典项: {item}")
  184. continue
  185. time = item.get("需求钩子出现时间", "")
  186. query = item.get("需求详细query", "")
  187. hook = item.get("需求钩子话术", "")
  188. if not all([time, query, hook]):
  189. print(f"跳过不完整项: {item}")
  190. continue
  191. # Handle time format
  192. if time == "end":
  193. time = "视频结束"
  194. elif "-" in time:
  195. time = time.split("-")[0] # Take the start time
  196. times.append(time)
  197. queries.append(query)
  198. hooks.append(hook)
  199. if not times:
  200. print("没有提取到有效的需求项")
  201. return ("", "", "")
  202. return ("\n".join(times), "\n".join(queries), "\n".join(hooks))
  203. except Exception as e:
  204. print(f"解析返回数据出错: {str(e)}")
  205. print(f"原始响应: {json.dumps(resp, ensure_ascii=False, indent=2)}")
  206. return ("", "", "")
  207. def process_excel(input_excel, output_excel=None):
  208. df = pd.read_excel(input_excel)
  209. for idx, row in df.iterrows():
  210. summary = row.iloc[5] if pd.notna(row.iloc[5]) else "{}"
  211. timeline = row.iloc[7] if pd.notna(row.iloc[7]) else "{}"
  212. try:
  213. print(f"开始处理第{idx+1}行")
  214. resp = call_coze_api(summary, timeline)
  215. time, query, hook = extract_fields_from_response(resp)
  216. df.iat[idx, 9] = time # 第8列
  217. df.iat[idx, 10] = query # 第9列
  218. df.iat[idx, 11] = hook # 第10列
  219. print(f"第{idx+1}行处理完成")
  220. print(hook)
  221. except Exception as e:
  222. print(f"第{idx+1}行处理失败: {e}")
  223. df.iat[idx, 9] = "error"
  224. df.iat[idx, 10] = "error"
  225. df.iat[idx, 11] = "error"
  226. # 每处理一行就保存一次,防止中断丢失
  227. df.to_excel(input_excel, index=False)
  228. print(f"已保存到第{idx+1}行")
  229. df.to_excel(input_excel, index=False)
  230. print("全部处理完成,已保存。")
  231. # 读取视频分析报告1.xlsx,找出第10列为空的行,重新调用coze工作流分析
  232. def process_empty_rows(input_excel, output_excel=None):
  233. df = pd.read_excel(input_excel)
  234. for idx, row in df.iterrows():
  235. if pd.notna(row.iloc[10]):
  236. continue
  237. summary = row.iloc[5] if pd.notna(row.iloc[5]) else "{}"
  238. timeline = row.iloc[7] if pd.notna(row.iloc[7]) else "{}"
  239. try:
  240. print(f"开始处理第{idx+1}行")
  241. resp = call_coze_api(summary, timeline)
  242. time, query, hook = extract_fields_from_response(resp)
  243. df.iat[idx, 9] = time # 第8列
  244. df.iat[idx, 10] = query # 第9列
  245. df.iat[idx, 11] = hook # 第10列
  246. print(f"第{idx+1}行处理完成")
  247. print(hook)
  248. except Exception as e:
  249. print(f"第{idx+1}行处理失败: {e}")
  250. df.iat[idx, 9] = "error"
  251. df.iat[idx, 10] = "error"
  252. df.iat[idx, 11] = "error"
  253. # 每处理一行就保存一次,防止中断丢失
  254. df.to_excel(input_excel, index=False)
  255. print(f"已保存到第{idx+1}行")
  256. df.to_excel(input_excel, index=False)
  257. print("全部处理完成,已保存。")
  258. if __name__ == "__main__":
  259. process_excel("文件/视频分析报告_new.xlsx")