123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- import json
- import time
- import concurrent
- import datetime
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from concurrent.futures import ThreadPoolExecutor
- from applications.api import fetch_deepseek_response
- from applications.db import DatabaseConnector
- from config import long_articles_config
# `association_status` values for rows in `article_pool_promotion_source`.
INIT_STATUS = 0         # pending; the only state lock_task will claim
PROCESSING_STATUS = 1   # claimed by a worker
SUCCESS_STATUS = 2      # association titles written
FAIL_STATUS = 99        # generation failed

# Seconds a row may stay in PROCESSING_STATUS before rollback_task resets it.
MAX_PROCESSING_TIME = 3600  # one hour
def generate_prompt(title):
    """Build the DeepSeek prompt that turns *title* into long-tail search keywords.

    The prompt asks the model to (1) decompose the title into core elements,
    (2) apply keyword formulas to those elements, and (3) output only a list of
    long-tail keywords aimed at readers over 50, in the form
    ``["长尾词1", "长尾词2", ...]``.

    Args:
        title: the article title to embed in the prompt.

    Returns:
        The prompt text; it starts and ends with a newline.
    """
    prompt_lines = [
        "",
        "**输入标题**:",
        f"{title}",
        "task1:`请按以下维度解构标题核心元素:`",
        "1. **核心人物**(带身份标签):",
        "2. **冲突双方**:",
        "3. **核心事件**:",
        "4. **反常细节**:",
        "5. **时空坐标**:",
        "6. **认知钩子**:",
        "7. **隐性议题**:",
        "task2: 请基于 task1 结构元素,套用以下公式来生成关键词",
        "| 公式类型 | 生成逻辑",
        "|--------------- |----------------------------",
        "| **悬念挖掘式** | [人物]+[反常行为]+[学科解释需求]",
        "| **时空穿越式** | [历史事件]在[年代]的[群体]解读",
        "| **技术拆解式** | [人物]的[专业领域]+[技术术语]",
        "| **文化对抗式** | [国家A]如何重新定义[国家B]的[历史符号]",
        "| **暗线追踪式** | [微小物证]揭示的[大历史真相]",
        "task3: 基于 task2 生成的关键词矩阵,生成一批长尾词,通过这些词可以在媒体平台搜索到相关内容,",
        "要求这些内容对 50 岁以上的中老年人有一定的吸引性",
        "输出: 只需要输出task3 生成的长尾词列表",
        '输出示例: ["长尾词1", "长尾词2", "长尾词3", "长尾词4", ...]',
        "",
    ]
    # The empty first/last entries reproduce the leading and trailing newline.
    return "\n".join(prompt_lines)
def lock_task(db_client, task_md5):
    """Claim the task identified by *task_md5* for the calling worker.

    The update matches only a row still in INIT_STATUS. It moves that row to
    PROCESSING_STATUS and stamps ``association_update_timestamp`` with the
    current naive local time, which rollback_task later compares against.

    Args:
        db_client: connected client exposing ``save(query, params=...)``.
        task_md5: ``title_md5`` of the row to claim.

    Returns:
        Whatever ``db_client.save`` returns. This is presumably the
        affected-row count (0 when the row was not in INIT_STATUS). Callers
        treat a falsy result as "not locked".
    """
    claim_sql = """
        update `article_pool_promotion_source`
        set association_status = %s, association_update_timestamp = %s
        where title_md5 = %s and association_status = %s;
    """
    claimed_at = datetime.datetime.now()
    claim_params = (PROCESSING_STATUS, claimed_at, task_md5, INIT_STATUS)
    return db_client.save(claim_sql, params=claim_params)
def rollback_task(db_client):
    """Reset rows stuck in PROCESSING_STATUS back to INIT_STATUS.

    A row counts as stuck when its ``association_update_timestamp`` is older
    than MAX_PROCESSING_TIME seconds before now. The cutoff is taken from
    whole-second ``time.time()`` and converted to a naive local datetime,
    matching how lock_task stamps rows.

    Args:
        db_client: connected client exposing ``save(query=..., params=...)``.

    Returns:
        Whatever ``db_client.save`` returns (presumably the number of rows reset).
    """
    cutoff_epoch = int(time.time()) - MAX_PROCESSING_TIME
    stale_before = datetime.datetime.fromtimestamp(cutoff_epoch)
    reset_sql = """
        update `article_pool_promotion_source`
        set association_status = %s
        where association_status = %s and association_update_timestamp < %s;
    """
    reset_params = (INIT_STATUS, PROCESSING_STATUS, stale_before)
    return db_client.save(query=reset_sql, params=reset_params)
def _fetch_association_titles(title):
    """Ask DeepSeek-R1 for long-tail keywords derived from *title*.

    Returns:
        The parsed JSON value when the model output is valid JSON. Otherwise
        the raw stripped text, so non-JSON output is kept rather than lost.

    Raises:
        ValueError: if the model returned an empty or whitespace-only response.
        Exception: anything ``fetch_deepseek_response`` raises, or
            AttributeError if it returns a value without ``.strip()`` (e.g. None).
    """
    response = fetch_deepseek_response(model="DeepSeek-R1", prompt=generate_prompt(title))
    text = response.strip()
    if not text:
        raise ValueError("empty response from DeepSeek-R1")
    try:
        return json.loads(text)
    except json.decoder.JSONDecodeError:
        # Keep non-JSON output as-is instead of failing the task.
        return text


def _finish_task(db_client, task_md5, status, association_title=None):
    """Move a locked task out of PROCESSING_STATUS into *status*.

    The ``association_status = PROCESSING_STATUS`` condition leaves a row
    untouched if it is no longer locked (e.g. reset by rollback_task).
    ``association_title`` is written only when it is provided.
    """
    now = datetime.datetime.now()
    if association_title is None:
        query = """
            update `article_pool_promotion_source`
            set association_status = %s, association_update_timestamp = %s
            where title_md5 = %s and association_status = %s;
        """
        params = (status, now, task_md5, PROCESSING_STATUS)
    else:
        query = """
            update `article_pool_promotion_source`
            set association_status = %s, association_update_timestamp = %s, association_title = %s
            where title_md5 = %s and association_status = %s;
        """
        params = (status, now, association_title, task_md5, PROCESSING_STATUS)
    return db_client.save(query, params=params)


def generate_single_title(task):
    """Generate long-tail keywords for one task row and store the outcome.

    Steps:
      1. Reset stale locks (rollback_task).
      2. Claim the row (lock_task); return immediately if it cannot be claimed.
      3. Ask DeepSeek-R1 for keywords.
      4. Mark the row SUCCESS_STATUS with the JSON-encoded keywords, or
         FAIL_STATUS if the model call or its output fails.

    Model failures are recorded in the row rather than raised, so one bad
    title does not abort the thread-pool caller. Database errors still propagate.

    Args:
        task: a row from get_task_list; must provide 'title' and 'title_md5'.
    """
    title = task['title']
    task_md5 = task['title_md5']

    # NOTE(review): a new connection is opened per task and never closed here.
    # Confirm whether DatabaseConnector needs an explicit close.
    thread_client = DatabaseConnector(long_articles_config)
    thread_client.connect()

    # Release rows whose lock is older than MAX_PROCESSING_TIME before claiming this one.
    rollback_task(thread_client)
    if not lock_task(thread_client, task_md5):
        # The row is no longer in INIT_STATUS (claimed or finished elsewhere).
        return

    try:
        title_list = _fetch_association_titles(title)
    except Exception:
        # Previously these failures happened outside the try. They left the row
        # in PROCESSING_STATUS until rollback_task and crashed the caller's
        # future.result() loop.
        _finish_task(thread_client, task_md5, FAIL_STATUS)
        return

    _finish_task(
        thread_client,
        task_md5,
        SUCCESS_STATUS,
        association_title=json.dumps(title_list, ensure_ascii=False),
    )
def get_task_list(limit=160):
    """Fetch up to *limit* unprocessed titles from `article_pool_promotion_source`.

    Selects distinct ``(title, title_md5)`` pairs from rows where:
      - ``level`` is 'autoArticlePoolLevel1',
      - ``status = 1`` and ``deleted = 0``,
      - ``association_status`` equals INIT_STATUS.
    Rows are ordered newest ``create_timestamp`` first.

    Args:
        limit: maximum number of rows to return (default 160, the previous
            hard-coded value).

    Returns:
        The result of ``db.fetch`` with a ``DictCursor``. This is presumably a
        sequence of dicts keyed by ``title`` and ``title_md5``; confirm against
        DatabaseConnector.
    """
    db = DatabaseConnector(long_articles_config)
    db.connect()
    # Both interpolated values are int-cast, so they cannot inject SQL.
    # The status now follows INIT_STATUS instead of a duplicated literal 0.
    # NOTE(review): some MySQL configurations (e.g. ONLY_FULL_GROUP_BY) reject
    # ORDER BY on a column absent from a DISTINCT select list. If so, switch to
    # GROUP BY title, title_md5 ORDER BY MAX(create_timestamp).
    fetch_query = f"""
        select distinct title, title_md5
        from `article_pool_promotion_source`
        where `level` = 'autoArticlePoolLevel1'
            and status = 1 and `deleted` = 0 and association_status = {int(INIT_STATUS)}
        order by `create_timestamp` desc
        limit {int(limit)};
    """
    return db.fetch(fetch_query, cursor_type=DictCursor)
def get_association_title_list_in_multi_threads(max_workers=8):
    """Process every pending task from get_task_list on a thread pool.

    Each task is handled by generate_single_title, and progress is shown with
    tqdm. An exception raised in a worker is re-raised here by
    ``future.result()``.

    Args:
        max_workers: thread-pool size (default 8, the previous hard-coded value).
    """
    task_list = get_task_list()
    if not task_list:
        # Nothing pending, or the fetch returned None (which len() and
        # iteration below would reject).
        return

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(generate_single_title, task) for task in task_list]
        for future in tqdm(
            concurrent.futures.as_completed(futures),
            total=len(futures),
            desc="处理进度",
        ):
            # Surface any exception raised inside the worker thread.
            future.result()
|