@@ -1,5 +1,6 @@
 import concurrent
 import json
+import threading
 from concurrent.futures import ThreadPoolExecutor
 from time import sleep
 from venv import logger
@@ -15,6 +16,13 @@ from utils.deepseek_utils import get_keyword_summary, update_keyword_summary_pro
 class KeywordSummaryTask:
     def __init__(self):
         self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix='KeywordSummaryTask')
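+        # Maps keyword_id -> threading.Lock guarding that keyword's summary updates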
+        self.lock_dict = {}
+
+    def get_lock_for_keyword(self, keyword_id):
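+        # dict.setdefault is a single atomic operation on CPython, so concurrent
+        # workers asking for the same keyword_id always receive the same lock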
+        return self.lock_dict.setdefault(keyword_id, threading.Lock())
 
     def _generate_keywords(self, content_chunk):
         db_helper = DBHelper()
@@ -36,23 +43,31 @@ class KeywordSummaryTask:
                 db_helper.add(keyword_with_content_chunk)
             except Exception as e:
                 return
-            if keyword_with_content_chunk.keyword_clustering_status == 0:
-                try:
-                    keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
-                    if keyword_clustering is None:
-                        keyword_summary = get_keyword_summary(content_chunk.text, keyword_data.keyword)
-                        new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
-                                                                   keyword_summary=keyword_summary['keyword_summary'])
-                        db_helper.add(new_keyword_clustering)
-                    else:
-                        new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary, keyword,
-                                                                            content_chunk.text)
-                        db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
-                                         updates={"keyword_summary": new_keyword_summary})
-                    db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
-                                     updates={"keyword_clustering_status": 1})
-                except Exception as e:
-                    return
+            # Acquire this keyword's lock so concurrent chunks cannot race on the same summary
+            lock = self.get_lock_for_keyword(keyword_data.id)
+            with lock:
+                if keyword_with_content_chunk.keyword_clustering_status == 0:
+                    try:
+                        keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
+                        if keyword_clustering is None:
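+                            # First chunk seen for this keyword: create its clustering row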
+                            keyword_summary = get_keyword_summary(content_chunk.text, keyword_data.keyword)
+                            new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
+                                                                       keyword_summary=keyword_summary['keyword_summary'])
+                            db_helper.add(new_keyword_clustering)
+                        else:
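+                            # Fold this chunk into the keyword's existing summary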
+                            new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary,
+                                                                                keyword, content_chunk.text)
+                            db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
+                                             updates={"keyword_summary": new_keyword_summary})
+                        db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
+                                         updates={"keyword_clustering_status": 1})
+                    except Exception as e:
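+                        # Mark the chunk as failed (status 2) instead of silently dropping it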
+                        db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
+                                         updates={"keyword_clustering_status": 2})
+
 
         db_helper.update(ContentChunks, filters={"id": content_chunk.id},
                          updates={"keywords_status": 1})
 