import concurrent.futures
import json
import threading

from core.database import DBHelper
from data_models.content_data import ContentData
from data_models.keyword_clustering import KeywordClustering
from data_models.keyword_data import KeywordData
from data_models.keyword_with_content import KeywordWithContent
from utils.deepseek_utils import (text_segment, text_question,
                                  get_keyword_summary,
                                  update_keyword_summary_prompt)

def is_empty(value):
    """Return True if the value is empty (None or an empty string)."""
    return value is None or value == ""

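# Quick examples: is_empty(None) and is_empty("") are True; is_empty("x") and
# is_empty(0) are False (only None and "" count as empty here).
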
def parse_json(file_path):
    """Read a JSON file and return its contents as a list, or [] on failure."""
    try:
        # Read the file contents
        with open(file_path, 'r', encoding='utf-8') as file:
            try:
                # Parse the JSON content
                json_data = json.load(file)
                # Only a JSON array is accepted
                if isinstance(json_data, list):
                    return json_data
                else:
                    print("Error: file content is not a JSON array")
            except json.JSONDecodeError as e:
                print(f"JSON parse error: {e}")
    except FileNotFoundError:
        print(f"Error: file '{file_path}' not found")
    except Exception as e:
        print(f"An error occurred: {e}")
    return []

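# Input shape this module expects (inferred from the 'body_text' reads further
# down; the sample values are illustrative only):
#
#     [
#         {"body_text": "First source text ..."},
#         {"body_text": "Second source text ..."}
#     ]
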
def generate_keywords(keyword, content_data):
    """Upsert a keyword, link it to the content row, and maintain its cluster summary."""
    db_helper = DBHelper()
    keyword_data = db_helper.get(KeywordData, keyword=keyword)
    if keyword_data is None:
        new_keyword_data = KeywordData(keyword=keyword)
        keyword_data = db_helper.add(new_keyword_data)
    keyword_with_content = KeywordWithContent(keyword_id=keyword_data.id, content_id=content_data.id)
    db_helper.add(keyword_with_content)
    keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
    if keyword_clustering is None:
        # First time this keyword appears: create its cluster summary
        keyword_summary = get_keyword_summary(content_data.content, keyword_data.keyword)
        new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
                                                   keyword_summary=keyword_summary['keyword_summary'])
        db_helper.add(new_keyword_clustering)
        print(new_keyword_clustering)
    else:
        # Existing keyword: refresh the stored summary with the new content
        new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary, keyword,
                                                            content_data.content)
        db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
                         updates={"keyword_summary": new_keyword_summary})

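# Shapes assumed from the calls above: get_keyword_summary returns a dict like
# {'keyword_summary': '...'}, and update_keyword_summary_prompt returns the
# refreshed summary string that is stored directly. Both helpers live in
# utils.deepseek_utils; neither signature is defined in this file.
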
# Split the text into segments and persist each one as a 7-field record
def ai_dispose(text):
    db_helper = DBHelper()
    segments = text_segment(text)['segments']
    segment_pre_content_id = None
    for segment in segments:
        content = segment['content']
        summary = segment['summary']
        keywords = segment['keywords']
        if not is_empty(content) and not is_empty(summary):
            # Both present: join them (the separator can be adjusted as needed)
            segment_text = f"{content}。{summary}"
        elif not is_empty(content):
            # Only content present: use content
            segment_text = content
        elif not is_empty(summary):
            # Only summary present: use summary
            segment_text = summary
        else:
            segment_text = ''
        questions = text_question(segment_text)['questions']
        content_data = ContentData(pre_content_id=segment_pre_content_id,
                                   content=content,
                                   summary=summary,
                                   keywords=json.dumps(keywords, ensure_ascii=False),
                                   entities=json.dumps(segment['entities'], ensure_ascii=False),
                                   questions=json.dumps(questions, ensure_ascii=False),
                                   keywords_status=0)
        db_helper.add(content_data)
        segment_pre_content_id = content_data.id
        for keyword in keywords:
            generate_keywords(keyword, content_data)
        # Mark keyword extraction for this segment as done
        db_helper.update(ContentData, filters={"id": content_data.id}, updates={"keywords_status": 1})

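# Shapes assumed from the deepseek_utils calls in ai_dispose (sample values
# are illustrative only):
#
#     text_segment(text)  -> {'segments': [{'content': '...', 'summary': '...',
#                                           'keywords': [...], 'entities': [...]}]}
#     text_question(text) -> {'questions': [...]}
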
print_lock = threading.Lock()

def process_text(text):
    """Process a single text."""
    try:
        # Use the lock so prints from different threads do not interleave
        with print_lock:
            print(f"Processing text: {text[:50]}...")  # only the first 50 chars, to keep logs short
        # Delegate to the main processing function; ai_dispose persists
        # everything to the database and returns None, so the return value
        # here only distinguishes success from failure
        result = ai_dispose(text)
        return result
    except Exception as e:
        with print_lock:
            print(f"Error while processing text: {e}")
        return None

# Process the text list with a thread pool
def process_texts_concurrently(text_list, max_workers=20):
    """Process the text list concurrently with multiple threads."""
    results = []
    # Create the thread pool executor
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks to the pool, mapping each future back to its source item
        future_to_text = {executor.submit(process_text, text.get('body_text', '')): text for text in text_list}
        # Collect tasks as they complete
        for future in concurrent.futures.as_completed(future_to_text):
            text = future_to_text[future]
            try:
                result = future.result()
                results.append(result)
                with print_lock:
                    print(f"Finished text: {text.get('body_text', '')[:30]}...")
            except Exception as e:
                with print_lock:
                    print(f"Exception while processing text: {e}")
    return results

if __name__ == '__main__':
    json_path = '../data/test_data1.json'
    text_list = parse_json(json_path)
    process_texts_concurrently(text_list)
|