jihuaqiang 2 days ago
commit 9431717b65
2 changed files with 344 additions and 17 deletions
  1. agent.py (+29, -17)
  2. tools/move_data.py (+315, -0)

+ 29 - 17
agent.py

@@ -389,28 +389,40 @@ def create_langgraph_workflow():
                     task_id = item.get('task_id') or ''
 
                     # First check the database to see whether this item has already been processed
-                    check_sql = "SELECT id,status FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s AND status = 5"
+                    check_sql = "SELECT id,status,indentify_data FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s"
                     check_result = MysqlHelper.get_values(check_sql, (state["request_id"], content_id))
+                    result_status = 0
+                    result_id = 0
+                    result_indentify_data = {}
                     if check_result:
-                        id, status = check_result[0]
+                        id, status, indentify_data = check_result[0]
+                        logger.info(f"Found an entry awaiting structuring, id: {id}, status: {status}, indentify_data: {indentify_data}")
+                        result_status = status
+                        result_id = id
+                        result_indentify_data = indentify_data
                         if status == 5:
                             success_count += 1
                             continue
-                    
-                    # Step 1: identification
-                    identify_result = identify_tool.run(
-                        crawl_data if isinstance(crawl_data, dict) else {}
-                    )
-                    
-                    # Step 2: structure and store in DB
-                    affected = UpdateDataTool.store_indentify_result(
-                        state["request_id"], 
-                        {
-                            "content_id": content_id,
-                            "task_id": task_id
-                        }, 
-                        identify_result
-                    )
+
+                    # result_status == 0 means the record has not been processed yet; run identification and structuring (status 3 is retried the same way)
+                    if result_status == 0 or result_status == 3:
+                        # Step 1: identification
+                        identify_result = identify_tool.run(
+                            crawl_data if isinstance(crawl_data, dict) else {}
+                        )
+                        
+                        # Step 2: structure and store in DB
+                        affected = UpdateDataTool.store_indentify_result(
+                            state["request_id"], 
+                            {
+                                "content_id": content_id,
+                                "task_id": task_id
+                            }, 
+                            identify_result
+                        )
+                    else:
+                        identify_result = result_indentify_data
+                        affected = result_id
                     
                     # Use StructureTool to structure the content
                     structure_tool = StructureTool()

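For reference, the branch this hunk introduces can be sketched on its own as below; process_item and the local names are illustrative stand-ins, and MysqlHelper, UpdateDataTool and identify_tool are assumed to be the objects already imported in agent.py:

    def process_item(state, content_id, task_id, crawl_data, identify_tool):
        # Look up any earlier parsing row for this request/content pair
        row = MysqlHelper.get_values(
            "SELECT id, status, indentify_data FROM knowledge_parsing_content "
            "WHERE request_id = %s AND content_id = %s",
            (state["request_id"], content_id),
        )
        row_id, status, cached = row[0] if row else (0, 0, {})
        if status == 5:
            return None                                # already structured: skip and count as success
        if status in (0, 3):                           # never processed, or flagged for re-identification
            identify_result = identify_tool.run(crawl_data if isinstance(crawl_data, dict) else {})
            affected = UpdateDataTool.store_indentify_result(
                state["request_id"], {"content_id": content_id, "task_id": task_id}, identify_result)
        else:                                          # any other status: reuse the stored identify result
            identify_result, affected = cached, row_id
        return identify_result, affected               # handed on to StructureTool afterwards
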
+ 315 - 0
tools/move_data.py

@@ -0,0 +1,315 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+The move_data script does the following:
+1. Read rows from knowledge_search_content where request_id IS NOT NULL and is_move = 0; for every row returned,
+   extract the request_id, source_channel, query_word, formatted_content, multimodal_recognition and structured_data fields.
+2. Write the extracted fields into different tables:
+2.1 If formatted_content is not empty, insert a new row into knowledge_crawl_content with:
+    content_id taken from the channel_content_id field of formatted_content parsed as JSON;
+    channel taken from source_channel;
+    request_id taken from request_id;
+    crawl_data taken from formatted_content.
+2.2 If multimodal_recognition is not empty and structured_data is empty, insert a new row into knowledge_parsing_content with:
+    content_id taken from the channel_content_id field of formatted_content parsed as JSON;
+    request_id taken from request_id;
+    indentify_data taken from multimodal_recognition;
+    status set to 2.
+2.3 If multimodal_recognition is not empty and structured_data is not empty, insert a new row into knowledge_parsing_content with:
+    content_id taken from the channel_content_id field of formatted_content parsed as JSON;
+    request_id taken from request_id;
+    indentify_data taken from multimodal_recognition;
+    parsing_data taken from structured_data;
+    status set to 5.
+
+In addition, if every row handled in this run satisfies condition 2.3, set parsing_status = 2 on the knowledge_request row
+whose request_id matches; otherwise set it to 0.
+After each row is processed, set is_move = 1 on the knowledge_search_content row to mark it as handled.
+"""
+
+import json
+import sys
+import os
+import argparse
+from loguru import logger
+
+# Add the project root directory to the Python path
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from utils.mysql_db import MysqlHelper
+
+
+class MoveDataProcessor:
+    """Data migration processor."""
+    
+    def __init__(self):
+        self.logger = logger
+        
+    def get_unprocessed_request_ids(self):
+        """Get the next unprocessed request_id (at most one row is returned)."""
+        sql = """
+        SELECT DISTINCT request_id
+        FROM knowledge_search_content 
+        WHERE request_id IS NOT NULL AND is_move = 0
+        ORDER BY request_id ASC
+        LIMIT 1
+        """
+        
+        try:
+            result = MysqlHelper.get_values(sql)
+            if result:
+                return result[0][0]  # return the first request_id
+            return None
+        except Exception as e:
+            self.logger.error(f"Failed to query unprocessed request_id: {e}")
+            return None
+    
+    def get_data_by_request_id(self, request_id):
+        """Fetch the unprocessed rows for a given request_id."""
+        sql = """
+        SELECT id, request_id, source_channel, query_word, formatted_content, 
+               multimodal_recognition, structured_data
+        FROM knowledge_search_content 
+        WHERE request_id = %s AND is_move = 0
+        ORDER BY id ASC
+        """
+        
+        try:
+            result = MysqlHelper.get_values(sql, (request_id,))
+            if result:
+                # convert the result tuples into a list of dicts
+                columns = ['id', 'request_id', 'source_channel', 'query_word', 
+                          'formatted_content', 'multimodal_recognition', 'structured_data']
+                data_list = []
+                for row in result:
+                    data_dict = dict(zip(columns, row))
+                    data_list.append(data_dict)
+                return data_list
+            return []
+        except Exception as e:
+            self.logger.error(f"Failed to query data for request_id: {e}")
+            return []
+    
+    def extract_content_id(self, formatted_content):
+        """Extract the content_id (channel_content_id) from formatted_content."""
+        try:
+            if not formatted_content:
+                return None
+            
+            # parse the JSON payload
+            content_data = json.loads(formatted_content)
+            return content_data.get('channel_content_id')
+        except (json.JSONDecodeError, AttributeError) as e:
+            self.logger.error(f"Failed to parse formatted_content: {e}")
+            return None
+    
+    def insert_crawl_content(self, content_id, source_channel, request_id, formatted_content):
+        """Insert a row into the knowledge_crawl_content table."""
+        sql = """
+        INSERT INTO knowledge_crawl_content 
+        (content_id, channel, request_id, crawl_data, create_time)
+        VALUES (%s, %s, %s, %s, NOW())
+        """
+        
+        params = (content_id, source_channel, request_id, formatted_content)
+        
+        try:
+            result = MysqlHelper.insert_and_get_id(sql, params)
+            if result:
+                self.logger.info(f"Inserted crawl_content successfully: content_id={content_id}, request_id={request_id}, insert_id={result}")
+                return result
+            return None
+        except Exception as e:
+            self.logger.error(f"Failed to insert crawl_content: {e}")
+            return None
+    
+    def insert_parsing_content(self, content_id, request_id, multimodal_recognition, structured_data=None):
+        """Insert a row into the knowledge_parsing_content table."""
+        # status depends on whether structured_data is present
+        status = 5 if structured_data else 2
+        
+        sql = """
+        INSERT INTO knowledge_parsing_content 
+        (content_id, request_id, task_id, indentify_data, parsing_data, create_time, status)
+        VALUES (%s, %s, %s, %s, %s, NOW(), %s)
+        """
+        
+        params = (content_id, request_id, 1, multimodal_recognition, structured_data, status)
+        
+        try:
+            result = MysqlHelper.insert_and_get_id(sql, params)
+            if result:
+                self.logger.info(f"Inserted parsing_content successfully: content_id={content_id}, request_id={request_id}, status={status}, insert_id={result}")
+                return result
+            return None
+        except Exception as e:
+            self.logger.error(f"Failed to insert parsing_content: {e}")
+            return None
+    
+    def update_request_status(self, request_id, status):
+        """Update parsing_status in the knowledge_request table."""
+        sql = "UPDATE knowledge_request SET parsing_status = %s WHERE request_id = %s"
+        params = (status, request_id)
+        
+        try:
+            result = MysqlHelper.update_values(sql, params)
+            if result:
+                self.logger.info(f"Updated request status successfully: request_id={request_id}, parsing_status={status}")
+                return True
+            return False
+        except Exception as e:
+            self.logger.error(f"Failed to update request status: {e}")
+            return False
+    
+    def mark_as_processed(self, record_id):
+        """Mark a row as processed."""
+        sql = "UPDATE knowledge_search_content SET is_move = 1 WHERE id = %s"
+        params = (record_id,)
+        
+        try:
+            result = MysqlHelper.update_values(sql, params)
+            if result:
+                self.logger.info(f"Marked row as processed: id={record_id}")
+                return True
+            return False
+        except Exception as e:
+            self.logger.error(f"Failed to mark row as processed: {e}")
+            return False
+    
+    def process_single_record(self, record):
+        """Process a single record."""
+        record_id = record['id']
+        request_id = record['request_id']
+        source_channel = record['source_channel']
+        formatted_content = record['formatted_content']
+        multimodal_recognition = record['multimodal_recognition']
+        structured_data = record['structured_data']
+        
+        self.logger.info(f"Start processing record: id={record_id}, request_id={request_id}")
+        
+        # extract content_id
+        content_id = self.extract_content_id(formatted_content)
+        if not content_id:
+            self.logger.warning(f"Unable to extract content_id: id={record_id}")
+            # still mark the row as processed so it is not picked up again
+            self.mark_as_processed(record_id)
+            return False
+        
+        # handle formatted_content
+        if formatted_content:
+            self.insert_crawl_content(content_id, source_channel, request_id, formatted_content)
+        
+        # handle multimodal_recognition
+        parsing_inserted = False
+        if multimodal_recognition:
+            if structured_data:
+                # case 2.3: multimodal_recognition is not empty and structured_data is not empty
+                self.insert_parsing_content(content_id, request_id, multimodal_recognition, structured_data)
+                parsing_inserted = True
+            else:
+                # case 2.2: multimodal_recognition is not empty but structured_data is empty
+                self.insert_parsing_content(content_id, request_id, multimodal_recognition)
+                parsing_inserted = True
+        
+        # mark as processed
+        self.mark_as_processed(record_id)
+        
+        return parsing_inserted
+    
+    def process_single_request_id(self, request_id=None):
+        """Process all data for a single request_id."""
+        # if no request_id is given, use the first unprocessed one
+        if request_id is None:
+            request_id = self.get_unprocessed_request_ids()
+            if not request_id:
+                self.logger.info("No request_id needs processing")
+                return False
+        
+        self.logger.info(f"Start processing request_id: {request_id}")
+        
+        # fetch all rows for this request_id
+        records = self.get_data_by_request_id(request_id)
+        
+        if not records:
+            self.logger.info(f"request_id={request_id} has no unprocessed data")
+            return False
+        
+        self.logger.info(f"request_id={request_id}: found {len(records)} records")
+        
+        all_satisfy_condition_23 = True
+        
+        # process every record for this request_id
+        for record in records:
+            parsing_inserted = self.process_single_record(record)
+            
+            # check whether condition 2.3 is satisfied (both multimodal_recognition and structured_data are non-empty)
+            multimodal_recognition = record['multimodal_recognition']
+            structured_data = record['structured_data']
+            
+            if not (multimodal_recognition and structured_data):
+                all_satisfy_condition_23 = False
+        
+        # update parsing_status in the knowledge_request table
+        if all_satisfy_condition_23:
+            self.update_request_status(request_id, 2)
+            self.logger.info(f"request_id={request_id}: all rows satisfy condition 2.3, setting parsing_status=2")
+        else:
+            self.update_request_status(request_id, 0)
+            self.logger.info(f"request_id={request_id}: some rows do not satisfy condition 2.3, setting parsing_status=0")
+        
+        return True
+    
+    def process_all_data(self):
+        """Process all data (loop over each pending request_id)."""
+        processed_count = 0
+        
+        while True:
+            # fetch the next unprocessed request_id
+            request_id = self.get_unprocessed_request_ids()
+            if not request_id:
+                self.logger.info("All data has been processed")
+                break
+            
+            # process this request_id
+            success = self.process_single_request_id(request_id)
+            if success:
+                processed_count += 1
+                self.logger.info(f"Processed {processed_count} request_id(s) so far")
+            else:
+                self.logger.warning(f"Failed to process request_id={request_id}")
+                break
+        
+        self.logger.info(f"Processed {processed_count} request_id(s) in total")
+
+
+def main():
+    """Entry point."""
+    parser = argparse.ArgumentParser(description='Data migration script')
+    parser.add_argument('--request-id', type=str, help='request_id to process; if omitted, all unprocessed data is handled')
+    parser.add_argument('--single', action='store_true', help='process only one request_id (the first unprocessed one)')
+    
+    args = parser.parse_args()
+    
+    processor = MoveDataProcessor()
+    
+        # process the specified request_id
+        # 处理指定的request_id
+        success = processor.process_single_request_id(args.request_id)
+        if success:
+            logger.info(f"Successfully processed request_id: {args.request_id}")
+        else:
+            logger.error(f"Failed to process request_id: {args.request_id}")
+            sys.exit(1)
+    elif args.single:
+        # process only one request_id
+        success = processor.process_single_request_id()
+        if success:
+            logger.info("Successfully processed one request_id")
+        else:
+            logger.info("No request_id needs processing")
+    else:
+        # process all data
+        processor.process_all_data()
+
+
+if __name__ == "__main__":
+    main()
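As a usage note (the invocations below are illustrative; the request_id value is a placeholder), the argparse interface defined in main() supports three modes:

    python tools/move_data.py                       # loop over every pending request_id
    python tools/move_data.py --single              # handle only the next pending request_id
    python tools/move_data.py --request-id <id>     # handle one specific request_id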