keywords_utils.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
import concurrent
import json
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from venv import logger

from core.database import DBHelper
from data_models.content_chunks import ContentChunks
from data_models.keyword_clustering import KeywordClustering
from data_models.keyword_data import KeywordData
from data_models.keyword_with_content_chunk import KeywordWithContentChunk
from utils.deepseek_utils import get_keyword_summary, update_keyword_summary
  13. class KeywordSummaryTask:
  14. lock_dict = {} # 静态变量,不会随着每次实例化而重置
  15. def __init__(self):
  16. self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix='KeywordSummaryTask')
  17. def get_lock_for_keyword(self, keyword_id):
  18. if keyword_id not in self.lock_dict:
  19. self.lock_dict[keyword_id] = threading.Lock()
  20. return self.lock_dict[keyword_id]
  21. def _generate_keywords(self, content_chunk):
  22. db_helper = DBHelper()
  23. keywords = json.loads(content_chunk.keywords)
  24. for keyword in keywords:
  25. keyword_data = db_helper.get(KeywordData, keyword=keyword)
  26. if keyword_data is None:
  27. try:
  28. new_keyword_data = KeywordData(keyword=keyword)
  29. keyword_data = db_helper.add(new_keyword_data)
  30. except Exception as e:
  31. return
  32. keyword_with_content_chunk = db_helper.get(KeywordWithContentChunk, keyword_id=keyword_data.id,
  33. content_chunk_id=content_chunk.id)
  34. if keyword_with_content_chunk is None:
  35. try:
  36. keyword_with_content_chunk = KeywordWithContentChunk(keyword_id=keyword_data.id,
  37. content_chunk_id=content_chunk.id)
  38. db_helper.add(keyword_with_content_chunk)
  39. except Exception as e:
  40. return
  41. # 获取对应 keyword_id 的锁
  42. lock = self.get_lock_for_keyword(keyword_data.id)
  43. with lock:
  44. if keyword_with_content_chunk.keyword_clustering_status == 0:
  45. try:
  46. keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
  47. if keyword_clustering is None:
  48. keyword_summary = get_keyword_summary(content_chunk.text, keyword_data.keyword)
  49. new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
  50. keyword_summary=keyword_summary[
  51. 'keyword_summary'])
  52. db_helper.add(new_keyword_clustering)
  53. else:
  54. new_keyword_summary = update_keyword_summary(keyword_clustering.keyword_summary,
  55. keyword,
  56. content_chunk.text)
  57. db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
  58. updates={"keyword_summary": new_keyword_summary})
  59. db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
  60. updates={"keyword_clustering_status": 1})
  61. except Exception as e:
  62. db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
  63. updates={"keyword_clustering_status": 2})
  64. db_helper.update(ContentChunks, filters={"id": content_chunk.id},
  65. updates={"keywords_status": 1})
  66. # 使用线程池处理文本列表
  67. def process_texts_concurrently(self):
  68. print('process_texts_concurrently start')
  69. db_helper = DBHelper()
  70. while True:
  71. content_chunks = db_helper.get_all(ContentChunks, limit=200, chunk_status=2, keywords_status=0)
  72. if len(content_chunks) == 0:
  73. logger.info('sleep')
  74. print('sleep')
  75. sleep(1800)
  76. else:
  77. future_to_chunk = {self.executor.submit(self._generate_keywords, content_chunk): content_chunk for
  78. content_chunk
  79. in
  80. content_chunks}
  81. # 等待所有任务完成
  82. concurrent.futures.wait(future_to_chunk.keys())
  83. # 创建一个字典,内容块到结果的映射(注意:这里假设任务没有异常,如果有异常,result()会抛出)
  84. results = {}
  85. for future, chunk in future_to_chunk.items():
  86. try:
  87. results[chunk] = future.result()
  88. except Exception as exc:
  89. results[chunk] = exc # 或者你可以选择其他异常处理方式
  90. print("success")
  91. if __name__ == '__main__':
  92. db_helper = DBHelper()
  93. print(db_helper.get(KeywordData, keyword='短视频'))