keywords_utils.py

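"""Background task that extracts keywords from processed content chunks,
links each keyword to its chunks, and builds or refreshes per-keyword
summaries via the DeepSeek helpers in utils.deepseek_utils."""
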
import concurrent.futures
import json
import logging
from concurrent.futures import ThreadPoolExecutor
from time import sleep

from core.database import DBHelper
from data_models.content_chunks import ContentChunks
from data_models.keyword_clustering import KeywordClustering
from data_models.keyword_data import KeywordData
from data_models.keyword_with_content_chunk import KeywordWithContentChunk
from utils.deepseek_utils import get_keyword_summary, update_keyword_summary_prompt

logger = logging.getLogger(__name__)
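
# Status conventions, as inferred from how this module reads and writes them:
#   ContentChunks.chunk_status == 2                     -> chunk text is ready for keyword extraction
#   ContentChunks.keywords_status:  0 pending, 1 done
#   KeywordWithContentChunk.keyword_clustering_status:  0 pending, 1 done, 2 failed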


class KeywordSummaryTask:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix='KeywordSummaryTask')
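
    # Worker run on the thread pool: handles one content chunk end to end.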

    def _generate_keywords(self, content_chunk):
        db_helper = DBHelper()
        keywords = json.loads(content_chunk.keywords)
        for keyword in keywords:
            # Make sure a KeywordData row exists for this keyword.
            keyword_data = db_helper.get(KeywordData, keyword=keyword)
            if keyword_data is None:
                try:
                    new_keyword_data = KeywordData(keyword=keyword)
                    keyword_data = db_helper.add(new_keyword_data)
                except Exception:
                    logger.exception('failed to create KeywordData for %r', keyword)
                    return
            # Make sure the keyword is linked to this content chunk.
            keyword_with_content_chunk = db_helper.get(KeywordWithContentChunk, keyword_id=keyword_data.id,
                                                       content_chunk_id=content_chunk.id)
            if keyword_with_content_chunk is None:
                try:
                    keyword_with_content_chunk = KeywordWithContentChunk(keyword_id=keyword_data.id,
                                                                         content_chunk_id=content_chunk.id)
                    db_helper.add(keyword_with_content_chunk)
                except Exception:
                    logger.exception('failed to link keyword %r to chunk %s', keyword, content_chunk.id)
                    return
            if keyword_with_content_chunk.keyword_clustering_status == 0:
                try:
                    keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
                    if keyword_clustering is None:
                        # First time this keyword is seen: generate a fresh summary.
                        keyword_summary = get_keyword_summary(content_chunk.text, keyword_data.keyword)
                        new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
                                                                   keyword_summary=keyword_summary['keyword_summary'])
                        db_helper.add(new_keyword_clustering)
                    else:
                        # Keyword already summarized: fold this chunk into the existing summary.
                        new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary, keyword,
                                                                            content_chunk.text)
                        db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
                                         updates={"keyword_summary": new_keyword_summary})
                    db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
                                     updates={"keyword_clustering_status": 1})
                except Exception:
                    logger.exception('keyword clustering failed for %r', keyword)
                    db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
                                     updates={"keyword_clustering_status": 2})
        # All keywords for this chunk processed; mark the chunk done.
        db_helper.update(ContentChunks, filters={"id": content_chunk.id},
                         updates={"keywords_status": 1})

    # Process the pending text chunks with the thread pool.
    def process_texts_concurrently(self):
        print('process_texts_concurrently start')
        db_helper = DBHelper()
        while True:
            content_chunks = db_helper.get_all(ContentChunks, chunk_status=2, keywords_status=0)
            if len(content_chunks) == 0:
                logger.info('sleep')
                print('sleep')
                sleep(1800)  # no pending work; sleep for 30 minutes
            else:
                future_to_chunk = {self.executor.submit(self._generate_keywords, content_chunk): content_chunk
                                   for content_chunk in content_chunks}
                # Wait for every submitted task to finish.
                concurrent.futures.wait(future_to_chunk.keys())
                # Map each content chunk to its result; note that result()
                # re-raises any exception that escaped the worker.
                results = {}
                for future, chunk in future_to_chunk.items():
                    try:
                        results[chunk] = future.result()
                    except Exception as exc:
                        results[chunk] = exc  # or handle the failure some other way
                print("success")


if __name__ == '__main__':
    keyword_summary_task = KeywordSummaryTask()
    keyword_summary_task.process_texts_concurrently()