| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- import json
- import requests
- from openpyxl.styles.builtins import title
- from core.config import logger
- from core.database_data import DatabaseHelper
def add_data(text, dataset_id, title=None, *, timeout=30):
    """Post one text to the chunk service and return the ``doc_id`` it reports.

    Args:
        text: Value sent as the ``text`` field of the JSON payload.
        dataset_id: Value sent as the ``dataset_id`` field of the JSON payload.
        title: Accepted for call-site compatibility only; it is not included
            in the request payload.
        timeout: Seconds to wait for the service before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled server.

    Returns:
        The ``doc_id`` field of the JSON response, or ``None`` when the
        request fails for any reason (connection error, timeout, HTTP error
        status, non-JSON body, missing ``doc_id``). Callers use the ``None``
        result to detect failed uploads.
    """
    try:
        response = requests.post(
            url='http://61.48.133.26:8001/api/chunk',
            json={
                "text": text,
                "dataset_id": dataset_id,
            },
            headers={"Content-Type": "application/json"},
            timeout=timeout,
        )
        # Surface 4xx/5xx responses as an error carrying the status code
        # instead of a less informative KeyError on the missing 'doc_id'.
        response.raise_for_status()
        return response.json()['doc_id']
    except Exception as e:
        # Deliberate best-effort boundary: failures are logged and reported
        # to the caller as None rather than raised.
        logger.error(f"add_data failed for dataset_id={dataset_id!r}: {e!r}")
        return None
def is_empty(value):
    """Helper: report whether a value is empty (``None`` or the empty string)."""
    if value is None:
        return True
    return value == ""
def parse_json(file_path):
    """Read a JSON array file and collect the records with a non-empty ``body_text``.

    Args:
        file_path: Path of a UTF-8 encoded JSON file whose top-level value is
            expected to be an array of objects.

    Returns:
        A list of ``{'body_text': ..., 'title': ...}`` dicts, one per array
        element whose ``body_text`` is neither ``None`` nor ``""``. A missing
        ``title`` becomes ``""``. An empty list is returned when the file is
        missing, unreadable, not valid JSON, or not a JSON array; the reason
        is printed to stdout.
    """
    # Keep the try body limited to the I/O and parsing that can actually fail.
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            json_data = json.load(file)
    except FileNotFoundError:
        print(f"错误: 找不到文件 '{file_path}'")
        return []
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}")
        return []
    except Exception as e:
        print(f"发生错误: {e}")
        return []

    if not isinstance(json_data, list):
        print("错误: 文件内容不是一个JSON数组")
        return []

    text_list = []
    for index, json_obj in enumerate(json_data, 1):
        # A stray non-object element (string, number, nested list, null) must
        # not abort the whole file: report it and keep processing the rest.
        if not isinstance(json_obj, dict):
            print(f"警告: 第 {index} 项不是JSON对象, 已跳过")
            continue
        body_text = json_obj.get("body_text", "")
        # Same emptiness rule as the file's is_empty(): None or "".
        if body_text is None or body_text == "":
            continue
        text_list.append({
            'body_text': body_text,
            'title': json_obj.get("title", ""),
        })
    return text_list
def select_data(dataset_id=None):
    """Query crawled content from the database and upload each ``body_text``.

    Runs a fixed query that selects ``crawl_data`` (aliased ``json_text``),
    parses each value as JSON and passes its ``body_text`` to ``add_data``.

    Args:
        dataset_id: Forwarded to ``add_data``, which requires it. It defaults
            to ``None`` only so that existing zero-argument calls still bind;
            omitting it raises ``ValueError``.

    Raises:
        ValueError: If ``dataset_id`` is not supplied.
    """
    # add_data() has a required dataset_id parameter; calling it without one
    # raises TypeError on the first row, so fail early with a clear message.
    if dataset_id is None:
        raise ValueError("dataset_id is required: add_data() cannot upload without it")

    db_helper = DatabaseHelper()
    # Run the query
    query = """
        SELECT c.crawl_data as json_text
        FROM knowledge_extraction_content a
        LEFT JOIN knowledge_parsing_content b ON a.parsing_id = b.id AND b.request_id = a.request_id
        LEFT JOIN knowledge_crawl_content c ON c.content_id = b.content_id AND c.request_id = a.request_id
        LEFT JOIN knowledge_request d ON d.request_id = a.request_id
        LEFT JOIN knowledge_query e ON e.id = d.query_id
        WHERE a.request_id > '20250905022700393495252' AND e.knowledge_type = '整体' AND a.score >= 0 AND e.category_id = 0
        ORDER BY a.id DESC
    """
    result = db_helper.execute_query(query)
    for row in result:
        # crawl_data comes from a LEFT JOIN, so it can be NULL; it may also be
        # malformed JSON or lack 'body_text'. Skip such rows instead of letting
        # one bad row abort the whole batch.
        try:
            body_text = json.loads(row['json_text'])['body_text']
        except (TypeError, ValueError, KeyError) as e:
            logger.error(f"select_data: skipping row with unusable crawl_data: {e!r}")
            continue
        # Same filter the file-based path applies in parse_json().
        if is_empty(body_text):
            continue
        add_data(body_text, dataset_id)
if __name__ == '__main__':
    import sys

    # add_data() requires a dataset_id, so the operator must supply one:
    #     python <this script> <dataset_id> [json_path]
    # NOTE(review): the id is forwarded as a string; convert it here if the
    # chunk service expects an integer.
    if len(sys.argv) < 2:
        sys.exit(f"usage: python {sys.argv[0]} <dataset_id> [json_path]")
    dataset_id = sys.argv[1]
    json_path = sys.argv[2] if len(sys.argv) > 2 else '../data/test_data1.json'

    text_list = parse_json(json_path)

    # First pass: upload everything and remember what failed
    # (add_data returns None on failure).
    failed = []
    for text in text_list:
        if add_data(text['body_text'], dataset_id) is None:
            failed.append(text)

    # Second pass: retry each failure once. Items that fail again go into a
    # separate list; appending to the list being iterated would make the loop
    # run for as long as uploads keep failing and leave the report empty.
    still_failed = []
    for text in failed:
        if add_data(text['body_text'], dataset_id) is None:
            still_failed.append(text)

    # Report the records that could not be uploaded after the retry.
    print(json.dumps(still_failed, ensure_ascii=False))
|