generate_search_keys.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import json
  2. import concurrent
  3. import datetime
  4. from tqdm import tqdm
  5. from pymysql.cursors import DictCursor
  6. from concurrent.futures import ThreadPoolExecutor
  7. from applications.api import fetch_deepseek_response
  8. from applications.db import DatabaseConnector
  9. from config import long_articles_config
# Values stored in the `association_status` column of
# `article_pool_promotion_source`; they track one title through the
# keyword-generation workflow in this file.
INIT_STATUS = 0  # not yet claimed; lock_task only claims rows in this state
PROCESSING_STATUS = 1  # claimed by lock_task, generation in progress
SUCCESS_STATUS = 2  # generated keywords were stored in `association_title`
FAIL_STATUS = 99  # generation failed for this title
  14. def generate_prompt(title):
  15. prompt = f"""
  16. **输入标题**:
  17. {title}
  18. task1:`请按以下维度解构标题核心元素:`
  19. 1. **核心人物**(带身份标签):
  20. 2. **冲突双方**:
  21. 3. **核心事件**:
  22. 4. **反常细节**:
  23. 5. **时空坐标**:
  24. 6. **认知钩子**:
  25. 7. **隐性议题**:
  26. task2: 请基于 task1 结构元素,套用以下公式来生成关键词
  27. | 公式类型 | 生成逻辑
  28. |--------------- |----------------------------
  29. | **悬念挖掘式** | [人物]+[反常行为]+[学科解释需求]
  30. | **时空穿越式** | [历史事件]在[年代]的[群体]解读
  31. | **技术拆解式** | [人物]的[专业领域]+[技术术语]
  32. | **文化对抗式** | [国家A]如何重新定义[国家B]的[历史符号]
  33. | **暗线追踪式** | [微小物证]揭示的[大历史真相]
  34. task3: 基于 task2 生成的关键词矩阵,生成一批长尾词,通过这些词可以在媒体平台搜索到相关内容,
  35. 要求这些内容对 50 岁以上的中老年人有一定的吸引性
  36. 输出: 只需要输出task3 生成的长尾词列表
  37. 输出格式: LIST
  38. 输出示例: ["长尾词1", "长尾词2", "长尾词3", "长尾词4", ...]
  39. """
  40. return prompt
  41. def lock_task(db_client, task_md5):
  42. lock_query = f"""
  43. update `article_pool_promotion_source`
  44. set association_status = %s, association_update_timestamp = %s
  45. where title_md5 = %s and association_status = %s;
  46. """
  47. affected_rows = db_client.save(lock_query, params=(PROCESSING_STATUS, datetime.datetime.now(), task_md5, INIT_STATUS))
  48. return affected_rows
  49. def rollback_task(db_client):
  50. return
  51. def generate_single_title(task):
  52. title = task['title']
  53. thread_client = DatabaseConnector(long_articles_config)
  54. thread_client.connect()
  55. lock_result = lock_task(thread_client, task['title_md5'])
  56. if not lock_result:
  57. return
  58. prompt = generate_prompt(title)
  59. response = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
  60. title_list_str = response.strip()
  61. try:
  62. title_list = json.loads(title_list_str)
  63. except json.decoder.JSONDecodeError:
  64. title_list = title_list_str
  65. except Exception as e:
  66. # set as fail
  67. update_query = f"""
  68. update `article_pool_promotion_source`
  69. set association_status = %s, association_update_timestamp = %s
  70. where title_md5 = %s and association_status = %s;
  71. """
  72. thread_client.save(
  73. update_query,
  74. params=(
  75. FAIL_STATUS, datetime.datetime.now(), task['title_md5'],
  76. PROCESSING_STATUS
  77. )
  78. )
  79. return
  80. # set as success
  81. update_query = f"""
  82. update `article_pool_promotion_source`
  83. set association_status = %s, association_update_timestamp = %s, association_title = %s
  84. where title_md5 = %s and association_status = %s;
  85. """
  86. thread_client.save(
  87. update_query,
  88. params=(
  89. SUCCESS_STATUS, datetime.datetime.now(), json.dumps(title_list, ensure_ascii=False), task['title_md5'], PROCESSING_STATUS
  90. )
  91. )
  92. def get_task_list():
  93. db = DatabaseConnector(long_articles_config)
  94. db.connect()
  95. fetch_query = f"""
  96. select distinct title, title_md5
  97. from `article_pool_promotion_source`
  98. where `level` = 'autoArticlePoolLevel1'
  99. and status = 1 and `deleted` = 0 and association_status = 0
  100. order by `create_timestamp` desc
  101. limit 1000;
  102. """
  103. title_list = db.fetch(fetch_query, cursor_type=DictCursor)
  104. return title_list
  105. def main():
  106. task_list = get_task_list()
  107. with ThreadPoolExecutor(max_workers=8) as executor:
  108. futures = [
  109. executor.submit(generate_single_title, task) for task in task_list
  110. ]
  111. for future in tqdm(
  112. concurrent.futures.as_completed(futures),
  113. total=len(task_list),
  114. desc="处理进度",
  115. ):
  116. future.result()
# Run the batch only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()