123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import os
- import json
- import requests
- from typing import List, Dict, Any, Tuple
- from dotenv import load_dotenv
- import sys
- import time
- import logging
- import threading
- from concurrent.futures import ThreadPoolExecutor
- # 添加项目根目录到系统路径
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- # 导入项目中的openrouter模块
- from llm.openrouter import OpenRouterProcessor, OpenRouterModel
# Log to both a file next to this script and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: the log messages are Chinese, and the platform default
        # encoding (e.g. cp1252 on Windows) cannot encode them.
        logging.FileHandler(
            os.path.join(os.path.dirname(os.path.abspath(__file__)), 'feishu_evaluation.log'),
            encoding='utf-8',
        ),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Load variables from a .env file into the process environment.
load_dotenv()

# Fail fast at import time, naming exactly which required variables are missing.
_REQUIRED_ENV_VARS = ('FEISHU_APP_ID', 'FEISHU_APP_SECRET', 'OPENROUTER_API_TOKEN')
_missing_env_vars = [name for name in _REQUIRED_ENV_VARS if not os.getenv(name)]
if _missing_env_vars:
    logger.error(f"缺少必要的环境变量,请检查.env文件: {', '.join(_missing_env_vars)}")
    sys.exit(1)

# Feishu API credentials.
APP_ID = os.getenv('FEISHU_APP_ID')
APP_SECRET = os.getenv('FEISHU_APP_SECRET')
# Target spreadsheet token and the sheet identifier used as the range prefix in
# API URLs. Either may be overridden via the environment; an unset or empty
# variable falls back to the original hard-coded value.
SHEET_TOKEN = os.getenv('FEISHU_SHEET_TOKEN') or "ESmNsJ3FWhcfbmtvwlAcvcWMngf"
SHEET_NAME = os.getenv('FEISHU_SHEET_NAME') or "Zsag5j"
# Maximum number of rows processed concurrently by the thread pool.
MAX_CONCURRENCY = 5
class FeishuAPI:
    """Minimal Feishu (Lark) Open API client for reading and writing one spreadsheet.

    Credentials come from the module-level ``APP_ID`` / ``APP_SECRET``, and the
    target sheet from ``SHEET_TOKEN`` / ``SHEET_NAME``. A single instance can be
    shared by worker threads; token checks and refreshes are serialized by a lock.
    """

    # Per-request timeout in seconds. ``requests`` waits indefinitely by
    # default, which could stall a worker thread on a hung connection.
    REQUEST_TIMEOUT = 30
    # Treat the tenant token as expired this many seconds before its reported expiry.
    TOKEN_REFRESH_MARGIN = 300

    def __init__(self):
        self.app_id = APP_ID
        self.app_secret = APP_SECRET
        self.base_url = "https://open.feishu.cn/open-apis"
        self.access_token = None
        self.token_expires = 0
        # Serializes token checks/refreshes so concurrent workers don't race.
        self.lock = threading.Lock()

    @staticmethod
    def _column_letter(col_index):
        """Convert a 0-based column index to A1-notation letters.

        ``0 -> "A"``, ``25 -> "Z"``, ``26 -> "AA"``. Unlike a single
        ``chr(ord('A') + i)``, this stays correct beyond column Z.

        Raises:
            ValueError: if ``col_index`` is negative.
        """
        if col_index < 0:
            raise ValueError(f"column index must be non-negative, got {col_index}")
        letters = ""
        remaining = col_index + 1
        while remaining:
            # Bijective base-26: there is no "zero" digit, hence the -1 shift.
            remaining, offset = divmod(remaining - 1, 26)
            letters = chr(ord('A') + offset) + letters
        return letters

    def get_access_token(self):
        """Return the cached tenant access token, fetching a new one when needed.

        Returns:
            The ``tenant_access_token`` string from the Feishu auth endpoint.

        Raises:
            Exception: if the endpoint responds with a non-zero ``code``.
            Network and JSON-decoding errors are logged and re-raised unchanged.
        """
        with self.lock:
            if self.access_token and time.time() < self.token_expires:
                return self.access_token

            url = f"{self.base_url}/auth/v3/tenant_access_token/internal"
            payload = {
                "app_id": self.app_id,
                "app_secret": self.app_secret,
            }

            try:
                response = requests.post(url, json=payload, timeout=self.REQUEST_TIMEOUT)
                result = response.json()
            except Exception as e:
                logger.error(f"获取飞书访问令牌时发生异常: {e}")
                raise

            # Raised outside the try above so the failure is logged only once.
            if result.get("code") != 0:
                error_msg = f"获取飞书访问令牌失败: {result}"
                logger.error(error_msg)
                raise Exception(error_msg)

            self.access_token = result.get("tenant_access_token")
            # A missing/null "expire" yields an already-expired deadline, so the
            # next call refetches instead of this one crashing with a TypeError.
            expire_seconds = result.get("expire") or 0
            self.token_expires = time.time() + expire_seconds - self.TOKEN_REFRESH_MARGIN
            logger.info("成功获取飞书访问令牌")
            return self.access_token

    def get_sheet_data(self):
        """Fetch all cell values of the configured sheet.

        Returns:
            A list of rows (each a list of cell values); an empty list when the
            response carries no values.

        Raises:
            Exception: if the API responds with a non-zero ``code``. Token,
            network and JSON-decoding errors are logged and re-raised.
        """
        try:
            token = self.get_access_token()
            url = f"{self.base_url}/sheets/v2/spreadsheets/{SHEET_TOKEN}/values/{SHEET_NAME}"
            headers = {"Authorization": f"Bearer {token}"}

            logger.info(f"正在获取表格数据: {SHEET_TOKEN}/{SHEET_NAME}")
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            result = response.json()
        except Exception as e:
            logger.error(f"获取表格数据时发生异常: {e}")
            raise

        if result.get("code") != 0:
            error_msg = f"获取飞书表格数据失败: {result}"
            logger.error(error_msg)
            raise Exception(error_msg)

        # Defensive: tolerate null/missing levels in the payload instead of
        # crashing on None.
        value_range = (result.get("data") or {}).get("valueRange") or {}
        data = value_range.get("values") or []
        logger.info(f"成功获取表格数据,共 {len(data)} 行")
        return data

    def update_sheet_cell(self, row_index, col_index, value, max_retries=1, retry_delay=2):
        """Write ``value`` into a single cell, with optional retries.

        Args:
            row_index: 0-based index into the fetched sheet data (0 is sheet row 1).
            col_index: 0-based column index (0 is column A).
            value: the value to write.
            max_retries: total number of attempts. The default of 1 means a
                single attempt with no retry.
            retry_delay: seconds to wait before the first retry; doubled after
                each retry.

        Returns:
            True if the API reported success, otherwise False. API and network
            failures are logged rather than raised.

        Raises:
            ValueError: if ``col_index`` is negative.
        """
        cell = f"{self._column_letter(col_index)}{row_index + 1}"
        range_str = f"{SHEET_NAME}!{cell}:{cell}"
        url = f"{self.base_url}/sheets/v2/spreadsheets/{SHEET_TOKEN}/values"
        payload = {
            "valueRange": {
                "range": range_str,
                "values": [[value]],
            }
        }

        for retry in range(max_retries):
            try:
                # Fetch the token on every attempt in case it expired meanwhile.
                token = self.get_access_token()
                headers = {"Authorization": f"Bearer {token}"}

                logger.debug(f"正在更新单元格: {range_str} (尝试 {retry+1}/{max_retries})")
                response = requests.put(url, headers=headers, json=payload, timeout=self.REQUEST_TIMEOUT)
                result = response.json()

                if result.get("code") == 0:
                    logger.debug(f"成功更新单元格: {range_str}")
                    return True
                if result.get("code") == 91403 and retry < max_retries - 1:
                    # 91403 is treated as a Forbidden error and retried with backoff.
                    logger.warning(f"更新单元格 {range_str} 返回Forbidden错误,将在 {retry_delay} 秒后重试")
                    time.sleep(retry_delay)
                    retry_delay *= 2
                else:
                    logger.warning(f"更新单元格 {range_str} 失败: {result}")
                    return False

            except Exception as e:
                if retry < max_retries - 1:
                    logger.warning(f"更新单元格 {range_str} 时发生异常: {e},将在 {retry_delay} 秒后重试")
                    time.sleep(retry_delay)
                    retry_delay *= 2
                else:
                    logger.error(f"更新单元格 {range_str} 时发生异常: {e},已达到最大重试次数")
                    return False

        # Reached only when max_retries <= 0 (no attempt was made).
        return False
class EvaluationProcessor:
    """Runs the two LLM evaluations for a row: consistency and scoring.

    Prompts are read from ``p1.md`` (consistency) and ``p2.md`` (scoring) in this
    script's directory. Requests go through ``OpenRouterProcessor`` with the
    ``GEMINI_25_FLASH`` model.
    """

    def __init__(self):
        self.consistency_prompt = self._load_prompt("p1.md")
        self.scoring_prompt = self._load_prompt("p2.md")
        self.llm_processor = OpenRouterProcessor(OpenRouterModel.GEMINI_25_FLASH)

    def _load_prompt(self, filename):
        """Return the UTF-8 text of ``filename`` located next to this script."""
        current_dir = os.path.dirname(os.path.abspath(__file__))
        file_path = os.path.join(current_dir, filename)

        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    @staticmethod
    def _parse_json_reply(text):
        """Recover a JSON object from a raw LLM reply.

        First parses the body of a leading ```json fence (or the whole text).
        If that fails or does not yield an object, it falls back to the span
        between the first '{' and the last '}'.

        Returns:
            ``(obj, error)``. ``obj`` is the parsed dict, or None if no object
            could be recovered. ``error`` is None when the first attempt yielded
            a dict; otherwise it is the exception describing why it did not.
        """
        text = text.strip()
        if text.startswith('```json'):
            candidate = text.split('```json', 1)[1].split('```', 1)[0].strip()
        else:
            candidate = text

        first_error = None
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError as e:
            first_error = e
        else:
            if isinstance(parsed, dict):
                return parsed, None
            first_error = ValueError(f"reply is JSON but not an object: {type(parsed).__name__}")

        start, end = text.find('{'), text.rfind('}')
        if start != -1 and end > start:
            try:
                parsed = json.loads(text[start:end + 1])
            except json.JSONDecodeError:
                pass
            else:
                if isinstance(parsed, dict):
                    return parsed, first_error
        return None, first_error

    @staticmethod
    def _format_reason(reason):
        """Render an LLM "reason" field as text.

        A dict becomes ``key: value`` lines and a list becomes one item per line.
        None becomes an empty string; anything else is ``str()``-ed.
        """
        if reason is None:
            return ""
        if isinstance(reason, dict):
            return "\n".join(f"{k}: {v}" for k, v in reason.items())
        if isinstance(reason, list):
            return "\n".join(str(item) for item in reason)
        return str(reason)

    def evaluate_consistency(self, keyword, structured_result):
        """Ask the LLM whether ``structured_result`` is consistent with ``keyword``.

        Returns:
            ``(consistency, reason_text)`` on success.
            ``("解析错误", first 500 chars of the reply)`` when no JSON object
            can be recovered.
            ``("评估异常", error text)`` when the call or processing raises.
        """
        try:
            input_data = {
                "query": keyword,
                "query结果文本": structured_result
            }

            logger.info(f"正在进行一致性评估: {keyword[:30]}...")
            result = self.llm_processor.process(input_data, self.consistency_prompt)

            text = result.strip()
            parsed, parse_error = self._parse_json_reply(text)
            if parse_error is not None:
                logger.warning(f"一致性评估结果解析失败: {text[:200]}... 错误: {parse_error}")
            if parsed is None:
                return "解析错误", text[:500]  # cap the length written to the sheet

            consistency = parsed.get("consistency", "")
            reason_str = self._format_reason(parsed.get("reason", []))
            if parse_error is None:
                logger.info(f"一致性评估结果: {consistency}")
            else:
                logger.info(f"修复后解析成功,一致性评估结果: {consistency}")
            return consistency, reason_str
        except Exception as e:
            logger.error(f"一致性评估过程中发生异常: {e}")
            return "评估异常", str(e)

    def evaluate_score(self, keyword, structured_result):
        """Ask the LLM to score ``structured_result`` as knowledge for ``keyword``.

        Returns:
            ``(score, reason_text)`` on success. ``reason_text`` renders a dict
            reason as ``key: value`` lines.
            ``("解析错误", first 500 chars of the reply)`` when no JSON object
            can be recovered.
            ``("评估异常", error text)`` when the call or processing raises.
        """
        try:
            input_data = {
                "query词": keyword,
                "文本知识": structured_result
            }

            logger.info(f"正在进行打分评估: {keyword[:30]}...")
            result = self.llm_processor.process(input_data, self.scoring_prompt)

            text = result.strip()
            parsed, parse_error = self._parse_json_reply(text)
            if parse_error is not None:
                logger.warning(f"打分评估结果解析失败: {text[:200]}... 错误: {parse_error}")
            if parsed is None:
                return "解析错误", text[:500]  # cap the length written to the sheet

            score = parsed.get("分数", "")
            # A non-dict reason used to raise AttributeError on .items().
            reason_str = self._format_reason(parsed.get("原因", {}))
            if parse_error is None:
                logger.info(f"打分评估结果: {score}")
            else:
                logger.info(f"修复后解析成功,打分评估结果: {score}")
            return score, reason_str
        except Exception as e:
            logger.error(f"打分评估过程中发生异常: {e}")
            return "评估异常", str(e)
def process_row(row_index, row_data, feishu_api, evaluator):
    """Evaluate one sheet row and write the results back to the sheet.

    The keyword is read from column C and the structured result from column G.
    A row missing either is skipped. The consistency step (written to H and I)
    runs only when H is empty. The scoring step (written to J and K) runs only
    when J is empty. Any exception is logged with its traceback and swallowed,
    so one bad row does not stop the others.

    Args:
        row_index: 0-based index of the row within the fetched sheet data.
        row_data: list of cell values for the row; may be shorter than column K.
        feishu_api: provides ``update_sheet_cell(row, col, value)``.
        evaluator: provides ``evaluate_consistency`` and ``evaluate_score``.
    """

    def cell(col):
        # Short rows have no trailing cells; treat the missing ones as "".
        return row_data[col] if len(row_data) > col else ""

    try:
        keyword = cell(2)            # column C
        structured_result = cell(6)  # column G

        if not (keyword and structured_result):
            logger.warning(f"行 {row_index+1} 数据不完整,跳过处理")
            return

        logger.info(f"开始处理行 {row_index+1}: {keyword[:30]}...")

        # A value already present in H / J means that step has already run.
        existing_consistency = cell(7)
        existing_score = cell(9)

        if existing_consistency:
            logger.info(f"行 {row_index+1} 的一致性评估结果已存在,跳过处理")
        else:
            verdict, verdict_reason = evaluator.evaluate_consistency(keyword, structured_result)
            feishu_api.update_sheet_cell(row_index, 7, verdict)          # H
            feishu_api.update_sheet_cell(row_index, 8, verdict_reason)   # I

        if existing_score:
            logger.info(f"行 {row_index+1} 的打分评估结果已存在,跳过处理")
        else:
            points, points_reason = evaluator.evaluate_score(keyword, structured_result)
            feishu_api.update_sheet_cell(row_index, 9, points)           # J
            feishu_api.update_sheet_cell(row_index, 10, points_reason)   # K

        logger.info(f"行 {row_index+1} 处理完成")
    except Exception as e:
        logger.error(f"处理行 {row_index+1} 时出错: {e}", exc_info=True)
def main():
    """Fetch the sheet, evaluate every data row concurrently, and write results back.

    Row 0 is treated as the header and skipped. Work is spread over up to
    ``MAX_CONCURRENCY`` threads. Any unexpected error is logged with its
    traceback, and the process exits with status 1.
    """
    try:
        logger.info("开始执行飞书表格评估程序")

        feishu_api = FeishuAPI()
        evaluator = EvaluationProcessor()
        sheet_data = feishu_api.get_sheet_data()

        # Fewer than two rows means there is nothing beyond the header.
        if not sheet_data or len(sheet_data) <= 1:
            logger.warning("表格数据为空或只有表头")
            return

        data_row_count = len(sheet_data) - 1
        logger.info(f"共获取到 {data_row_count} 行数据(不含表头)")

        with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as pool:
            # Start numbering at 1 so each index matches the row's position in sheet_data.
            pending = [
                pool.submit(process_row, idx, row, feishu_api, evaluator)
                for idx, row in enumerate(sheet_data[1:], 1)
            ]

            for job in pending:
                try:
                    job.result()  # re-raises anything the task raised
                except Exception as exc:
                    logger.error(f"任务执行失败: {exc}", exc_info=True)

        logger.info("所有数据处理完成")
    except Exception as e:
        logger.error(f"程序执行过程中发生异常: {e}", exc_info=True)
        sys.exit(1)
if __name__ == "__main__":
    # Run the evaluation only when executed as a script, not on import.
    main()
|