import json

import requests
# NOTE(review): this import looks unused -- every `title` below is a parameter
# or a local. Kept because other code may rely on it; confirm before removing.
from openpyxl.styles.builtins import title

from core.config import logger
from core.database_data import DatabaseHelper

# Endpoint that receives one text per request.
CHUNK_API_URL = 'http://61.48.133.26:8001/api/chunk'

# Default number of seconds to wait for the service. Without a timeout
# `requests` waits forever on a stalled connection. 60 is an assumed value;
# raise it if the service legitimately needs longer.
REQUEST_TIMEOUT = 60

# Dataset that `select_data()` and the script entry point upload into when no
# id is given explicitly. `add_data` requires a dataset id, and the original
# callers did not pass one (a TypeError at the first call).
# TODO: set this to the real target dataset id.
DEFAULT_DATASET_ID = None


def add_data(text, dataset_id, title=None, timeout=REQUEST_TIMEOUT):
    """Post one text to the chunk API and return the ``doc_id`` of the reply.

    Args:
        text: Value sent in the ``text`` field of the JSON payload.
        dataset_id: Value sent in the ``dataset_id`` field of the payload.
        title: Accepted but currently not included in the payload.
        timeout: Seconds to wait for the HTTP call before giving up.

    Returns:
        The ``doc_id`` entry of the JSON response, or ``None`` when the
        request failed for any reason (the error is logged, not raised).
    """
    try:
        response = requests.post(
            url=CHUNK_API_URL,
            json={"text": text, "dataset_id": dataset_id},
            headers={"Content-Type": "application/json"},
            timeout=timeout,
        )
        # Surface HTTP errors as such instead of as a KeyError on 'doc_id'.
        response.raise_for_status()
        return response.json()['doc_id']
    except Exception as e:
        # Deliberate best-effort boundary: callers treat None as "failed".
        logger.error(e)
        return None


def is_empty(value):
    """Return True when *value* is ``None`` or the empty string."""
    return value is None or value == ""


def parse_json(file_path):
    """Load a JSON array file and collect its non-empty ``body_text`` entries.

    Args:
        file_path: Path of a UTF-8 encoded file containing a JSON array of
            objects.

    Returns:
        A list of ``{'body_text': ..., 'title': ...}`` dicts, one per array
        element whose ``body_text`` is neither ``None`` nor ``""``. Problems
        (missing file, invalid JSON, top-level value that is not an array,
        any other error) are printed and whatever was collected so far is
        returned.
    """
    text_list = []
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            json_data = json.load(file)

        if not isinstance(json_data, list):
            print("错误: 文件内容不是一个JSON数组")
            return text_list

        for json_obj in json_data:
            body_text = json_obj.get("body_text", "")
            if not is_empty(body_text):
                text_list.append({
                    'body_text': body_text,
                    'title': json_obj.get("title", ""),
                })
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}")
    except FileNotFoundError:
        print(f"错误: 找不到文件 '{file_path}'")
    except Exception as e:
        print(f"发生错误: {e}")

    return text_list


def _resolve_dataset_id(dataset_id):
    """Return *dataset_id*, falling back to ``DEFAULT_DATASET_ID``.

    Raises:
        ValueError: If neither an explicit id nor a default is configured.
    """
    resolved = DEFAULT_DATASET_ID if dataset_id is None else dataset_id
    if resolved is None:
        raise ValueError(
            "dataset_id is required: pass it explicitly or set DEFAULT_DATASET_ID"
        )
    return resolved


def select_data(dataset_id=None):
    """Query crawled content from the database and post each row's body text.

    Args:
        dataset_id: Dataset id forwarded to ``add_data``. Defaults to
            ``DEFAULT_DATASET_ID`` when omitted.

    Raises:
        ValueError: If the query returns rows and no dataset id is available.
    """
    db_helper = DatabaseHelper()

    query = """
        SELECT c.crawl_data as json_text
        FROM knowledge_extraction_content a
        LEFT JOIN knowledge_parsing_content b
            ON a.parsing_id = b.id AND b.request_id = a.request_id
        LEFT JOIN knowledge_crawl_content c
            ON c.content_id = b.content_id AND c.request_id = a.request_id
        LEFT JOIN knowledge_request d ON d.request_id = a.request_id
        LEFT JOIN knowledge_query e ON e.id = d.query_id
        WHERE a.request_id > '20250905022700393495252'
            AND e.knowledge_type = '整体'
            AND a.score >= 0
            AND e.category_id = 0
        ORDER BY a.id DESC
    """
    result = db_helper.execute_query(query)

    # NOTE(review): `crawl_data` comes from a LEFT JOIN, so `json_text` may be
    # NULL and `json.loads` would raise -- confirm whether that can happen.
    for row in result:
        body_text = json.loads(row['json_text'])['body_text']
        add_data(body_text, _resolve_dataset_id(dataset_id))


def _upload_all(items, dataset_id):
    """Post every item's ``body_text``; return the items whose upload failed."""
    failed = []
    for item in items:
        if add_data(item['body_text'], dataset_id) is None:
            failed.append(item)
    return failed


def main():
    """Upload the test data file, retry failures once, print what still failed."""
    json_path = '../data/test_data1.json'
    dataset_id = _resolve_dataset_id(None)

    text_list = parse_json(json_path)
    first_pass_failures = _upload_all(text_list, dataset_id)
    # Second attempt. Failures go into a separate list: the original appended
    # to the list it was iterating, so the loop could never finish and the
    # printed result was always empty.
    remaining_failures = _upload_all(first_pass_failures, dataset_id)

    print(json.dumps(remaining_failures, ensure_ascii=False))


if __name__ == '__main__':
    main()