获取trigger任务.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
# Read 视频分析报告1.xlsx, call the Coze workflow, and fetch the requirement list.
  2. import pandas as pd
  3. import orjson
  4. import os
  5. from 视频理解任务 import GoogleVideoAnalyzer
  6. from prompt.prompt import HOOK_EXTRACTION_PROMPT_V2
  7. import requests
  8. import json
  9. from datetime import datetime
  10. import re
  11. def get_first_row_and_call_coze():
  12. import json
  13. import requests
  14. from datetime import datetime
  15. try:
  16. # 读取Excel文件
  17. print(f"[{datetime.now()}] 开始读取Excel文件...")
  18. df = pd.read_excel('视频分析报告1.xlsx')
  19. # 获取第一行数据
  20. first_row = df.iloc[0]
  21. # 获取第6、7、8列的数据(对应F、G、H列)
  22. summary = first_row.iloc[5] if pd.notna(first_row.iloc[5]) else "{}"
  23. timeline = first_row.iloc[7] if pd.notna(first_row.iloc[7]) else "{}"
  24. video_url = first_row.iloc[3] if pd.notna(first_row.iloc[3]) else ""
  25. # 准备数据 - 确保数据格式正确
  26. data = {
  27. "summary": summary,
  28. "timeline": timeline
  29. }
  30. # 调用Coze工作流
  31. url = "https://api.coze.cn/v1/workflow/run"
  32. headers = {
  33. "Content-Type": "application/json",
  34. "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB"
  35. }
  36. payload = {
  37. "workflow_id": "7507245138873450535",
  38. "parameters": data
  39. }
  40. print(f"[{datetime.now()}] 开始调用Coze工作流...")
  41. print(f"请求URL: {url}")
  42. print(f"请求头: {headers}")
  43. print(f"请求数据: {json.dumps(payload, ensure_ascii=False, indent=2)}")
  44. response = requests.post(url, json=payload, headers=headers, timeout=60)
  45. # 打印响应状态和头信息
  46. print(f"\n响应状态码: {response.status_code}")
  47. print(f"响应头: {dict(response.headers)}")
  48. # 尝试获取响应内容
  49. try:
  50. response_text = response.text
  51. print(f"响应内容: {response_text}")
  52. # 检查响应内容是否为空
  53. if not response_text.strip():
  54. print("警告: 响应内容为空")
  55. return None
  56. # 尝试解析JSON
  57. try:
  58. result = response.json()
  59. print(f"\n[{datetime.now()}] Coze工作流返回结果:")
  60. print(json.dumps(result, ensure_ascii=False, indent=2))
  61. return result
  62. except json.JSONDecodeError as je:
  63. print(f"JSON解析错误: {str(je)}")
  64. print(f"原始响应内容: {response_text}")
  65. return None
  66. except Exception as e:
  67. print(f"读取响应内容时出错: {str(e)}")
  68. return None
  69. except pd.errors.EmptyDataError:
  70. print("错误: Excel文件为空")
  71. except requests.exceptions.RequestException as e:
  72. print(f"请求错误: {str(e)}")
  73. if hasattr(e, 'response') and e.response is not None:
  74. print(f"错误响应状态码: {e.response.status_code}")
  75. print(f"错误响应内容: {e.response.text}")
  76. except Exception as e:
  77. print(f"发生未知错误: {str(e)}")
  78. return None
  79. def call_coze_api(summary, timeline):
  80. url = "https://api.coze.cn/v1/workflow/run"
  81. headers = {
  82. "Content-Type": "application/json",
  83. "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB"
  84. }
  85. payload = {
  86. "workflow_id": "7511970477477904393",
  87. "parameters": {
  88. "summary": summary,
  89. "timeline": timeline
  90. }
  91. }
  92. response = requests.post(url, json=payload, headers=headers, timeout=600)
  93. response.raise_for_status()
  94. return response.json()
  95. def extract_fields_from_response(resp):
  96. """
  97. 从响应中提取字段数据
  98. Args:
  99. resp: API响应字典,可能包含多层嵌套的JSON字符串
  100. Returns:
  101. tuple: (time_str, query_str, hook_str) 三个字符串,分别包含时间、问题和钩子话术
  102. """
  103. import json
  104. import re
  105. import logging
  106. logger = logging.getLogger(__name__)
  107. def extract_json_from_markdown(markdown_str):
  108. """从markdown代码块中提取JSON字符串"""
  109. # 匹配 ```json 和 ``` 之间的内容
  110. match = re.search(r'```(?:json)?\s*([\s\S]*?)```', markdown_str)
  111. if match:
  112. return match.group(1).strip()
  113. return markdown_str.strip()
  114. def parse_nested_json(json_str):
  115. """递归解析嵌套的JSON字符串"""
  116. try:
  117. # 尝试直接解析
  118. return json.loads(json_str)
  119. except json.JSONDecodeError:
  120. try:
  121. # 如果失败,尝试解析转义的JSON字符串
  122. unescaped = json_str.encode().decode('unicode_escape')
  123. return json.loads(unescaped)
  124. except:
  125. # 如果还是失败,返回原始字符串
  126. return json_str
  127. try:
  128. # 处理新的API响应格式
  129. if isinstance(resp, dict) and 'data' in resp:
  130. # 如果data是字符串,尝试解析它
  131. if isinstance(resp['data'], str):
  132. try:
  133. data = json.loads(resp['data'])
  134. if isinstance(data, dict) and 'output' in data:
  135. current_data = data['output']
  136. else:
  137. current_data = data
  138. except json.JSONDecodeError:
  139. current_data = resp['data']
  140. else:
  141. current_data = resp['data']
  142. else:
  143. current_data = resp
  144. # 处理多层嵌套的JSON
  145. while isinstance(current_data, (str, dict)):
  146. if isinstance(current_data, dict):
  147. # 如果是字典,尝试获取data或output字段
  148. if 'data' in current_data:
  149. current_data = current_data['data']
  150. elif 'output' in current_data:
  151. current_data = current_data['output']
  152. else:
  153. break
  154. elif isinstance(current_data, str):
  155. # 如果是字符串,尝试解析JSON
  156. try:
  157. parsed = parse_nested_json(current_data)
  158. if parsed == current_data: # 如果解析结果和输入相同,说明不是JSON
  159. break
  160. current_data = parsed
  161. except:
  162. break
  163. # 如果最终结果是字符串,尝试从markdown中提取
  164. if isinstance(current_data, str):
  165. current_data = extract_json_from_markdown(current_data)
  166. current_data = parse_nested_json(current_data)
  167. logger.info(f"Final parsed data type: {type(current_data)}")
  168. logger.info(f"Final data preview: {str(current_data)[:200]}...")
  169. # 确保数据是列表
  170. if not isinstance(current_data, list):
  171. logger.warning(f"Converting non-list data to list: {type(current_data)}")
  172. current_data = [current_data]
  173. # 提取并验证每个对象
  174. time_list = []
  175. query_list = []
  176. hook_list = []
  177. for item in current_data:
  178. if not isinstance(item, dict):
  179. logger.warning(f"Skipping invalid item: {item}")
  180. continue
  181. try:
  182. # 使用新的中文字段名获取值
  183. time = str(item.get('钩子出现时间', '')).strip()
  184. query = str(item.get('钩子到AI大模型的问题', '')).strip()
  185. hook = str(item.get('钩子话术', '')).strip()
  186. if time or query or hook: # 只添加非空值
  187. time_list.append(time)
  188. query_list.append(query)
  189. hook_list.append(hook)
  190. except Exception as e:
  191. logger.warning(f"Error extracting fields from item: {e}")
  192. continue
  193. # 将列表转换为字符串,用换行符连接
  194. time_str = '\n'.join(time_list) if time_list else ''
  195. query_str = '\n'.join(query_list) if query_list else ''
  196. hook_str = '\n'.join(hook_list) if hook_list else ''
  197. logger.info(f"Extracted - Time: {time_str[:50]}..., Query: {query_str[:50]}..., Hook: {hook_str[:50]}...")
  198. return time_str, query_str, hook_str
  199. except json.JSONDecodeError as je:
  200. logger.error(f"JSON decode error: {str(je)}")
  201. logger.error(f"Error position: {je.pos}")
  202. logger.error(f"Character at error: {repr(str(current_data)[je.pos:je.pos+10])}")
  203. return '', '', ''
  204. except Exception as e:
  205. logger.error(f"Error processing response: {str(e)}")
  206. logger.error(f"Response data: {str(resp)[:200]}...")
  207. return '', '', ''
  208. def process_excel(input_excel, output_excel=None):
  209. """
  210. 处理Excel文件,调用API并更新数据
  211. """
  212. try:
  213. df = pd.read_excel(input_excel)
  214. # 确保目标列存在且为字符串类型
  215. for col in [8, 9, 10]: # 对应第9、10、11列
  216. if col >= len(df.columns):
  217. df.insert(col, f'Column_{col}', '')
  218. df.iloc[:, col] = df.iloc[:, col].fillna('').astype(str)
  219. for idx, row in df.iterrows():
  220. try:
  221. print(f"\n开始处理第{idx+1}行")
  222. summary = str(row.iloc[5]) if pd.notna(row.iloc[5]) else "{}"
  223. timeline = str(row.iloc[6]) if pd.notna(row.iloc[6]) else "{}"
  224. print(f"Summary: {summary[:100]}...") # 只打印前100个字符
  225. print(f"Timeline: {timeline[:100]}...")
  226. resp = call_coze_api(summary, timeline)
  227. print(f"API Response: {json.dumps(resp, ensure_ascii=False)}") # 打印API响应
  228. time, query, hook = extract_fields_from_response(resp)
  229. # 更新数据 - 使用单个值赋值而不是批量赋值
  230. df.at[idx, df.columns[7]] = time
  231. df.at[idx, df.columns[8]] = hook
  232. df.at[idx, df.columns[9]] = query
  233. print(f"第{idx+1}行处理完成")
  234. print(f"Time: {time[:100]}...")
  235. print(f"Hook: {hook[:100]}...")
  236. print(f"Query: {query[:100]}...")
  237. except Exception as e:
  238. print(f"第{idx+1}行处理失败: {str(e)}")
  239. df.at[idx, df.columns[7]] = "error"
  240. df.at[idx, df.columns[8]] = "error"
  241. df.at[idx, df.columns[9]] = "error"
  242. # 每处理一行就保存一次,防止中断丢失
  243. df.to_excel(input_excel, index=False)
  244. print(f"已保存到第{idx+1}行")
  245. print("全部处理完成,已保存。")
  246. except Exception as e:
  247. print(f"处理Excel文件时发生错误: {str(e)}")
  248. raise
  249. if __name__ == "__main__":
  250. process_excel("文件/视频分析报告_new.xlsx")