```python
import concurrent.futures
import json
import logging
from concurrent.futures import ThreadPoolExecutor
from time import sleep

from core.database import DBHelper
from data_models.content_chunks import ContentChunks
from data_models.keyword_clustering import KeywordClustering
from data_models.keyword_data import KeywordData
from data_models.keyword_with_content_chunk import KeywordWithContentChunk
from utils.deepseek_utils import get_keyword_summary, update_keyword_summary_prompt

logger = logging.getLogger(__name__)


class KeywordSummaryTask:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix='KeywordSummaryTask')

    def _generate_keywords(self, content_chunk):
        """Extract keywords from a content chunk and create or update their cluster summaries."""
        db_helper = DBHelper()
        keywords = json.loads(content_chunk.keywords)
        for keyword in keywords:
            # Ensure a KeywordData row exists for this keyword.
            keyword_data = db_helper.get(KeywordData, keyword=keyword)
            if keyword_data is None:
                new_keyword_data = KeywordData(keyword=keyword)
                keyword_data = db_helper.add(new_keyword_data)

            # Ensure the keyword-to-chunk association exists.
            keyword_with_content_chunk = db_helper.get(KeywordWithContentChunk, keyword_id=keyword_data.id,
                                                       content_chunk_id=content_chunk.id)
            if keyword_with_content_chunk is None:
                keyword_with_content_chunk = KeywordWithContentChunk(keyword_id=keyword_data.id,
                                                                     content_chunk_id=content_chunk.id)
                db_helper.add(keyword_with_content_chunk)

            # Clustering status: 0 = pending, 1 = done, 2 = failed.
            if keyword_with_content_chunk.keyword_clustering_status == 0:
                try:
                    keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
                    if keyword_clustering is None:
                        # First occurrence of this keyword: create a fresh summary.
                        keyword_summary = get_keyword_summary(content_chunk.text, keyword_data.keyword)
                        new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
                                                                   keyword_summary=keyword_summary['keyword_summary'])
                        db_helper.add(new_keyword_clustering)
                    else:
                        # Keyword already summarized: merge this chunk into the existing summary.
                        new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary,
                                                                            keyword, content_chunk.text)
                        db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
                                         updates={"keyword_summary": new_keyword_summary})
                    db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
                                     updates={"keyword_clustering_status": 1})
                except Exception:
                    logger.exception('keyword clustering failed for keyword_id=%s', keyword_data.id)
                    db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
                                     updates={"keyword_clustering_status": 2})

        # Mark the chunk's keywords as processed.
        db_helper.update(ContentChunks, filters={"id": content_chunk.id},
                         updates={"keywords_status": 1})

    # Process the pending content chunks with the thread pool.
    def process_texts_concurrently(self):
        logger.info('process_texts_concurrently start')
        db_helper = DBHelper()
        while True:
            content_chunks = db_helper.get_all(ContentChunks, chunk_status=2, keywords_status=0)
            if len(content_chunks) == 0:
                # Nothing to do: sleep 30 minutes before polling again.
                logger.info('sleep')
                sleep(1800)
            else:
                future_to_chunk = {self.executor.submit(self._generate_keywords, content_chunk): content_chunk
                                   for content_chunk in content_chunks}
                # Wait for every submitted task to finish.
                concurrent.futures.wait(future_to_chunk.keys())
                # Map each content chunk to its result; result() re-raises any
                # exception raised inside the worker.
                results = {}
                for future, chunk in future_to_chunk.items():
                    try:
                        results[chunk] = future.result()
                    except Exception as exc:
                        results[chunk] = exc  # or handle the failure another way
                logger.info('success')


if __name__ == '__main__':
    keyword_summary_task = KeywordSummaryTask()
    keyword_summary_task.process_texts_concurrently()
```
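The listing imports `get_keyword_summary` and `update_keyword_summary_prompt` from `utils.deepseek_utils`, but that module is not shown. Below is a minimal sketch of what those helpers might look like, assuming DeepSeek's OpenAI-compatible chat endpoint and an API key in a `DEEPSEEK_API_KEY` environment variable; the prompt wording and the `_chat` helper are illustrative assumptions, not the author's code:

```python
# Hypothetical sketch of utils/deepseek_utils.py -- the real module is not
# shown in the post. Assumes DeepSeek's OpenAI-compatible chat API and an
# API key in the DEEPSEEK_API_KEY environment variable.
import json
import os

from openai import OpenAI

client = OpenAI(api_key=os.environ['DEEPSEEK_API_KEY'],
                base_url='https://api.deepseek.com')


def _chat(prompt: str) -> str:
    # Single-turn chat completion; returns the model's reply as text.
    response = client.chat.completions.create(
        model='deepseek-chat',
        messages=[{'role': 'user', 'content': prompt}],
    )
    return response.choices[0].message.content


def get_keyword_summary(text: str, keyword: str) -> dict:
    # Returns {'keyword_summary': ...}, the shape the caller indexes into.
    prompt = (f'Summarize what the following text says about "{keyword}". '
              f'Reply with JSON only: {{"keyword_summary": "..."}}\n\n{text}')
    return json.loads(_chat(prompt))


def update_keyword_summary_prompt(old_summary: str, keyword: str, text: str) -> str:
    # Merges a new chunk into an existing summary; returns plain text,
    # matching how the caller stores the value directly.
    prompt = (f'Existing summary for "{keyword}": {old_summary}\n\n'
              f'Update the summary using the new text below and reply with '
              f'the updated summary only.\n\n{text}')
    return _chat(prompt)
```

Note the asymmetry the caller relies on: `get_keyword_summary` must return a dict (it is indexed with `['keyword_summary']`), while `update_keyword_summary_prompt` must return a plain string that is written straight back to the `keyword_summary` column.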