feishu_evaluation.py 15 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import os
  4. import json
  5. import requests
  6. from typing import List, Dict, Any, Tuple
  7. from dotenv import load_dotenv
  8. import sys
  9. import time
  10. import logging
  11. import threading
  12. from concurrent.futures import ThreadPoolExecutor
  13. # 添加项目根目录到系统路径
  14. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  15. # 导入项目中的openrouter模块
  16. from llm.openrouter import OpenRouterProcessor, OpenRouterModel
  17. # 配置日志记录
  18. logging.basicConfig(
  19. level=logging.INFO,
  20. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  21. handlers=[
  22. logging.FileHandler(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'feishu_evaluation.log')),
  23. logging.StreamHandler()
  24. ]
  25. )
  26. logger = logging.getLogger(__name__)
  27. # 加载环境变量
  28. load_dotenv()
  29. # 检查环境变量
  30. if not os.getenv('FEISHU_APP_ID') or not os.getenv('FEISHU_APP_SECRET') or not os.getenv('OPENROUTER_API_TOKEN'):
  31. logger.error("缺少必要的环境变量,请检查.env文件")
  32. sys.exit(1)
  33. # 飞书API相关配置
  34. APP_ID = os.getenv('FEISHU_APP_ID')
  35. APP_SECRET = os.getenv('FEISHU_APP_SECRET')
  36. SHEET_TOKEN = "ESmNsJ3FWhcfbmtvwlAcvcWMngf"
  37. SHEET_NAME = "Zsag5j"
  38. # 最大并发数
  39. MAX_CONCURRENCY = 5
  40. class FeishuAPI:
  41. """飞书API处理类"""
  42. def __init__(self):
  43. self.app_id = APP_ID
  44. self.app_secret = APP_SECRET
  45. self.base_url = "https://open.feishu.cn/open-apis"
  46. self.access_token = None
  47. self.token_expires = 0
  48. self.lock = threading.Lock() # 添加线程锁保证线程安全
  49. def get_access_token(self):
  50. """获取飞书访问令牌"""
  51. # 使用线程锁确保线程安全
  52. with self.lock:
  53. # 如果令牌有效且未过期,直接返回
  54. if self.access_token and time.time() < self.token_expires:
  55. return self.access_token
  56. url = f"{self.base_url}/auth/v3/tenant_access_token/internal"
  57. payload = {
  58. "app_id": self.app_id,
  59. "app_secret": self.app_secret
  60. }
  61. try:
  62. response = requests.post(url, json=payload)
  63. result = response.json()
  64. if result.get("code") == 0:
  65. self.access_token = result.get("tenant_access_token")
  66. # 设置过期时间(提前5分钟过期)
  67. self.token_expires = time.time() + result.get("expire") - 300
  68. logger.info("成功获取飞书访问令牌")
  69. return self.access_token
  70. else:
  71. error_msg = f"获取飞书访问令牌失败: {result}"
  72. logger.error(error_msg)
  73. raise Exception(error_msg)
  74. except Exception as e:
  75. logger.error(f"获取飞书访问令牌时发生异常: {e}")
  76. raise
  77. def get_sheet_data(self):
  78. """获取飞书表格数据"""
  79. try:
  80. token = self.get_access_token()
  81. url = f"{self.base_url}/sheets/v2/spreadsheets/{SHEET_TOKEN}/values/{SHEET_NAME}"
  82. headers = {"Authorization": f"Bearer {token}"}
  83. logger.info(f"正在获取表格数据: {SHEET_TOKEN}/{SHEET_NAME}")
  84. response = requests.get(url, headers=headers)
  85. result = response.json()
  86. if result.get("code") == 0:
  87. data = result.get("data", {}).get("valueRange", {}).get("values", [])
  88. logger.info(f"成功获取表格数据,共 {len(data)} 行")
  89. return data
  90. else:
  91. error_msg = f"获取飞书表格数据失败: {result}"
  92. logger.error(error_msg)
  93. raise Exception(error_msg)
  94. except Exception as e:
  95. logger.error(f"获取表格数据时发生异常: {e}")
  96. raise
  97. def update_sheet_cell(self, row_index, col_index, value, max_retries=1, retry_delay=2):
  98. """更新飞书表格单元格,带重试机制"""
  99. # 计算单元格范围(例如:'H2')
  100. col_letter = chr(ord('A') + col_index)
  101. range_str = f"{SHEET_NAME}!{col_letter}{row_index + 1}:{col_letter}{row_index + 1}"
  102. for retry in range(max_retries):
  103. try:
  104. # 每次重试都重新获取token,以防token过期
  105. token = self.get_access_token()
  106. url = f"{self.base_url}/sheets/v2/spreadsheets/{SHEET_TOKEN}/values"
  107. payload = {
  108. "valueRange": {
  109. "range": range_str,
  110. "values": [[value]]
  111. }
  112. }
  113. headers = {"Authorization": f"Bearer {token}"}
  114. logger.debug(f"正在更新单元格: {range_str} (尝试 {retry+1}/{max_retries})")
  115. response = requests.put(url, headers=headers, json=payload)
  116. result = response.json()
  117. if result.get("code") == 0:
  118. logger.debug(f"成功更新单元格: {range_str}")
  119. return True
  120. elif result.get("code") == 91403 and retry < max_retries - 1: # Forbidden错误,可能是权限问题
  121. logger.warning(f"更新单元格 {range_str} 返回Forbidden错误,将在 {retry_delay} 秒后重试")
  122. time.sleep(retry_delay)
  123. # 下次重试延迟时间加倍
  124. retry_delay *= 2
  125. else:
  126. logger.warning(f"更新单元格 {range_str} 失败: {result}")
  127. return False
  128. except Exception as e:
  129. if retry < max_retries - 1:
  130. logger.warning(f"更新单元格 {range_str} 时发生异常: {e},将在 {retry_delay} 秒后重试")
  131. time.sleep(retry_delay)
  132. # 下次重试延迟时间加倍
  133. retry_delay *= 2
  134. else:
  135. logger.error(f"更新单元格 {range_str} 时发生异常: {e},已达到最大重试次数")
  136. return False
  137. return False # 所有重试都失败
  138. class EvaluationProcessor:
  139. """评估处理类"""
  140. def __init__(self):
  141. # 加载评估和打分的prompt
  142. self.consistency_prompt = self._load_prompt("p1.md")
  143. self.scoring_prompt = self._load_prompt("p2.md")
  144. # 初始化OpenRouter处理器
  145. self.llm_processor = OpenRouterProcessor(OpenRouterModel.GEMINI_25_FLASH)
  146. def _load_prompt(self, filename):
  147. """加载prompt文件"""
  148. current_dir = os.path.dirname(os.path.abspath(__file__))
  149. file_path = os.path.join(current_dir, filename)
  150. with open(file_path, 'r', encoding='utf-8') as file:
  151. return file.read()
  152. def evaluate_consistency(self, keyword, structured_result):
  153. """评估一致性"""
  154. try:
  155. input_data = {
  156. "query": keyword,
  157. "query结果文本": structured_result
  158. }
  159. logger.info(f"正在进行一致性评估: {keyword[:30]}...")
  160. # 调用LLM进行一致性评估
  161. result = self.llm_processor.process(input_data, self.consistency_prompt)
  162. try:
  163. # 尝试解析JSON结果
  164. # 处理可能的不完整JSON字符串
  165. result = result.strip()
  166. if result.startswith('```json') and '```' in result:
  167. # 提取JSON部分
  168. json_str = result.split('```json', 1)[1].split('```', 1)[0].strip()
  169. json_result = json.loads(json_str)
  170. else:
  171. json_result = json.loads(result)
  172. consistency = json_result.get("consistency", "")
  173. reason = json_result.get("reason", [])
  174. reason_str = "\n".join(reason) if isinstance(reason, list) else str(reason)
  175. logger.info(f"一致性评估结果: {consistency}")
  176. return consistency, reason_str
  177. except json.JSONDecodeError as e:
  178. # 如果结果不是有效的JSON,尝试修复并重新解析
  179. logger.warning(f"一致性评估结果解析失败: {result[:200]}... 错误: {e}")
  180. try:
  181. # 尝试从文本中提取JSON部分
  182. if '{' in result and '}' in result:
  183. json_part = result[result.find('{'):result.rfind('}')+1]
  184. json_result = json.loads(json_part)
  185. consistency = json_result.get("consistency", "")
  186. reason = json_result.get("reason", [])
  187. reason_str = "\n".join(reason) if isinstance(reason, list) else str(reason)
  188. logger.info(f"修复后解析成功,一致性评估结果: {consistency}")
  189. return consistency, reason_str
  190. except:
  191. pass
  192. return "解析错误", result[:500] # 限制返回长度
  193. except Exception as e:
  194. logger.error(f"一致性评估过程中发生异常: {e}")
  195. return "评估异常", str(e)
  196. def evaluate_score(self, keyword, structured_result):
  197. """评估打分"""
  198. try:
  199. input_data = {
  200. "query词": keyword,
  201. "文本知识": structured_result
  202. }
  203. logger.info(f"正在进行打分评估: {keyword[:30]}...")
  204. # 调用LLM进行打分评估
  205. result = self.llm_processor.process(input_data, self.scoring_prompt)
  206. try:
  207. # 尝试解析JSON结果
  208. # 处理可能的不完整JSON字符串
  209. result = result.strip()
  210. if result.startswith('```json') and '```' in result:
  211. # 提取JSON部分
  212. json_str = result.split('```json', 1)[1].split('```', 1)[0].strip()
  213. json_result = json.loads(json_str)
  214. else:
  215. json_result = json.loads(result)
  216. score = json_result.get("分数", "")
  217. reason = json_result.get("原因", {})
  218. # 将原因字典转换为字符串
  219. reason_str = "\n".join([f"{k}: {v}" for k, v in reason.items()])
  220. logger.info(f"打分评估结果: {score}")
  221. return score, reason_str
  222. except json.JSONDecodeError as e:
  223. # 如果结果不是有效的JSON,尝试修复并重新解析
  224. logger.warning(f"打分评估结果解析失败: {result[:200]}... 错误: {e}")
  225. try:
  226. # 尝试从文本中提取JSON部分
  227. if '{' in result and '}' in result:
  228. json_part = result[result.find('{'):result.rfind('}')+1]
  229. json_result = json.loads(json_part)
  230. score = json_result.get("分数", "")
  231. reason = json_result.get("原因", {})
  232. reason_str = "\n".join([f"{k}: {v}" for k, v in reason.items()])
  233. logger.info(f"修复后解析成功,打分评估结果: {score}")
  234. return score, reason_str
  235. except:
  236. pass
  237. return "解析错误", result[:500] # 限制返回长度
  238. except Exception as e:
  239. logger.error(f"打分评估过程中发生异常: {e}")
  240. return "评估异常", str(e)
  241. def process_row(row_index, row_data, feishu_api, evaluator):
  242. """处理单行数据"""
  243. try:
  244. # 获取关键词(C列)和结构化结果(G列)
  245. keyword = row_data[2] if len(row_data) > 2 else ""
  246. structured_result = row_data[6] if len(row_data) > 6 else ""
  247. if not keyword or not structured_result:
  248. logger.warning(f"行 {row_index+1} 数据不完整,跳过处理")
  249. return
  250. logger.info(f"开始处理行 {row_index+1}: {keyword[:30]}...")
  251. # 检查H列和J列是否已有数据,如果有则跳过处理
  252. h_value = row_data[7] if len(row_data) > 7 else ""
  253. j_value = row_data[9] if len(row_data) > 9 else ""
  254. # 评估一致性(如果H列为空)
  255. if not h_value:
  256. consistency, consistency_reason = evaluator.evaluate_consistency(keyword, structured_result)
  257. # 更新一致性结果(H列和I列)
  258. feishu_api.update_sheet_cell(row_index, 7, consistency) # H列
  259. feishu_api.update_sheet_cell(row_index, 8, consistency_reason) # I列
  260. else:
  261. logger.info(f"行 {row_index+1} 的一致性评估结果已存在,跳过处理")
  262. # 评估打分(如果J列为空)
  263. if not j_value:
  264. score, score_reason = evaluator.evaluate_score(keyword, structured_result)
  265. # 更新打分结果(J列和K列)
  266. feishu_api.update_sheet_cell(row_index, 9, score) # J列
  267. feishu_api.update_sheet_cell(row_index, 10, score_reason) # K列
  268. else:
  269. logger.info(f"行 {row_index+1} 的打分评估结果已存在,跳过处理")
  270. logger.info(f"行 {row_index+1} 处理完成")
  271. except Exception as e:
  272. logger.error(f"处理行 {row_index+1} 时出错: {e}", exc_info=True)
  273. def main():
  274. """主函数"""
  275. try:
  276. logger.info("开始执行飞书表格评估程序")
  277. # 初始化飞书API和评估处理器
  278. feishu_api = FeishuAPI()
  279. evaluator = EvaluationProcessor()
  280. # 获取表格数据
  281. sheet_data = feishu_api.get_sheet_data()
  282. if not sheet_data or len(sheet_data) <= 1: # 考虑表头
  283. logger.warning("表格数据为空或只有表头")
  284. return
  285. logger.info(f"共获取到 {len(sheet_data)-1} 行数据(不含表头)")
  286. # 创建线程池
  287. with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as executor:
  288. # 跳过表头,处理数据行
  289. futures = []
  290. for i, row in enumerate(sheet_data[1:], 1): # 从索引1开始(跳过表头)
  291. future = executor.submit(process_row, i, row, feishu_api, evaluator)
  292. futures.append(future)
  293. # 等待所有任务完成
  294. for future in futures:
  295. try:
  296. future.result() # 获取任务结果,如果有异常会在这里抛出
  297. except Exception as e:
  298. logger.error(f"任务执行失败: {e}", exc_info=True)
  299. logger.info("所有数据处理完成")
  300. except Exception as e:
  301. logger.error(f"程序执行过程中发生异常: {e}", exc_info=True)
  302. sys.exit(1)
  303. if __name__ == "__main__":
  304. # 运行主函数
  305. main()