#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Evaluate rows of a Feishu (Lark) spreadsheet with an LLM and write the results back.

For each data row, column C (keyword) and column G (structured result) are sent to the LLM
twice: once with the consistency prompt (p1.md) and once with the scoring prompt (p2.md).
Results are written to columns H/I (consistency, reason) and J/K (score, reason). An evaluation
is skipped for a row when its first output column (H or J) already holds a value, so the script
can be re-run to fill in gaps.
"""
import os
import json
import requests
from typing import List, Dict, Any, Tuple
from dotenv import load_dotenv
import sys
import time
import logging
import threading
from concurrent.futures import ThreadPoolExecutor

# Add the project root to sys.path so the project-local `llm` package can be imported.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Project-local OpenRouter client.
from llm.openrouter import OpenRouterProcessor, OpenRouterModel

# Log to a file next to this script and to stderr. The file handler is explicitly UTF-8 because
# most messages are Chinese and the platform default encoding may not be able to encode them.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(
            os.path.join(os.path.dirname(os.path.abspath(__file__)), 'feishu_evaluation.log'),
            encoding='utf-8',
        ),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Load environment variables from .env.
load_dotenv()

# Fail fast if any required credential is missing.
if not os.getenv('FEISHU_APP_ID') or not os.getenv('FEISHU_APP_SECRET') or not os.getenv('OPENROUTER_API_TOKEN'):
    logger.error("缺少必要的环境变量,请检查.env文件")
    sys.exit(1)

# Feishu API configuration.
APP_ID = os.getenv('FEISHU_APP_ID')
APP_SECRET = os.getenv('FEISHU_APP_SECRET')
# The target spreadsheet/sheet may be overridden from the environment; the defaults are the
# original hard-coded values, so existing setups keep working unchanged.
SHEET_TOKEN = os.getenv('FEISHU_SHEET_TOKEN', "ESmNsJ3FWhcfbmtvwlAcvcWMngf")
SHEET_NAME = os.getenv('FEISHU_SHEET_NAME', "Zsag5j")

# Maximum number of rows processed concurrently.
MAX_CONCURRENCY = 5

# Per-request timeout (seconds) for Feishu HTTP calls. Without a timeout, a stalled connection
# would block a worker thread indefinitely.
REQUEST_TIMEOUT = 30


def _column_letter(col_index: int) -> str:
    """Convert a 0-based column index to A1-notation letters (0 -> 'A', 25 -> 'Z', 26 -> 'AA')."""
    if col_index < 0:
        raise ValueError(f"column index must be non-negative: {col_index}")
    letters = ""
    n = col_index + 1
    while n:
        # Bijective base-26: there is no zero digit, hence the `n - 1`.
        n, rem = divmod(n - 1, 26)
        letters = chr(ord('A') + rem) + letters
    return letters


def _parse_llm_json(text: str):
    """Best-effort extraction of a JSON object from an LLM reply.

    Candidates are tried in order:
    1. the body of a leading Markdown code fence (```json or plain ```);
    2. the whole text;
    3. the span from the first '{' to the last '}'.

    Returns:
        The first candidate that parses to a dict, or None if none does.
    """
    stripped = text.strip()
    candidates = []
    if stripped.startswith("```"):
        fenced = stripped[3:]
        if fenced[:4].lower() == "json":
            fenced = fenced[4:]
        candidates.append(fenced.split("```", 1)[0].strip())
    candidates.append(stripped)
    start, end = stripped.find("{"), stripped.rfind("}")
    if 0 <= start < end:
        candidates.append(stripped[start:end + 1])
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        # Callers use .get(); a top-level list/str/number is not a usable result.
        if isinstance(parsed, dict):
            return parsed
    return None


def _format_reason(reason: Any) -> str:
    """Flatten an LLM "reason" field into cell text.

    A dict becomes "key: value" lines, a list becomes one item per line, None becomes "",
    and anything else is passed through str().
    """
    if isinstance(reason, dict):
        return "\n".join(f"{k}: {v}" for k, v in reason.items())
    if isinstance(reason, list):
        return "\n".join(str(item) for item in reason)
    if reason is None:
        return ""
    return str(reason)


def _is_blank(value: Any) -> bool:
    """Return True for an empty sheet cell: None, "" or an empty list.

    Unlike a plain truthiness test, numeric 0 counts as filled, so a previously written score
    of 0 is not re-evaluated on every run.
    """
    return value is None or (isinstance(value, (str, list)) and len(value) == 0)


class FeishuAPI:
    """Minimal Feishu Open API client: tenant token caching plus sheet read/write."""

    def __init__(self):
        self.app_id = APP_ID
        self.app_secret = APP_SECRET
        self.base_url = "https://open.feishu.cn/open-apis"
        self.access_token = None
        self.token_expires = 0  # epoch seconds after which the cached token is refreshed
        self.lock = threading.Lock()  # serialises token refreshes across worker threads

    def get_access_token(self):
        """Return a cached tenant_access_token, fetching a new one when missing or expired.

        Raises:
            Exception: if the API reports a non-zero ``code`` or the request fails.
        """
        with self.lock:
            # Reuse the cached token while it is still valid.
            if self.access_token and time.time() < self.token_expires:
                return self.access_token

            url = f"{self.base_url}/auth/v3/tenant_access_token/internal"
            payload = {
                "app_id": self.app_id,
                "app_secret": self.app_secret,
            }
            try:
                response = requests.post(url, json=payload, timeout=REQUEST_TIMEOUT)
                result = response.json()
                if result.get("code") == 0:
                    self.access_token = result.get("tenant_access_token")
                    # Expire 5 minutes early. A missing or very short "expire" yields an expiry
                    # of "now", so the token is simply refetched on the next call instead of
                    # raising TypeError.
                    expire = result.get("expire") or 0
                    self.token_expires = time.time() + max(expire - 300, 0)
                    logger.info("成功获取飞书访问令牌")
                    return self.access_token
                error_msg = f"获取飞书访问令牌失败: {result}"
                logger.error(error_msg)
                raise Exception(error_msg)
            except Exception as e:
                logger.error(f"获取飞书访问令牌时发生异常: {e}")
                raise

    def get_sheet_data(self):
        """Fetch all cell values of SHEET_NAME as a list of rows (row 0 is the header).

        Raises:
            Exception: if the API reports a non-zero ``code`` or the request fails.
        """
        try:
            token = self.get_access_token()
            url = f"{self.base_url}/sheets/v2/spreadsheets/{SHEET_TOKEN}/values/{SHEET_NAME}"
            headers = {"Authorization": f"Bearer {token}"}
            logger.info(f"正在获取表格数据: {SHEET_TOKEN}/{SHEET_NAME}")
            response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
            result = response.json()
            if result.get("code") == 0:
                data = result.get("data", {}).get("valueRange", {}).get("values", [])
                logger.info(f"成功获取表格数据,共 {len(data)} 行")
                return data
            error_msg = f"获取飞书表格数据失败: {result}"
            logger.error(error_msg)
            raise Exception(error_msg)
        except Exception as e:
            logger.error(f"获取表格数据时发生异常: {e}")
            raise

    def update_sheet_cell(self, row_index, col_index, value, max_retries=1, retry_delay=2):
        """Write ``value`` into a single cell, retrying with exponential backoff.

        Args:
            row_index: 0-based row index into the sheet values (sheet row = row_index + 1).
            col_index: 0-based column index (0 -> 'A'; columns beyond 'Z' are supported).
            value: the cell value to write.
            max_retries: total number of attempts. With the default of 1, only a single attempt
                is made and no retry happens.
            retry_delay: seconds to wait before the first retry; doubled after each retry.

        Returns:
            True if the cell was updated, otherwise False. Errors are logged and never raised.
        """
        cell = f"{_column_letter(col_index)}{row_index + 1}"
        range_str = f"{SHEET_NAME}!{cell}:{cell}"
        url = f"{self.base_url}/sheets/v2/spreadsheets/{SHEET_TOKEN}/values"
        payload = {
            "valueRange": {
                "range": range_str,
                "values": [[value]],
            }
        }

        for retry in range(max_retries):
            is_last = retry >= max_retries - 1
            try:
                # Fetch the token on every attempt in case it expired during backoff.
                token = self.get_access_token()
                headers = {"Authorization": f"Bearer {token}"}
                logger.debug(f"正在更新单元格: {range_str} (尝试 {retry+1}/{max_retries})")
                response = requests.put(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
                result = response.json()
            except Exception as e:
                if is_last:
                    logger.error(f"更新单元格 {range_str} 时发生异常: {e},已达到最大重试次数")
                    return False
                logger.warning(f"更新单元格 {range_str} 时发生异常: {e},将在 {retry_delay} 秒后重试")
                time.sleep(retry_delay)
                retry_delay *= 2
                continue

            code = result.get("code")
            if code == 0:
                logger.debug(f"成功更新单元格: {range_str}")
                return True
            if code == 91403 and not is_last:
                # Feishu code 91403 (Forbidden) is treated as transient and retried.
                logger.warning(f"更新单元格 {range_str} 返回Forbidden错误,将在 {retry_delay} 秒后重试")
                time.sleep(retry_delay)
                retry_delay *= 2
                continue
            logger.warning(f"更新单元格 {range_str} 失败: {result}")
            return False

        return False  # no attempt succeeded (or max_retries <= 0)


class EvaluationProcessor:
    """Runs the consistency and scoring prompts through the OpenRouter LLM."""

    def __init__(self):
        # Prompt templates are read from files next to this script.
        self.consistency_prompt = self._load_prompt("p1.md")
        self.scoring_prompt = self._load_prompt("p2.md")
        self.llm_processor = OpenRouterProcessor(OpenRouterModel.GEMINI_25_FLASH)

    def _load_prompt(self, filename):
        """Return the UTF-8 text of ``filename`` located in this script's directory."""
        current_dir = os.path.dirname(os.path.abspath(__file__))
        file_path = os.path.join(current_dir, filename)
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def evaluate_consistency(self, keyword, structured_result) -> Tuple[Any, str]:
        """Ask the LLM whether ``structured_result`` is consistent with ``keyword``.

        Returns:
            (consistency, reason_text) on success;
            ("解析错误", reply truncated to 500 chars) if no JSON object can be parsed;
            ("评估异常", error text) on any other failure.
        """
        try:
            input_data = {
                "query": keyword,
                "query结果文本": structured_result,
            }
            logger.info(f"正在进行一致性评估: {str(keyword)[:30]}...")
            result = self.llm_processor.process(input_data, self.consistency_prompt).strip()

            json_result = _parse_llm_json(result)
            if json_result is None:
                logger.warning(f"一致性评估结果解析失败: {result[:200]}...")
                return "解析错误", result[:500]  # truncate so the cell stays readable

            consistency = json_result.get("consistency", "")
            reason_str = _format_reason(json_result.get("reason", []))
            logger.info(f"一致性评估结果: {consistency}")
            return consistency, reason_str
        except Exception as e:
            logger.error(f"一致性评估过程中发生异常: {e}")
            return "评估异常", str(e)

    def evaluate_score(self, keyword, structured_result) -> Tuple[Any, str]:
        """Ask the LLM to score ``structured_result`` for ``keyword``.

        Returns:
            (score, reason_text) on success;
            ("解析错误", reply truncated to 500 chars) if no JSON object can be parsed;
            ("评估异常", error text) on any other failure.
        """
        try:
            input_data = {
                "query词": keyword,
                "文本知识": structured_result,
            }
            logger.info(f"正在进行打分评估: {str(keyword)[:30]}...")
            result = self.llm_processor.process(input_data, self.scoring_prompt).strip()

            json_result = _parse_llm_json(result)
            if json_result is None:
                logger.warning(f"打分评估结果解析失败: {result[:200]}...")
                return "解析错误", result[:500]  # truncate so the cell stays readable

            score = json_result.get("分数", "")
            # The reason is usually a dict of criterion -> explanation, but lists or plain
            # strings are accepted too rather than failing with AttributeError.
            reason_str = _format_reason(json_result.get("原因", {}))
            logger.info(f"打分评估结果: {score}")
            return score, reason_str
        except Exception as e:
            logger.error(f"打分评估过程中发生异常: {e}")
            return "评估异常", str(e)


def _write_cells(feishu_api, row_index, values_by_col):
    """Write each {col_index: value} pair into row ``row_index``, logging any failed cell."""
    for col_index, value in values_by_col.items():
        if not feishu_api.update_sheet_cell(row_index, col_index, value):
            logger.warning(f"行 {row_index+1} 的 {_column_letter(col_index)} 列写入失败")


def process_row(row_index, row_data, feishu_api, evaluator):
    """Evaluate one data row and write the results back to the sheet.

    Reads the keyword from column C and the structured result from column G. Writes the
    consistency verdict and reason to H/I and the score and reason to J/K. Each pair is skipped
    when its first column (H or J) already holds a value. Errors are logged, never raised.

    Args:
        row_index: 0-based index of the row within the sheet values (row 0 is the header).
        row_data: the row's cell values; may be shorter than 11 columns.
        feishu_api: FeishuAPI used to write cells.
        evaluator: EvaluationProcessor used to call the LLM.
    """
    try:
        def cell(col):
            # Rows can be shorter than the widest row; missing trailing cells read as "".
            return row_data[col] if len(row_data) > col else ""

        keyword = cell(2)            # column C
        structured_result = cell(6)  # column G

        if _is_blank(keyword) or _is_blank(structured_result):
            logger.warning(f"行 {row_index+1} 数据不完整,跳过处理")
            return

        logger.info(f"开始处理行 {row_index+1}: {str(keyword)[:30]}...")

        # Consistency -> H (verdict) and I (reason), only if H is still empty.
        if _is_blank(cell(7)):
            consistency, consistency_reason = evaluator.evaluate_consistency(keyword, structured_result)
            _write_cells(feishu_api, row_index, {7: consistency, 8: consistency_reason})
        else:
            logger.info(f"行 {row_index+1} 的一致性评估结果已存在,跳过处理")

        # Score -> J (score) and K (reason), only if J is still empty.
        if _is_blank(cell(9)):
            score, score_reason = evaluator.evaluate_score(keyword, structured_result)
            _write_cells(feishu_api, row_index, {9: score, 10: score_reason})
        else:
            logger.info(f"行 {row_index+1} 的打分评估结果已存在,跳过处理")

        logger.info(f"行 {row_index+1} 处理完成")
    except Exception as e:
        logger.error(f"处理行 {row_index+1} 时出错: {e}", exc_info=True)


def main():
    """Fetch the sheet, evaluate every data row concurrently, and write results back.

    Exits with status 1 if setup or data retrieval raises.
    """
    try:
        logger.info("开始执行飞书表格评估程序")

        feishu_api = FeishuAPI()
        evaluator = EvaluationProcessor()

        sheet_data = feishu_api.get_sheet_data()
        if not sheet_data or len(sheet_data) <= 1:  # nothing, or only the header row
            logger.warning("表格数据为空或只有表头")
            return

        logger.info(f"共获取到 {len(sheet_data)-1} 行数据(不含表头)")

        with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as executor:
            # Skip the header; each row keeps its index into sheet_data so writes target the
            # matching sheet row.
            futures = [
                executor.submit(process_row, i, row, feishu_api, evaluator)
                for i, row in enumerate(sheet_data[1:], 1)
            ]
            for future in futures:
                try:
                    future.result()  # re-raises anything process_row failed to handle
                except Exception as e:
                    logger.error(f"任务执行失败: {e}", exc_info=True)

        logger.info("所有数据处理完成")
    except Exception as e:
        logger.error(f"程序执行过程中发生异常: {e}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()