# keywords_utils.py — background task: keyword extraction and summary clustering.
import concurrent.futures
import json
import logging
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from venv import logger  # NOTE(review): accidental autocomplete — this is venv's internal logger, not a real app logger; prefer logging.getLogger(__name__)

from core.database import DBHelper
from data_models.content_chunks import ContentChunks
from data_models.keyword_clustering import KeywordClustering
from data_models.keyword_data import KeywordData
from data_models.keyword_with_content_chunk import KeywordWithContentChunk
from utils.deepseek_utils import get_keyword_summary, update_keyword_summary_prompt
  12. class KeywordSummaryTask:
  13. def __init__(self):
  14. self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix='KeywordSummaryTask')
  15. def _generate_keywords(self, content_chunk):
  16. db_helper = DBHelper()
  17. keywords = json.loads(content_chunk.keywords)
  18. for keyword in keywords:
  19. keyword_data = db_helper.get(KeywordData, keyword=keyword)
  20. if keyword_data is None:
  21. new_keyword_data = KeywordData(keyword=keyword)
  22. keyword_data = db_helper.add(new_keyword_data)
  23. keyword_with_content_chunk = db_helper.get(KeywordWithContentChunk, keyword_id=keyword_data.id,
  24. content_chunk_id=content_chunk.id)
  25. if keyword_with_content_chunk is None:
  26. keyword_with_content_chunk = KeywordWithContentChunk(keyword_id=keyword_data.id,
  27. content_chunk_id=content_chunk.id)
  28. db_helper.add(keyword_with_content_chunk)
  29. if keyword_with_content_chunk.keyword_clustering_status == 0:
  30. try:
  31. keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
  32. if keyword_clustering is None:
  33. keyword_summary = get_keyword_summary(content_chunk.text, keyword_data.keyword)
  34. new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
  35. keyword_summary=keyword_summary['keyword_summary'])
  36. db_helper.add(new_keyword_clustering)
  37. else:
  38. new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary, keyword,
  39. content_chunk.text)
  40. db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
  41. updates={"keyword_summary": new_keyword_summary})
  42. db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
  43. updates={"keyword_clustering_status": 1})
  44. except Exception as e:
  45. print(e)
  46. db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
  47. updates={"keyword_clustering_status": 2})
  48. db_helper.update(ContentChunks, filters={"id": content_chunk.id},
  49. updates={"keywords_status": 1})
  50. # 使用线程池处理文本列表
  51. def process_texts_concurrently(self):
  52. print('process_texts_concurrently start')
  53. db_helper = DBHelper()
  54. while True:
  55. content_chunks = db_helper.get_all(ContentChunks, chunk_status=2, keywords_status=0)
  56. if len(content_chunks) == 0:
  57. logger.info('sleep')
  58. print('sleep')
  59. sleep(1800)
  60. else:
  61. future_to_chunk = {self.executor.submit(self._generate_keywords, content_chunk): content_chunk for
  62. content_chunk
  63. in
  64. content_chunks}
  65. # 等待所有任务完成
  66. concurrent.futures.wait(future_to_chunk.keys())
  67. # 创建一个字典,内容块到结果的映射(注意:这里假设任务没有异常,如果有异常,result()会抛出)
  68. results = {}
  69. for future, chunk in future_to_chunk.items():
  70. try:
  71. results[chunk] = future.result()
  72. except Exception as exc:
  73. results[chunk] = exc # 或者你可以选择其他异常处理方式
  74. print("success")
  75. if __name__ == '__main__':
  76. keyword_summary_task = KeywordSummaryTask()
  77. keyword_summary_task.process_texts_concurrently()