Browse Source

clean_agent

丁云鹏 1 week ago
parent
commit
bfed5f5b68
4 changed files with 23 additions and 71 deletions
  1. 1 1
      agents/clean_agent/agent.py
  2. 15 34
      agents/clean_agent/tools.py
  3. 1 7
      gemini.py
  4. 6 29
      prompt/extraction.md

+ 1 - 1
agents/clean_agent/agent.py

@@ -129,7 +129,7 @@ def main():
         os.environ["HTTPS_PROXY"] = proxy_url
         os.environ["HTTP_PROXY"] = proxy_url
     # 执行Agent
-    result = execute_agent_with_api('{"query_word":"账号做内容的方法","request_id":"REQUEST_001"}')
+    result = execute_agent_with_api('{"query_word":"图文策划方法","request_id":"REQUEST_001"}')
     print(result)
 
 if __name__ == '__main__':

+ 15 - 34
agents/clean_agent/tools.py

@@ -20,7 +20,7 @@ logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
 # 配置常量
-BATCH_SIZE = 10  # 分批处理大小
+BATCH_SIZE = 5  # 分批处理大小
 SCORE_THRESHOLD = 70  # 评分阈值
 
 # Define tools
@@ -105,25 +105,15 @@ def execute_continuous_evaluation_extraction(request_id: str, db: Session, query
     # 这里的代码永远不会被执行到,因为在while循环中,当contents为空时会返回
 
 def get_batch_contents_for_evaluation(request_id: str, db: Session, batch_size: int, offset: int = 0) -> list:
-    """分批获取待评估的内容
-    
-    Args:
-        request_id: 请求ID
-        db: 数据库会话
-        batch_size: 批量大小
-        offset: 偏移量,用于分页
-        
-    Returns:
-        待评估内容列表
-    """
-    query = db.query(KnowledgeParsingContent).filter(
-        KnowledgeParsingContent.status == 2  # 已完成提取的数据
+    query = db.query(KnowledgeParsingContent).outerjoin(
+        KnowledgeExtractionContent,
+        KnowledgeParsingContent.id == KnowledgeExtractionContent.parsing_id
+    ).filter(
+        KnowledgeParsingContent.status == 2,  # 已完成提取的数据
+        KnowledgeParsingContent.request_id == request_id,
+        KnowledgeExtractionContent.parsing_id == None
     )
     
-    # 如果指定了request_id,则只处理该request_id的数据
-    if request_id:
-        query = query.filter(KnowledgeParsingContent.request_id == request_id)
-    
     return query.offset(offset).limit(batch_size).all()
 
 def batch_evaluate_content(contents: list, db: Session, request_id: str, query_word: str) -> list:
@@ -246,22 +236,16 @@ def batch_call_llm_for_evaluation(contents: list, query_word: str) -> list:
     try:
         # 批量调用 Gemini 进行评估
         results = gemini_processor.batch_process(evaluation_contents, EVALUATION_PROMPT)
-       
+        
         # 处理返回结果
         evaluation_results = []
         for i, result in enumerate(results):
             result = re.sub(r'^\s*```json|\s*```\s*$', '', result, flags=re.MULTILINE).strip()
+            result = json.loads(result)
             parsing_id = contents[i].id
-            parsing_data = contents[i].parsing_data
-            
-            if isinstance(result, dict) and "score" in result:
-                # 正常结果
-                score = result.get("score", -2)
-                reason = result.get("reason", "")
-            else:
-                # 异常结果
-                score = -2
-                reason = "评估失败"
+            parsing_data = contents[i].parsing_data   
+            score = result.get("score", -2)
+            reason = result.get("reason", "")
             
             evaluation_results.append((parsing_id, score, reason, parsing_data))
         
@@ -282,6 +266,7 @@ def batch_call_llm_for_extraction(evaluation_results: list, query_word: str) ->
             "content": parsing_data
         })
     
+    logger.info(f"批量抽取内容: {extraction_contents}")
     try:
         # 批量调用 Gemini 进行抽取
         results = gemini_processor.batch_process(extraction_contents, EXTRACTION_PROMPT)
@@ -290,11 +275,7 @@ def batch_call_llm_for_extraction(evaluation_results: list, query_word: str) ->
         extraction_results = []
         for i, result in enumerate(results):
             result = re.sub(r'^\s*```json|\s*```\s*$', '', result, flags=re.MULTILINE).strip()
-            # 确保结果包含必要的字段
-            if not isinstance(result, dict):
-                result = {"extracted_data": str(result)}
-
-            extraction_results.append(json.dumps(result, ensure_ascii=False))
+            extraction_results.append(result)
         
         return extraction_results
         

+ 1 - 7
gemini.py

@@ -50,13 +50,7 @@ class GeminiProcessor:
             )
             
             # 尝试解析 JSON 响应
-            try:
-                return response.text;
-                result = json.loads(response.text)
-                return result
-            except json.JSONDecodeError:
-                # 如果不是 JSON 格式,返回原始文本
-                return {"result": response.text, "raw_response": response.text}
+            return response.text
                 
         except Exception as e:
             print(f"Gemini API 调用失败: {e}")

+ 6 - 29
prompt/extraction.md

@@ -14,31 +14,9 @@
 1.  **query_word:** 对应本次任务的“查询意图”,即一个清晰、具体的搜索词或问题。
 2.  **content:** 对应本次任务的“原始数据”,即一段未经处理的Markdown格式文本。
 
-JSON输入结构示例:
-```json
-{
-    "query_word": "你的查询词",
-    "content": "你的Markdown格式原始数据"
-}
-```
-
 ## 输出要求 (Output)
-你需要返回一个JSON对象。
-1.  **格式 (Format):** JSON对象应包含一个键`"extracted_data"`,其值为清洗后、且与“查询意图”直接相关的纯文本内容。提取出的原文片段应拼接在一起,形成一个连续的文本流,段落之间保留自然换行。
-2.  **边界情况处理 (Edge Case):** 如果在原始数据中经过清洗和筛选后,未找到任何与“查询意图”直接相关的内容,则`"extracted_data"`键的值必须是字符串`"未找到相关信息"`。
-
-JSON输出结构示例:
-```json
-{
-    "extracted_data": "清洗后且与Query意图相关的纯文本内容"
-}
-```
-或在未找到内容时:
-```json
-{
-    "extracted_data": "未找到相关信息"
-}
-```
+1.  **格式 (Format):** 你需要返回一个纯文本内容。所有提取出的相关原文片段应拼接在一起,形成一个连续的文本流,段落之间保留自然换行。
+2.  **边界情况处理 (Edge Case):** 如果在原始数据中经过清洗和筛选后,未找到任何与“查询意图”直接相关的内容,你必须明确输出以下提示信息:“未找到相关信息”。
 
 ## 约束条件 (Constraints)
 1.  **内容完整性:** 你不能对原文内容进行任何总结、改写、简化或添加额外信息。所有提取的都必须是原始文本的精确片段。
@@ -68,10 +46,9 @@ JSON输出结构示例:
 
 ## 工作流程 (Workflow)
 请严格按照以下步骤完成任务:
-1.  **分析Query:** 首先,从输入的JSON对象中提取`query_word`,并仔细分析和透彻理解其含义,明确本次任务需要提取的核心信息和主题方向。
-2.  **预处理数据:** 接着,从输入的JSON对象中提取`content`(原始Markdown文本),并逐行或逐段地通读。严格按照上述“约束条件”中定义的所有“数据清洗规则”,对文本进行预处理。在这一步,只关注格式和结构性噪音的去除,不进行相关性判断。
+1.  **分析Query:** 首先,仔细分析并透彻理解用户提供的“查询意图”,明确本次任务需要提取的核心信息和主题方向。
+2.  **预处理数据:** 接着,逐行或逐段地通读“原始数据”Markdown文本,并严格按照上述“约束条件”中定义的所有“数据清洗规则”,对文本进行预处理。在这一步,只关注格式和结构性噪音的去除,不进行相关性判断。
 3.  **判断相关性:** 然后,在清洗后的文本中,逐句(或根据上下文判断,逐小段)地分析每个文本单元。判断该文本单元是否与第一步中理解的“查询意图”直接相关。
 4.  **整合输出:**
-    *   将所有判断为“直接相关”的、未经任何修改的原文片段,按照其在原始文本中的先后顺序,拼接成一个纯文本流。
-    *   将此纯文本流作为`"extracted_data"`键的值,构建最终的JSON输出。
-    *   如果经过以上步骤,最终没有找到任何直接相关的原文片段,则构建JSON输出为`{"extracted_data": "未找到相关信息"}`。
+    *   将所有判断为“直接相关”的、未经任何修改的原文片段,按照其在原始文本中的先后顺序,拼接成一个纯文本流,作为最终输出。段落间保留自然换行。
+    *   如果经过以上步骤,最终没有找到任何直接相关的原文片段,则输出“未找到相关信息”。