浏览代码

重启时处理重复数据

jihuaqiang 3 天之前
父节点
当前提交
71810a4135
共有 2 个文件被更改,包括 45 次插入22 次删除
  1. 1 1
      agent.py
  2. 44 21
      tools/agent_tools.py

+ 1 - 1
agent.py

@@ -389,7 +389,7 @@ def create_langgraph_workflow():
                     task_id = item.get('task_id') or ''
 
                     # 先在库中查询是否已经处理过
-                    check_sql = "SELECT id,status FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s"
+                    check_sql = "SELECT id,status FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s AND status = 5"
                     check_result = MysqlHelper.get_values(check_sql, (state["request_id"], content_id))
                     if check_result:
                         id, status = check_result[0]

+ 44 - 21
tools/agent_tools.py

@@ -259,28 +259,51 @@ class UpdateDataTool:
             content_id = crawl_raw.get('content_id') or ''
             task_id = crawl_raw.get('task_id') or ''  # 默认任务ID,可根据需要调整
             
-            # 构建存储数据
-            
-            sql = (
-                "INSERT INTO knowledge_parsing_content "
-                "(content_id, request_id, task_id, indentify_data, create_time, status) "
-                "VALUES (%s, %s, %s, %s, NOW(), %s)"
-            )
-            
-            # 状态:2 表示处理完成
+            # 先查询是否存在相同 request_id + content_id 的记录
+            check_sql = "SELECT id, status FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s LIMIT 1"
+            exists = MysqlHelper.get_values(check_sql, (request_id, content_id))
+
+            # 状态:2 表示识别处理完成
             status = 2
-            params = (
-                content_id, 
-                request_id, 
-                task_id, 
-                json.dumps(identify_result, ensure_ascii=False),
-                status
-            )
-            
-            result = MysqlHelper.insert_and_get_id(sql, params)
-            if result:
-                logger.info(f"存储解析结果成功: request_id={request_id}, content_id={content_id}, insert_id={result}")
-            return result
+            serialized_identify = json.dumps(identify_result, ensure_ascii=False)
+
+            if exists:
+                # 已存在则更新,不新建
+                existing_id = exists[0][0] if isinstance(exists, list) and len(exists) > 0 else None
+                update_sql = (
+                    "UPDATE knowledge_parsing_content "
+                    "SET indentify_data = %s, task_id = %s, status = %s "
+                    "WHERE request_id = %s AND content_id = %s"
+                )
+                update_params = (
+                    serialized_identify,
+                    task_id,
+                    status,
+                    request_id,
+                    content_id
+                )
+                updated = MysqlHelper.update_values(update_sql, update_params)
+                if updated is not None:
+                    logger.info(f"更新识别结果成功: request_id={request_id}, content_id={content_id}, id={existing_id}")
+                return existing_id
+            else:
+                # 不存在则插入
+                insert_sql = (
+                    "INSERT INTO knowledge_parsing_content "
+                    "(content_id, request_id, task_id, indentify_data, create_time, status) "
+                    "VALUES (%s, %s, %s, %s, NOW(), %s)"
+                )
+                insert_params = (
+                    content_id,
+                    request_id,
+                    task_id,
+                    serialized_identify,
+                    status
+                )
+                result = MysqlHelper.insert_and_get_id(insert_sql, insert_params)
+                if result:
+                    logger.info(f"存储解析结果成功: request_id={request_id}, content_id={content_id}, insert_id={result}")
+                return result
             
         except Exception as e:
             logger.error(f"存储解析结果失败: request_id={request_id}, error={e}")