keywords_utils.py

import concurrent.futures
import json
import logging
from concurrent.futures import ThreadPoolExecutor
from time import sleep

from core.database import DBHelper
from data_models.content_chunks import ContentChunks
from data_models.keyword_clustering import KeywordClustering
from data_models.keyword_data import KeywordData
from data_models.keyword_with_content_chunk import KeywordWithContentChunk
from utils.deepseek_utils import get_keyword_summary, update_keyword_summary_prompt

logger = logging.getLogger(__name__)
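
# KeywordSummaryTask consumes content chunks whose keywords have already been
# extracted (stored as a JSON list in ContentChunks.keywords) and maintains one
# running LLM-generated summary per keyword. The integer status columns are
# read here as simple flags: keywords_status / keyword_clustering_status
# 0 = pending and 1 = done, and chunk_status == 2 meaning the chunk is ready
# for keyword processing. That reading is inferred from the queries below, not
# from documented constants.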

class KeywordSummaryTask:
    def __init__(self):
        # Up to 20 chunks are processed in parallel.
        self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix='KeywordSummaryTask')
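
        # The pool is never shut down here: process_texts_concurrently below
        # loops forever, so the worker threads live for the life of the process.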

    def _generate_keywords(self, content_chunk):
        db_helper = DBHelper()
        keywords = json.loads(content_chunk.keywords)
        for keyword in keywords:
            # Create the keyword row on first sight.
            keyword_data = db_helper.get(KeywordData, keyword=keyword)
            if keyword_data is None:
                try:
                    new_keyword_data = KeywordData(keyword=keyword)
                    keyword_data = db_helper.add(new_keyword_data)
                except Exception:
                    logger.exception('failed to create KeywordData for %r', keyword)
                    return
            # Link the keyword to this chunk exactly once.
            keyword_with_content_chunk = db_helper.get(KeywordWithContentChunk, keyword_id=keyword_data.id,
                                                       content_chunk_id=content_chunk.id)
            if keyword_with_content_chunk is None:
                try:
                    keyword_with_content_chunk = KeywordWithContentChunk(keyword_id=keyword_data.id,
                                                                         content_chunk_id=content_chunk.id)
                    db_helper.add(keyword_with_content_chunk)
                except Exception:
                    logger.exception('failed to link keyword %r to chunk %s', keyword, content_chunk.id)
                    return
            # Create or extend the rolling summary for this keyword, then mark
            # this keyword/chunk pair as clustered.
            if keyword_with_content_chunk.keyword_clustering_status == 0:
                try:
                    keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
                    if keyword_clustering is None:
                        # First chunk seen for this keyword: generate the initial summary.
                        keyword_summary = get_keyword_summary(content_chunk.text, keyword_data.keyword)
                        new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
                                                                   keyword_summary=keyword_summary['keyword_summary'])
                        db_helper.add(new_keyword_clustering)
                    else:
                        # Fold this chunk into the existing summary.
                        new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary, keyword,
                                                                            content_chunk.text)
                        db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
                                         updates={"keyword_summary": new_keyword_summary})
                    db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
                                     updates={"keyword_clustering_status": 1})
                except Exception:
                    logger.exception('clustering failed for keyword %r, chunk %s', keyword, content_chunk.id)
                    return
        # Every keyword handled without error: mark the chunk as processed.
        db_helper.update(ContentChunks, filters={"id": content_chunk.id},
                         updates={"keywords_status": 1})
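
    # Note: each worker call builds its own DBHelper instead of sharing the one
    # created in process_texts_concurrently. This assumes DBHelper is not safe
    # to share across the pool's threads (an assumption about core.database,
    # not something this file states); if it is thread-safe, a single shared
    # helper would save connections.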

    # Process the pending text chunks with the thread pool.
    def process_texts_concurrently(self):
        print('process_texts_concurrently start')
        db_helper = DBHelper()
        while True:
            content_chunks = db_helper.get_all(ContentChunks, chunk_status=2, keywords_status=0)
            if len(content_chunks) == 0:
                # Nothing pending; poll again in 30 minutes.
                logger.info('sleep')
                print('sleep')
                sleep(1800)
            else:
                future_to_chunk = {self.executor.submit(self._generate_keywords, content_chunk): content_chunk
                                   for content_chunk in content_chunks}
                # Wait for every submitted task to finish.
                concurrent.futures.wait(future_to_chunk.keys())
                # Map each content chunk to its result; note that result()
                # re-raises any exception the task raised.
                results = {}
                for future, chunk in future_to_chunk.items():
                    try:
                        results[chunk] = future.result()
                    except Exception as exc:
                        results[chunk] = exc  # or handle the failure however you prefer
                print("success")


if __name__ == '__main__':
    keyword_summary_task = KeywordSummaryTask()
    keyword_summary_task.process_texts_concurrently()
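
# The module-level logger gets no handler in this file, so logger.info('sleep')
# is silent unless the wider project configures logging. A minimal sketch,
# assuming nothing else does so, is to configure it before starting the task:
#
#     import logging
#     logging.basicConfig(
#         level=logging.INFO,
#         format='%(asctime)s %(threadName)s %(levelname)s %(message)s',
#     )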