丁云鹏 2 nedēļas atpakaļ
vecāks
revīzija
c22f839284
6 mainītis faili ar 579 papildinājumiem un 1 dzēšanām
  1. 3 0
      .env
  2. 0 1
      llm/openrouter.py
  3. 67 0
      offline/README.md
  4. 359 0
      offline/feishu_evaluation.py
  5. 69 0
      offline/p1.md
  6. 81 0
      offline/p2.md

+ 3 - 0
.env

@@ -50,3 +50,6 @@ LANGCHAIN_PROJECT=knowledge-agent
 OPENAI_API_KEY=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
 TAVILY_API_KEY=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
 
+FEISHU_APP_ID=cli_a75d795d877d901c
+FEISHU_APP_SECRET=nDTqlBmAdKEyPThegK50ZbS4lKsCcYlN
+

+ 0 - 1
llm/openrouter.py

@@ -6,7 +6,6 @@ import json
 from typing import Any, Dict, Optional
 from enum import Enum
 from dotenv import load_dotenv
-import google.generativeai as genai
 
 from openai import OpenAI
 

+ 67 - 0
offline/README.md

@@ -0,0 +1,67 @@
+# 飞书表格评估程序
+
+这个程序用于从飞书表格读取数据,进行一致性评估和打分,然后将结果写回飞书表格。
+
+## 功能特点
+
+1. 从飞书表格读取C列(关键词)和G列(结构化结果)数据
+2. 使用OpenRouter API进行一致性评估,结果写入H列(一致性结论)和I列(原因)
+3. 使用OpenRouter API进行打分评估,结果写入J列(打分结果)和K列(原因)
+4. 支持并发处理,提高效率
+5. 完善的日志记录和错误处理
+6. 自动跳过已处理的行,支持断点续传
+
+## 环境要求
+
+- Python 3.7+
+- 必要的Python包:aiohttp, pandas, python-dotenv
+
+## 配置说明
+
+在项目根目录的`.env`文件中配置以下环境变量:
+
+```
+# 飞书API配置
+FEISHU_APP_ID=your_feishu_app_id
+FEISHU_APP_SECRET=your_feishu_app_secret
+
+# OpenRouter API配置
+OPENROUTER_API_TOKEN=your_openrouter_api_token
+OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
+```
+
+## 使用方法
+
+1. 确保已安装所有依赖包:
+
+```bash
+pip install -r requirements.txt
+```
+
+2. 配置环境变量
+
+3. 运行程序:
+
+```bash
+python feishu_evaluation.py
+```
+
+## 程序流程
+
+1. 程序启动后,首先获取飞书访问令牌
+2. 读取飞书表格数据
+3. 对每一行数据进行处理:
+   - 检查H列和J列是否已有数据,如果有则跳过相应的评估
+   - 进行一致性评估并更新H列和I列
+   - 进行打分评估并更新J列和K列
+4. 所有数据处理完成后,程序结束
+
+## 日志
+
+程序运行日志保存在`feishu_evaluation.log`文件中,同时也会在控制台输出。
+
+## 注意事项
+
+- 程序默认并发数为10,可以通过修改`MAX_CONCURRENCY`变量调整
+- 如果API调用失败,程序会记录错误并继续处理下一行数据
+- 程序支持断点续传,已处理的行不会重复处理

+ 359 - 0
offline/feishu_evaluation.py

@@ -0,0 +1,359 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import os
+import json
+import requests
+from typing import List, Dict, Any, Tuple
+from dotenv import load_dotenv
+import sys
+import time
+import logging
+import threading
+from concurrent.futures import ThreadPoolExecutor
+
+# 添加项目根目录到系统路径
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+# 导入项目中的openrouter模块
+from llm.openrouter import OpenRouterProcessor, OpenRouterModel
+
+# 配置日志记录
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'feishu_evaluation.log')),
+        logging.StreamHandler()
+    ]
+)
+logger = logging.getLogger(__name__)
+
+# 加载环境变量
+load_dotenv()
+
+# 检查环境变量
+if not os.getenv('FEISHU_APP_ID') or not os.getenv('FEISHU_APP_SECRET') or not os.getenv('OPENROUTER_API_TOKEN'):
+    logger.error("缺少必要的环境变量,请检查.env文件")
+    sys.exit(1)
+
+# 飞书API相关配置
+APP_ID = os.getenv('FEISHU_APP_ID')
+APP_SECRET = os.getenv('FEISHU_APP_SECRET')
+SHEET_TOKEN = "ESmNsJ3FWhcfbmtvwlAcvcWMngf"
+SHEET_NAME = "Zsag5j"
+
+# 最大并发数
+MAX_CONCURRENCY = 5
+
+class FeishuAPI:
+    """飞书API处理类"""
+    
+    def __init__(self):
+        self.app_id = APP_ID
+        self.app_secret = APP_SECRET
+        self.base_url = "https://open.feishu.cn/open-apis"
+        self.access_token = None
+        self.token_expires = 0
+        self.lock = threading.Lock()  # 添加线程锁保证线程安全
+    
+    def get_access_token(self):
+        """获取飞书访问令牌"""
+        # 使用线程锁确保线程安全
+        with self.lock:
+            # 如果令牌有效且未过期,直接返回
+            if self.access_token and time.time() < self.token_expires:
+                return self.access_token
+                
+            url = f"{self.base_url}/auth/v3/tenant_access_token/internal"
+            payload = {
+                "app_id": self.app_id,
+                "app_secret": self.app_secret
+            }
+            
+            try:
+                response = requests.post(url, json=payload)
+                result = response.json()
+                if result.get("code") == 0:
+                    self.access_token = result.get("tenant_access_token")
+                    # 设置过期时间(提前5分钟过期)
+                    self.token_expires = time.time() + result.get("expire") - 300
+                    logger.info("成功获取飞书访问令牌")
+                    return self.access_token
+                else:
+                    error_msg = f"获取飞书访问令牌失败: {result}"
+                    logger.error(error_msg)
+                    raise Exception(error_msg)
+            except Exception as e:
+                logger.error(f"获取飞书访问令牌时发生异常: {e}")
+                raise
+    
+    def get_sheet_data(self):
+        """获取飞书表格数据"""
+        try:
+            token = self.get_access_token()
+            url = f"{self.base_url}/sheets/v2/spreadsheets/{SHEET_TOKEN}/values/{SHEET_NAME}"
+            headers = {"Authorization": f"Bearer {token}"}
+            
+            logger.info(f"正在获取表格数据: {SHEET_TOKEN}/{SHEET_NAME}")
+            response = requests.get(url, headers=headers)
+            result = response.json()
+            if result.get("code") == 0:
+                data = result.get("data", {}).get("valueRange", {}).get("values", [])
+                logger.info(f"成功获取表格数据,共 {len(data)} 行")
+                return data
+            else:
+                error_msg = f"获取飞书表格数据失败: {result}"
+                logger.error(error_msg)
+                raise Exception(error_msg)
+        except Exception as e:
+            logger.error(f"获取表格数据时发生异常: {e}")
+            raise
+    
+    def update_sheet_cell(self, row_index, col_index, value, max_retries=1, retry_delay=2):
+        """更新飞书表格单元格,带重试机制"""
+        # 计算单元格范围(例如:'H2')
+        col_letter = chr(ord('A') + col_index)
+        range_str = f"{SHEET_NAME}!{col_letter}{row_index + 1}:{col_letter}{row_index + 1}"
+        
+        for retry in range(max_retries):
+            try:
+                # 每次重试都重新获取token,以防token过期
+                token = self.get_access_token()
+                url = f"{self.base_url}/sheets/v2/spreadsheets/{SHEET_TOKEN}/values"
+                
+                payload = {
+                    "valueRange": {
+                        "range": range_str,
+                        "values": [[value]]
+                    }
+                }
+                
+                headers = {"Authorization": f"Bearer {token}"}
+                
+                logger.debug(f"正在更新单元格: {range_str} (尝试 {retry+1}/{max_retries})")
+                response = requests.put(url, headers=headers, json=payload)
+                result = response.json()
+                
+                if result.get("code") == 0:
+                    logger.debug(f"成功更新单元格: {range_str}")
+                    return True
+                elif result.get("code") == 91403 and retry < max_retries - 1:  # Forbidden错误,可能是权限问题
+                    logger.warning(f"更新单元格 {range_str} 返回Forbidden错误,将在 {retry_delay} 秒后重试")
+                    time.sleep(retry_delay)
+                    # 下次重试延迟时间加倍
+                    retry_delay *= 2
+                else:
+                    logger.warning(f"更新单元格 {range_str} 失败: {result}")
+                    return False
+                    
+            except Exception as e:
+                if retry < max_retries - 1:
+                    logger.warning(f"更新单元格 {range_str} 时发生异常: {e},将在 {retry_delay} 秒后重试")
+                    time.sleep(retry_delay)
+                    # 下次重试延迟时间加倍
+                    retry_delay *= 2
+                else:
+                    logger.error(f"更新单元格 {range_str} 时发生异常: {e},已达到最大重试次数")
+                    return False
+        
+        return False  # 所有重试都失败
+
+class EvaluationProcessor:
+    """评估处理类"""
+    
+    def __init__(self):
+        # 加载评估和打分的prompt
+        self.consistency_prompt = self._load_prompt("p1.md")
+        self.scoring_prompt = self._load_prompt("p2.md")
+        # 初始化OpenRouter处理器
+        self.llm_processor = OpenRouterProcessor(OpenRouterModel.GEMINI_25_FLASH)
+        
+    def _load_prompt(self, filename):
+        """加载prompt文件"""
+        current_dir = os.path.dirname(os.path.abspath(__file__))
+        file_path = os.path.join(current_dir, filename)
+        
+        with open(file_path, 'r', encoding='utf-8') as file:
+            return file.read()
+    
+    def evaluate_consistency(self, keyword, structured_result):
+        """评估一致性"""
+        try:
+            input_data = {
+                "query": keyword,
+                "query结果文本": structured_result
+            }
+            
+            logger.info(f"正在进行一致性评估: {keyword[:30]}...")
+            # 调用LLM进行一致性评估
+            result = self.llm_processor.process(input_data, self.consistency_prompt)
+            
+            try:
+                # 尝试解析JSON结果
+                # 处理可能的不完整JSON字符串
+                result = result.strip()
+                if result.startswith('```json') and '```' in result:
+                    # 提取JSON部分
+                    json_str = result.split('```json', 1)[1].split('```', 1)[0].strip()
+                    json_result = json.loads(json_str)
+                else:
+                    json_result = json.loads(result)
+                    
+                consistency = json_result.get("consistency", "")
+                reason = json_result.get("reason", [])
+                reason_str = "\n".join(reason) if isinstance(reason, list) else str(reason)
+                logger.info(f"一致性评估结果: {consistency}")
+                return consistency, reason_str
+            except json.JSONDecodeError as e:
+                # 如果结果不是有效的JSON,尝试修复并重新解析
+                logger.warning(f"一致性评估结果解析失败: {result[:200]}... 错误: {e}")
+                try:
+                    # 尝试从文本中提取JSON部分
+                    if '{' in result and '}' in result:
+                        json_part = result[result.find('{'):result.rfind('}')+1]
+                        json_result = json.loads(json_part)
+                        consistency = json_result.get("consistency", "")
+                        reason = json_result.get("reason", [])
+                        reason_str = "\n".join(reason) if isinstance(reason, list) else str(reason)
+                        logger.info(f"修复后解析成功,一致性评估结果: {consistency}")
+                        return consistency, reason_str
+                except:
+                    pass
+                return "解析错误", result[:500]  # 限制返回长度
+        except Exception as e:
+            logger.error(f"一致性评估过程中发生异常: {e}")
+            return "评估异常", str(e)
+    
+    def evaluate_score(self, keyword, structured_result):
+        """评估打分"""
+        try:
+            input_data = {
+                "query词": keyword,
+                "文本知识": structured_result
+            }
+            
+            logger.info(f"正在进行打分评估: {keyword[:30]}...")
+            # 调用LLM进行打分评估
+            result = self.llm_processor.process(input_data, self.scoring_prompt)
+            
+            try:
+                # 尝试解析JSON结果
+                # 处理可能的不完整JSON字符串
+                result = result.strip()
+                if result.startswith('```json') and '```' in result:
+                    # 提取JSON部分
+                    json_str = result.split('```json', 1)[1].split('```', 1)[0].strip()
+                    json_result = json.loads(json_str)
+                else:
+                    json_result = json.loads(result)
+                    
+                score = json_result.get("分数", "")
+                reason = json_result.get("原因", {})
+                # 将原因字典转换为字符串
+                reason_str = "\n".join([f"{k}: {v}" for k, v in reason.items()])
+                logger.info(f"打分评估结果: {score}")
+                return score, reason_str
+            except json.JSONDecodeError as e:
+                # 如果结果不是有效的JSON,尝试修复并重新解析
+                logger.warning(f"打分评估结果解析失败: {result[:200]}... 错误: {e}")
+                try:
+                    # 尝试从文本中提取JSON部分
+                    if '{' in result and '}' in result:
+                        json_part = result[result.find('{'):result.rfind('}')+1]
+                        json_result = json.loads(json_part)
+                        score = json_result.get("分数", "")
+                        reason = json_result.get("原因", {})
+                        reason_str = "\n".join([f"{k}: {v}" for k, v in reason.items()])
+                        logger.info(f"修复后解析成功,打分评估结果: {score}")
+                        return score, reason_str
+                except:
+                    pass
+                return "解析错误", result[:500]  # 限制返回长度
+        except Exception as e:
+            logger.error(f"打分评估过程中发生异常: {e}")
+            return "评估异常", str(e)
+
+def process_row(row_index, row_data, feishu_api, evaluator):
+    """处理单行数据"""
+    try:
+        # 获取关键词(C列)和结构化结果(G列)
+        keyword = row_data[2] if len(row_data) > 2 else ""
+        structured_result = row_data[6] if len(row_data) > 6 else ""
+        
+        if not keyword or not structured_result:
+            logger.warning(f"行 {row_index+1} 数据不完整,跳过处理")
+            return
+        
+        logger.info(f"开始处理行 {row_index+1}: {keyword[:30]}...")
+        
+        # 检查H列和J列是否已有数据,如果有则跳过处理
+        h_value = row_data[7] if len(row_data) > 7 else ""
+        j_value = row_data[9] if len(row_data) > 9 else ""
+        
+        # 评估一致性(如果H列为空)
+        if not h_value:
+            consistency, consistency_reason = evaluator.evaluate_consistency(keyword, structured_result)
+            
+            # 更新一致性结果(H列和I列)
+            feishu_api.update_sheet_cell(row_index, 7, consistency)  # H列
+            feishu_api.update_sheet_cell(row_index, 8, consistency_reason)  # I列
+        else:
+            logger.info(f"行 {row_index+1} 的一致性评估结果已存在,跳过处理")
+        
+        # 评估打分(如果J列为空)
+        if not j_value:
+            score, score_reason = evaluator.evaluate_score(keyword, structured_result)
+            
+            # 更新打分结果(J列和K列)
+            feishu_api.update_sheet_cell(row_index, 9, score)  # J列
+            feishu_api.update_sheet_cell(row_index, 10, score_reason)  # K列
+        else:
+            logger.info(f"行 {row_index+1} 的打分评估结果已存在,跳过处理")
+        
+        logger.info(f"行 {row_index+1} 处理完成")
+        
+    except Exception as e:
+        logger.error(f"处理行 {row_index+1} 时出错: {e}", exc_info=True)
+
+def main():
+    """主函数"""
+    try:
+        logger.info("开始执行飞书表格评估程序")
+        
+        # 初始化飞书API和评估处理器
+        feishu_api = FeishuAPI()
+        evaluator = EvaluationProcessor()
+        
+        # 获取表格数据
+        sheet_data = feishu_api.get_sheet_data()
+        
+        if not sheet_data or len(sheet_data) <= 1:  # 考虑表头
+            logger.warning("表格数据为空或只有表头")
+            return
+        
+        logger.info(f"共获取到 {len(sheet_data)-1} 行数据(不含表头)")
+        
+        # 创建线程池
+        with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as executor:
+            # 跳过表头,处理数据行
+            futures = []
+            for i, row in enumerate(sheet_data[1:], 1):  # 从索引1开始(跳过表头)
+                future = executor.submit(process_row, i, row, feishu_api, evaluator)
+                futures.append(future)
+            
+            # 等待所有任务完成
+            for future in futures:
+                try:
+                    future.result()  # 获取任务结果,如果有异常会在这里抛出
+                except Exception as e:
+                    logger.error(f"任务执行失败: {e}", exc_info=True)
+        
+        logger.info("所有数据处理完成")
+    except Exception as e:
+        logger.error(f"程序执行过程中发生异常: {e}", exc_info=True)
+        sys.exit(1)
+
+if __name__ == "__main__":
+    # 运行主函数
+    main()

+ 69 - 0
offline/p1.md

@@ -0,0 +1,69 @@
+
+# 系统角色与目标
+你是一个“内容创作方法论语义评估专家”。  
+核心目标:评估用户输入的 query 与 query 对应的文本结果是否具备语义一致性,确保 query 的意图与文本核心语义严格对齐,以便剔除不匹配内容,提高内容评估标准适配性。
+query范围限定在**内容创作领域**(如自媒体图文、短视频脚本、广告文案、海报设计等)的 query 与结果文本语义一致性;若文本涉及非内容创作领域(如金融、医疗、电商售后等),直接判定 "一致性": "低"
+
+评估原则:
+1. **意图对齐优先于字面匹配**:避免“字面匹配但语义不符”或“语义一致但字面差异”导致的误判。  
+2. **三层评估**:基础匹配 → 深度语义评估 → 业务校验。  
+3. 输出结果按 “二级划分” 给出最终一致性判断。
+
+
+## 用户输入信息
+- **query**: 用户检索/查询的核心关键词或问题  
+- **query结果文本**: 与 query 对应的内容文本或摘要  
+
+
+## 评估方法
+
+### 1. 基础层:关键词与要素匹配(快速初筛)
+操作步骤:
+1. 从 query 中提取核心要素:
+   - **业务类型**(如“内容创作-自媒体-图文创作”)  
+   - **核心需求词**(如“美食图文选题”)  
+2. 对比文本的业务标签与核心概念:
+   - 文本标签(如“内容创作→自媒体→图文选题策划”)  
+   - 核心概念(如“美食图文选题方向”)  
+3. 计算要素重叠率:
+   - 重叠率 = (query 与文本共有核心要素数) / (query 核心要素总数)
+   - 阈值:≥50% → 初步通过;<50% → 判定不一致  
+4. 业务适配:
+- 对强要素依赖业务严格匹配业务细分类型和核心术语  
+
+### 2. 深度层:语义相似度与意图匹配(核心评估)
+操作步骤:
+1. **语义向量匹配**:
+- 将 query 与文本(或文本摘要)转为语义向量(BERT/SBERT/ERNIE)  
+- 计算余弦相似度  
+- 阈值示例:
+  - “自媒体创作指南”类 ≥0.75  
+  - “广告合规文档”类 ≥0.85  
+- 高于阈值 → 进入意图评估;低于阈值 → 判定不一致  
+2. **意图识别与对齐**:
+- 为 query 与文本标注意图标签体系,如下<举例说明>:
+    <举例说明>
+  {示例意图:咨询创作方法 | 查询合规要求 | 获取优化建议 | 确认格式规范
+  ```
+- 比较 query 与文本意图:
+  - 一致 → 意图匹配  
+  - 不一致 → 判断意图关联性,若文本部分覆盖 query 的意图,则部分一致,否则不一致  }
+    </举例说明>
+
+### 3. 业务层:规则校验与场景适配(最终确认)
+操作步骤:
+1. **业务术语校验**:确保 query 与文本属于同一创作术语体系  
+2. **约束条件匹配**:检查文本是否满足 query 的隐含条件(如“新手友好”、“低门槛”等)  
+3. **创作环节适配**:确认文本是否对应 query 的创作环节(如“脚本审核后修改” vs “创作前框架设计”)  
+
+
+## 输出格式(JSON)
+
+{
+"consistency": "高/低",
+"reason": [
+ "基础层判断:说明要素匹配情况及重叠率",
+ "深度层判断:说明语义相似度、意图对齐情况",
+ "业务层判断:说明业务术语、约束条件及环节适配情况"
+]
+}

+ 81 - 0
offline/p2.md

@@ -0,0 +1,81 @@
+# 角色与目标
+你是一个“内容创作 Pattern 评估专家”。  
+你的任务是:根据输入的【文本知识】和【query词】,判断该文本是否为一个高质量的 **内容创作 Pattern**。  
+注意:你的分析必须严格在“高质量内容 Pattern”这一目标约束下进行,若分析超出目标,也必须回到目标范围内。
+
+---
+
+# Pattern 定义与特征
+内容 Pattern(内容模式/内容范式)是指在内容创作、传播与消费全链路中,基于用户认知习惯与内容目标形成的、可复用的规律性结构/逻辑框架。  
+核心属性:
+1. **规律性**:贴合用户信息接收逻辑,具有稳定结构,非随机。  
+2. **目的性**:服务于明确内容目标(如知识传递、情感共鸣、传播裂变)。  
+3. **可复制性**:提供可套用的框架骨架,允许差异化填充,能让内容小白复用并创作。  
+
+---
+
+# 评估方法
+请基于以下维度对输入文本进行评估,总分 100 分(可加分至 110 分,若单一维度极其优质,由模型智能判断加分):  
+
+1. **规律性 (0–25 分)**  
+   - 是否体现规律性的逻辑或结构?  
+   - 是否符合用户认知习惯与信息接收偏好?  
+
+2. **目的性 (0–20 分)**  
+   - 是否清晰指向某个创作目标(知识/情感/传播)?  
+   - 目标是否与内容呈现方式高度契合?  
+
+3. **可复制性 (0–20 分)**  
+   - 是否提供可落地的框架?  
+   - 是否能让内容小白基于此 Pattern 创作?  
+
+4. **心理学传播机制契合度 (0–15 分)**  
+   - 是否利用心理动机、情绪反应、情境因素或个体特征?  
+   - 是否能引发用户互动或共鸣?  
+
+5. **平台算法适配性 (0–10 分)**  
+   - 是否考虑用户画像与用户行为?  
+   - 是否具备被算法推荐或分发的潜力?  
+
+6. **可信度与实用价值 (0–10 分)**  
+   - 内容逻辑是否可信?  
+   - 是否具有实际创作价值?  
+
+> 额外智能加分机制:若某一维度表现极其优质,可额外加 1–10 分,总分上限 110 分。  
+
+---
+
+# 输出要求
+输出统一为以下格式(不要表格):  
+{
+  "分数": "[xx/100(若有加分,则 xx/110)]"
+  "原因": {
+    "规律性":"[……]",
+    "目的性":"[……]",
+    "可复制性":"[……]",
+    "心理学传播机制契合度":"[……]",
+    "平台算法适配性":"[……]",
+    "可信度与实用价值":"[……]",
+    "加分说明(如有)":"[……]"
+  }
+}
+
+---
+
+# 输入示例
+【query词】:图文策划爆款案例  
+【文本知识】:xxxxxxx  
+
+# 输出示例
+{
+  "分数": "85/100"
+  "原因": {
+    "规律性":"结构清晰,呈现对比冲突逻辑,20/25",
+    "目的性":"目标明确,强调传播与情感共鸣,18/20",
+    "可复制性":"提供了框架,但执行细节略缺,15/20",
+    "心理学传播机制契合度":"利用了情绪与从众心理,12/15",
+    "平台算法适配性":"部分考虑用户行为,但未结合画像,6/10",
+    "可信度与实用价值":"逻辑合理,可落地,9/10",
+    "加分说明(如有)":"无"
+  }
+}