# generate_search_keys.py (5.0 KB)
  1. import json
  2. import time
  3. import concurrent
  4. import datetime
  5. from tqdm import tqdm
  6. from pymysql.cursors import DictCursor
  7. from concurrent.futures import ThreadPoolExecutor
  8. from applications.api import fetch_deepseek_response
  9. from applications.db import DatabaseConnector
  10. from config import long_articles_config
  11. INIT_STATUS = 0
  12. PROCESSING_STATUS = 1
  13. SUCCESS_STATUS = 2
  14. FAIL_STATUS = 99
  15. MAX_PROCESSING_TIME = 60 * 60
  16. def generate_prompt(title):
  17. prompt = f"""
  18. **输入标题**:
  19. {title}
  20. task1:`请按以下维度解构标题核心元素:`
  21. 1. **核心人物**(带身份标签):
  22. 2. **冲突双方**:
  23. 3. **核心事件**:
  24. 4. **反常细节**:
  25. 5. **时空坐标**:
  26. 6. **认知钩子**:
  27. 7. **隐性议题**:
  28. task2: 请基于 task1 结构元素,套用以下公式来生成关键词
  29. | 公式类型 | 生成逻辑
  30. |--------------- |----------------------------
  31. | **悬念挖掘式** | [人物]+[反常行为]+[学科解释需求]
  32. | **时空穿越式** | [历史事件]在[年代]的[群体]解读
  33. | **技术拆解式** | [人物]的[专业领域]+[技术术语]
  34. | **文化对抗式** | [国家A]如何重新定义[国家B]的[历史符号]
  35. | **暗线追踪式** | [微小物证]揭示的[大历史真相]
  36. task3: 基于 task2 生成的关键词矩阵,生成一批长尾词,通过这些词可以在媒体平台搜索到相关内容,
  37. 要求这些内容对 50 岁以上的中老年人有一定的吸引性
  38. 输出: 只需要输出task3 生成的长尾词列表
  39. 输出示例: ["长尾词1", "长尾词2", "长尾词3", "长尾词4", ...]
  40. """
  41. return prompt
  42. def lock_task(db_client, task_md5):
  43. lock_query = f"""
  44. update `article_pool_promotion_source`
  45. set association_status = %s, association_update_timestamp = %s
  46. where title_md5 = %s and association_status = %s;
  47. """
  48. affected_rows = db_client.save(lock_query, params=(PROCESSING_STATUS, datetime.datetime.now(), task_md5, INIT_STATUS))
  49. return affected_rows
  50. def rollback_task(db_client):
  51. now_timestamp = int(time.time())
  52. timestamp_threshold = datetime.datetime.fromtimestamp(
  53. now_timestamp - MAX_PROCESSING_TIME
  54. )
  55. update_query = f"""
  56. update `article_pool_promotion_source`
  57. set association_status = %s
  58. where association_status = %s and association_update_timestamp < %s;
  59. """
  60. rollback_rows = db_client.save(
  61. query=update_query, params=(INIT_STATUS, PROCESSING_STATUS, timestamp_threshold)
  62. )
  63. return rollback_rows
  64. def generate_single_title(task):
  65. title = task['title']
  66. thread_client = DatabaseConnector(long_articles_config)
  67. thread_client.connect()
  68. rollback_task(thread_client)
  69. lock_result = lock_task(thread_client, task['title_md5'])
  70. if not lock_result:
  71. return
  72. prompt = generate_prompt(title)
  73. response = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
  74. title_list_str = response.strip()
  75. try:
  76. title_list = json.loads(title_list_str)
  77. except json.decoder.JSONDecodeError:
  78. title_list = title_list_str
  79. except Exception as e:
  80. # set as fail
  81. update_query = f"""
  82. update `article_pool_promotion_source`
  83. set association_status = %s, association_update_timestamp = %s
  84. where title_md5 = %s and association_status = %s;
  85. """
  86. thread_client.save(
  87. update_query,
  88. params=(
  89. FAIL_STATUS, datetime.datetime.now(), task['title_md5'],
  90. PROCESSING_STATUS
  91. )
  92. )
  93. return
  94. # set as success
  95. update_query = f"""
  96. update `article_pool_promotion_source`
  97. set association_status = %s, association_update_timestamp = %s, association_title = %s
  98. where title_md5 = %s and association_status = %s;
  99. """
  100. thread_client.save(
  101. update_query,
  102. params=(
  103. SUCCESS_STATUS, datetime.datetime.now(), json.dumps(title_list, ensure_ascii=False), task['title_md5'], PROCESSING_STATUS
  104. )
  105. )
  106. def get_task_list():
  107. db = DatabaseConnector(long_articles_config)
  108. db.connect()
  109. fetch_query = f"""
  110. select distinct title, title_md5
  111. from `article_pool_promotion_source`
  112. where `level` = 'autoArticlePoolLevel1'
  113. and status = 1 and `deleted` = 0 and association_status = 0
  114. order by `create_timestamp` desc
  115. limit 160;
  116. """
  117. title_list = db.fetch(fetch_query, cursor_type=DictCursor)
  118. return title_list
  119. def get_association_title_list_in_multi_threads():
  120. task_list = get_task_list()
  121. with ThreadPoolExecutor(max_workers=8) as executor:
  122. futures = [
  123. executor.submit(generate_single_title, task) for task in task_list
  124. ]
  125. for future in tqdm(
  126. concurrent.futures.as_completed(futures),
  127. total=len(task_list),
  128. desc="处理进度",
  129. ):
  130. future.result()