import json
import logging
import concurrent
import datetime

from tqdm import tqdm
from pymysql.cursors import DictCursor
from concurrent.futures import ThreadPoolExecutor

from applications.api import fetch_deepseek_response
from applications.db import DatabaseConnector
from config import long_articles_config

logger = logging.getLogger(__name__)

# Values of `association_status` in `article_pool_promotion_source`.
# Flow used by this script: INIT -> PROCESSING (claimed by lock_task)
# -> SUCCESS or FAIL.
INIT_STATUS = 0
PROCESSING_STATUS = 1
SUCCESS_STATUS = 2
FAIL_STATUS = 99


def generate_prompt(title):
    """Build the LLM prompt asking for long-tail search keywords for *title*.

    The (Chinese) prompt asks the model to decompose the title, apply a set of
    keyword formulas, and output only a list of long-tail keywords in the
    form ``["kw1", "kw2", ...]``.

    Args:
        title: The article title to analyse.

    Returns:
        The prompt string.
    """
    prompt = f"""
    **输入标题**: {title}
    task1:`请按以下维度解构标题核心元素:`
    1. **核心人物**(带身份标签):
    2. **冲突双方**:
    3. **核心事件**:
    4. **反常细节**:
    5. **时空坐标**:
    6. **认知钩子**:
    7. **隐性议题**:
    task2: 请基于 task1 结构元素,套用以下公式来生成关键词
    | 公式类型 | 生成逻辑
    |--------------- |----------------------------
    | **悬念挖掘式** | [人物]+[反常行为]+[学科解释需求]
    | **时空穿越式** | [历史事件]在[年代]的[群体]解读
    | **技术拆解式** | [人物]的[专业领域]+[技术术语]
    | **文化对抗式** | [国家A]如何重新定义[国家B]的[历史符号]
    | **暗线追踪式** | [微小物证]揭示的[大历史真相]
    task3: 基于 task2 生成的关键词矩阵,生成一批长尾词,通过这些词可以在媒体平台搜索到相关内容, 要求这些内容对 50 岁以上的中老年人有一定的吸引性
    输出: 只需要输出task3 生成的长尾词列表
    输出格式: LIST
    输出示例: ["长尾词1", "长尾词2", "长尾词3", "长尾词4", ...]
    """
    return prompt


def lock_task(db_client, task_md5):
    """Claim a task by moving its rows from INIT to PROCESSING.

    The ``association_status = INIT_STATUS`` condition in the WHERE clause
    means a row that is already claimed (or finished) is not updated again.

    Args:
        db_client: A connected ``DatabaseConnector``.
        task_md5: The ``title_md5`` identifying the task.

    Returns:
        The value returned by ``db_client.save``. Callers treat a falsy value
        as "not claimed" (presumably the affected-row count; confirm against
        DatabaseConnector).
    """
    # NOTE(review): association_update_timestamp receives a naive datetime;
    # confirm the column is DATETIME and not an integer epoch timestamp.
    lock_query = """
        update `article_pool_promotion_source`
        set association_status = %s, association_update_timestamp = %s
        where title_md5 = %s and association_status = %s;
    """
    affected_rows = db_client.save(
        lock_query,
        params=(PROCESSING_STATUS, datetime.datetime.now(), task_md5, INIT_STATUS),
    )
    return affected_rows


def rollback_task(db_client):
    """Placeholder for rolling back claimed tasks; currently does nothing.

    NOTE(review): this function is a no-op and is not called in this file.
    """
    return


def _mark_task_failed(db_client, task_md5):
    """Move a PROCESSING task to FAIL_STATUS."""
    update_query = """
        update `article_pool_promotion_source`
        set association_status = %s, association_update_timestamp = %s
        where title_md5 = %s and association_status = %s;
    """
    return db_client.save(
        update_query,
        params=(FAIL_STATUS, datetime.datetime.now(), task_md5, PROCESSING_STATUS),
    )


def _mark_task_succeeded(db_client, task_md5, association_title):
    """Move a PROCESSING task to SUCCESS_STATUS and store its keywords."""
    update_query = """
        update `article_pool_promotion_source`
        set association_status = %s, association_update_timestamp = %s, association_title = %s
        where title_md5 = %s and association_status = %s;
    """
    return db_client.save(
        update_query,
        params=(
            SUCCESS_STATUS,
            datetime.datetime.now(),
            association_title,
            task_md5,
            PROCESSING_STATUS,
        ),
    )


def _parse_title_list(raw_response):
    """Parse the model answer as JSON, falling back to the stripped raw text."""
    text = raw_response.strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # The model did not return valid JSON; keep its text as-is.
        return text


def generate_single_title(task):
    """Generate and store long-tail keywords for a single task.

    Claims the task, asks the LLM for keywords, and records the result.
    Any failure while calling the model or parsing its answer marks the task
    FAIL_STATUS, so it is never left stuck in PROCESSING_STATUS.

    Args:
        task: A row from ``get_task_list`` with ``title`` and ``title_md5``.
    """
    title = task['title']
    task_md5 = task['title_md5']

    # Each worker opens its own connection instead of sharing one.
    # NOTE(review): the connection is never closed explicitly; call its close
    # method here if DatabaseConnector provides one.
    thread_client = DatabaseConnector(long_articles_config)
    thread_client.connect()

    if not lock_task(thread_client, task_md5):
        # Already claimed by another worker or a previous run.
        return

    try:
        response = fetch_deepseek_response(model="DeepSeek-R1", prompt=generate_prompt(title))
        # A None / non-str response raises here and is handled as a failure.
        title_list = _parse_title_list(response)
        association_title = json.dumps(title_list, ensure_ascii=False)
    except Exception:
        logger.exception("keyword generation failed for title_md5=%s", task_md5)
        _mark_task_failed(thread_client, task_md5)
        return

    _mark_task_succeeded(thread_client, task_md5, association_title)


def get_task_list():
    """Fetch up to 1000 of the newest unprocessed level-1 titles.

    Returns:
        Whatever ``db.fetch`` returns with a ``DictCursor``: presumably a
        sequence of dicts with ``title`` and ``title_md5`` keys.
    """
    db = DatabaseConnector(long_articles_config)
    db.connect()
    # NOTE(review): DISTINCT combined with ORDER BY on a column that is not in
    # the select list is rejected by MySQL when ONLY_FULL_GROUP_BY is enabled
    # (error 3065); confirm the server's sql_mode.
    fetch_query = f"""
        select distinct title, title_md5
        from `article_pool_promotion_source`
        where `level` = 'autoArticlePoolLevel1'
            and status = 1
            and `deleted` = 0
            and association_status = {INIT_STATUS}
        order by `create_timestamp` desc
        limit 1000;
    """
    title_list = db.fetch(fetch_query, cursor_type=DictCursor)
    return title_list


def main(max_workers=8):
    """Process all pending tasks concurrently with a progress bar.

    Args:
        max_workers: Number of worker threads (the work is I/O bound).
    """
    task_list = get_task_list()
    if not task_list:
        return

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(generate_single_title, task) for task in task_list
        ]
        for future in tqdm(
            concurrent.futures.as_completed(futures),
            total=len(futures),
            desc="处理进度",
        ):
            # Surface any exception that escaped generate_single_title
            # (e.g. a database error).
            future.result()


if __name__ == '__main__':
    main()