Forráskód Böngészése

Merge branch 'dev-xym-update' of algorithm/RAG into master

xueyiming 2 hónapja
szülő
commit
1f4451aa4c
1 módosított fájl, 31 hozzáadás és 17 törlés
  1. 31 17
      utils/keywords_utils.py

+ 31 - 17
utils/keywords_utils.py

@@ -1,5 +1,6 @@
 import concurrent
 import json
+import threading
 from concurrent.futures import ThreadPoolExecutor
 from time import sleep
 from venv import logger
@@ -13,9 +14,15 @@ from utils.deepseek_utils import get_keyword_summary, update_keyword_summary_pro
 
 
 class KeywordSummaryTask:
+    lock_dict = {}  # 静态变量,不会随着每次实例化而重置
     def __init__(self):
         self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix='KeywordSummaryTask')
 
+    def get_lock_for_keyword(self, keyword_id):
+        if keyword_id not in self.lock_dict:
+            self.lock_dict[keyword_id] = threading.Lock()
+        return self.lock_dict[keyword_id]
+
     def _generate_keywords(self, content_chunk):
         db_helper = DBHelper()
         keywords = json.loads(content_chunk.keywords)
@@ -36,23 +43,30 @@ class KeywordSummaryTask:
                     db_helper.add(keyword_with_content_chunk)
                 except Exception as e:
                     return
-            if keyword_with_content_chunk.keyword_clustering_status == 0:
-                try:
-                    keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
-                    if keyword_clustering is None:
-                        keyword_summary = get_keyword_summary(content_chunk.text, keyword_data.keyword)
-                        new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
-                                                                   keyword_summary=keyword_summary['keyword_summary'])
-                        db_helper.add(new_keyword_clustering)
-                    else:
-                        new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary, keyword,
-                                                                            content_chunk.text)
-                        db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
-                                         updates={"keyword_summary": new_keyword_summary})
-                    db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
-                                     updates={"keyword_clustering_status": 1})
-                except Exception as e:
-                    return
+            # 获取对应 keyword_id 的锁
+            lock = self.get_lock_for_keyword(keyword_data.id)
+            with lock:
+                if keyword_with_content_chunk.keyword_clustering_status == 0:
+                    try:
+                        keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
+                        if keyword_clustering is None:
+                            keyword_summary = get_keyword_summary(content_chunk.text, keyword_data.keyword)
+                            new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
+                                                                       keyword_summary=keyword_summary[
+                                                                           'keyword_summary'])
+                            db_helper.add(new_keyword_clustering)
+                        else:
+                            new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary,
+                                                                                keyword,
+                                                                                content_chunk.text)
+                            db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
+                                             updates={"keyword_summary": new_keyword_summary})
+                        db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
+                                         updates={"keyword_clustering_status": 1})
+                    except Exception as e:
+                        db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
+                                         updates={"keyword_clustering_status": 2})
+
         db_helper.update(ContentChunks, filters={"id": content_chunk.id},
                          updates={"keywords_status": 1})