"""Background task that extracts keywords from content chunks and maintains
a running per-keyword clustering summary generated via DeepSeek."""
import concurrent.futures  # explicit submodule import; `import concurrent` alone does not bind `.futures`
import json
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep

from core.database import DBHelper
from data_models.content_chunks import ContentChunks
from data_models.keyword_clustering import KeywordClustering
from data_models.keyword_data import KeywordData
from data_models.keyword_with_content_chunk import KeywordWithContentChunk
from utils.deepseek_utils import get_keyword_summary, update_keyword_summary

# BUG FIX: the original did `from venv import logger`, which grabbed the stdlib
# venv module's internal logger by accident. Use a proper module logger.
logger = logging.getLogger(__name__)


class KeywordSummaryTask:
    """Polls ContentChunks whose keywords have been extracted but not yet
    summarised, and for each keyword creates/updates a KeywordClustering
    summary row, serialising per-keyword work with a dedicated lock."""

    # Class-level (shared) map of keyword_id -> Lock; survives re-instantiation.
    # NOTE(review): entries are never evicted, so this grows with the number of
    # distinct keywords seen over the process lifetime — revisit if that set is large.
    lock_dict = {}

    def __init__(self):
        # Worker pool for fanning out per-chunk keyword processing.
        self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix='KeywordSummaryTask')

    def get_lock_for_keyword(self, keyword_id):
        """Return the lock dedicated to *keyword_id*, creating it atomically.

        BUG FIX: the original check-then-set was racy — two pool threads could
        each create a different Lock for the same keyword, defeating mutual
        exclusion. dict.setdefault is atomic in CPython.
        """
        return self.lock_dict.setdefault(keyword_id, threading.Lock())

    def _generate_keywords(self, content_chunk):
        """Process one content chunk: upsert each of its keywords, link the
        keyword to the chunk, and create or extend the keyword's summary.

        Aborts the whole chunk (without marking it done) if a keyword or link
        row cannot be created; per-keyword summary failures only mark that
        link row as failed (status 2) and processing continues.
        """
        db_helper = DBHelper()
        keywords = json.loads(content_chunk.keywords)
        for keyword in keywords:
            # Upsert the KeywordData row for this keyword.
            keyword_data = db_helper.get(KeywordData, keyword=keyword)
            if keyword_data is None:
                try:
                    new_keyword_data = KeywordData(keyword=keyword)
                    keyword_data = db_helper.add(new_keyword_data)
                except Exception:
                    # Abort this chunk; it stays keywords_status=0 and will be retried.
                    logger.exception('failed to create KeywordData for keyword %s', keyword)
                    return
            # Upsert the keyword<->chunk link row.
            keyword_with_content_chunk = db_helper.get(KeywordWithContentChunk,
                                                       keyword_id=keyword_data.id,
                                                       content_chunk_id=content_chunk.id)
            if keyword_with_content_chunk is None:
                try:
                    keyword_with_content_chunk = KeywordWithContentChunk(keyword_id=keyword_data.id,
                                                                         content_chunk_id=content_chunk.id)
                    db_helper.add(keyword_with_content_chunk)
                except Exception:
                    logger.exception('failed to create KeywordWithContentChunk for keyword_id %s', keyword_data.id)
                    return
            # Serialise summary creation/update per keyword_id so concurrent
            # chunks do not clobber each other's KeywordClustering row.
            lock = self.get_lock_for_keyword(keyword_data.id)
            with lock:
                # status 0 == not yet summarised for this (keyword, chunk) pair.
                if keyword_with_content_chunk.keyword_clustering_status == 0:
                    try:
                        keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
                        if keyword_clustering is None:
                            # First chunk for this keyword: create the summary row.
                            keyword_summary = get_keyword_summary(content_chunk.text, keyword_data.keyword)
                            new_keyword_clustering = KeywordClustering(
                                keyword_id=keyword_data.id,
                                keyword_summary=keyword_summary['keyword_summary'])
                            db_helper.add(new_keyword_clustering)
                        else:
                            # Fold this chunk's text into the existing summary.
                            new_keyword_summary = update_keyword_summary(keyword_clustering.keyword_summary,
                                                                         keyword, content_chunk.text)
                            db_helper.update(KeywordClustering,
                                             filters={"id": keyword_clustering.id},
                                             updates={"keyword_summary": new_keyword_summary})
                        # status 1 == summarised successfully.
                        db_helper.update(KeywordWithContentChunk,
                                         filters={"id": keyword_with_content_chunk.id},
                                         updates={"keyword_clustering_status": 1})
                    except Exception:
                        # status 2 == summary failed for this pair; keep going with other keywords.
                        logger.exception('keyword clustering failed for keyword_id %s', keyword_data.id)
                        db_helper.update(KeywordWithContentChunk,
                                         filters={"id": keyword_with_content_chunk.id},
                                         updates={"keyword_clustering_status": 2})
        # Mark the chunk's keyword processing as done.
        db_helper.update(ContentChunks, filters={"id": content_chunk.id}, updates={"keywords_status": 1})

    # Process pending content chunks with the thread pool.
    def process_texts_concurrently(self):
        """Poll loop: fetch up to 200 ready chunks (chunk_status=2,
        keywords_status=0), fan them out to the pool, and wait for the batch.
        Sleeps 30 minutes when there is no work.
        """
        print('process_texts_concurrently start')
        db_helper = DBHelper()
        while True:
            content_chunks = db_helper.get_all(ContentChunks, limit=200, chunk_status=2, keywords_status=0)
            if not content_chunks:
                logger.info('sleep')
                print('sleep')
                sleep(1800)
            else:
                future_to_chunk = {self.executor.submit(self._generate_keywords, content_chunk): content_chunk
                                   for content_chunk in content_chunks}
                # Block until every task in this batch has finished.
                concurrent.futures.wait(future_to_chunk.keys())
                # Surface any exceptions raised inside the workers instead of
                # silently discarding them (the original built an unused dict).
                for future, chunk in future_to_chunk.items():
                    try:
                        future.result()
                    except Exception:
                        logger.exception('keyword generation failed for chunk %s', chunk.id)
                print("success")


if __name__ == '__main__':
    db_helper = DBHelper()
    print(db_helper.get(KeywordData, keyword='短视频'))