|
@@ -0,0 +1,132 @@
|
|
|
+import json
|
|
|
+import concurrent
|
|
|
+import datetime
|
|
|
+from tqdm import tqdm
|
|
|
+from pymysql.cursors import DictCursor
|
|
|
+from concurrent.futures import ThreadPoolExecutor
|
|
|
+from applications.api import fetch_deepseek_response
|
|
|
+from applications.db import DatabaseConnector
|
|
|
+from config import long_articles_config
|
|
|
+
|
|
|
+INIT_STATUS = 0
|
|
|
+PROCESSING_STATUS = 1
|
|
|
+SUCCESS_STATUS = 2
|
|
|
+FAIL_STATUS = 99
|
|
|
+
|
|
|
+
|
|
|
+def generate_prompt(title):
|
|
|
+ prompt = f"""
|
|
|
+ **输入标题**:
|
|
|
+ {title}
|
|
|
+ task1:`请按以下维度解构标题核心元素:`
|
|
|
+ 1. **核心人物**(带身份标签):
|
|
|
+ 2. **冲突双方**:
|
|
|
+ 3. **核心事件**:
|
|
|
+ 4. **反常细节**:
|
|
|
+ 5. **时空坐标**:
|
|
|
+ 6. **认知钩子**:
|
|
|
+ 7. **隐性议题**:
|
|
|
+ task2: 请基于 task1 结构元素,套用以下公式来生成关键词
|
|
|
+ | 公式类型 | 生成逻辑
|
|
|
+ |--------------- |----------------------------
|
|
|
+ | **悬念挖掘式** | [人物]+[反常行为]+[学科解释需求]
|
|
|
+ | **时空穿越式** | [历史事件]在[年代]的[群体]解读
|
|
|
+ | **技术拆解式** | [人物]的[专业领域]+[技术术语]
|
|
|
+ | **文化对抗式** | [国家A]如何重新定义[国家B]的[历史符号]
|
|
|
+ | **暗线追踪式** | [微小物证]揭示的[大历史真相]
|
|
|
+
|
|
|
+ task3: 基于 task2 生成的关键词矩阵,生成一批长尾词,通过这些词可以在媒体平台搜索到相关内容,
|
|
|
+ 要求这些内容对 50 岁以上的中老年人有一定的吸引性
|
|
|
+
|
|
|
+ 输出: 只需要输出task3 生成的长尾词列表
|
|
|
+ 输出格式: LIST
|
|
|
+ 输出示例: ["长尾词1", "长尾词2", "长尾词3", "长尾词4", ...]
|
|
|
+ """
|
|
|
+ return prompt
|
|
|
+
|
|
|
+def lock_task(db_client, task_md5):
|
|
|
+ lock_query = f"""
|
|
|
+ update `article_pool_promotion_source`
|
|
|
+ set association_status = %s, association_update_timestamp = %s
|
|
|
+ where title_md5 = %s and association_status = %s;
|
|
|
+ """
|
|
|
+ affected_rows = db_client.save(lock_query, params=(PROCESSING_STATUS, datetime.datetime.now(), task_md5, INIT_STATUS))
|
|
|
+ return affected_rows
|
|
|
+
|
|
|
+def rollback_task(db_client):
|
|
|
+ return
|
|
|
+
|
|
|
+def generate_single_title(task):
|
|
|
+ title = task['title']
|
|
|
+ thread_client = DatabaseConnector(long_articles_config)
|
|
|
+ thread_client.connect()
|
|
|
+
|
|
|
+ lock_result = lock_task(thread_client, task['title_md5'])
|
|
|
+ if not lock_result:
|
|
|
+ return
|
|
|
+
|
|
|
+ prompt = generate_prompt(title)
|
|
|
+ response = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
|
|
|
+ title_list_str = response.strip()
|
|
|
+ try:
|
|
|
+ title_list = json.loads(title_list_str)
|
|
|
+ except json.decoder.JSONDecodeError:
|
|
|
+ title_list = title_list_str
|
|
|
+ except Exception as e:
|
|
|
+ # set as fail
|
|
|
+ update_query = f"""
|
|
|
+ update `article_pool_promotion_source`
|
|
|
+ set association_status = %s, association_update_timestamp = %s
|
|
|
+ where title_md5 = %s and association_status = %s;
|
|
|
+ """
|
|
|
+ thread_client.save(
|
|
|
+ update_query,
|
|
|
+ params=(
|
|
|
+ FAIL_STATUS, datetime.datetime.now(), task['title_md5'],
|
|
|
+ PROCESSING_STATUS
|
|
|
+ )
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
+ # set as success
|
|
|
+ update_query = f"""
|
|
|
+ update `article_pool_promotion_source`
|
|
|
+ set association_status = %s, association_update_timestamp = %s, association_title = %s
|
|
|
+ where title_md5 = %s and association_status = %s;
|
|
|
+ """
|
|
|
+ thread_client.save(
|
|
|
+ update_query,
|
|
|
+ params=(
|
|
|
+ SUCCESS_STATUS, datetime.datetime.now(), json.dumps(title_list, ensure_ascii=False), task['title_md5'], PROCESSING_STATUS
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+def get_task_list():
|
|
|
+ db = DatabaseConnector(long_articles_config)
|
|
|
+ db.connect()
|
|
|
+ fetch_query = f"""
|
|
|
+ select distinct title, title_md5
|
|
|
+ from `article_pool_promotion_source`
|
|
|
+ where `level` = 'autoArticlePoolLevel1'
|
|
|
+ and status = 1 and `deleted` = 0 and association_status = 0
|
|
|
+ order by `create_timestamp` desc
|
|
|
+ limit 1000;
|
|
|
+ """
|
|
|
+ title_list = db.fetch(fetch_query, cursor_type=DictCursor)
|
|
|
+ return title_list
|
|
|
+
|
|
|
+def main():
|
|
|
+ task_list = get_task_list()
|
|
|
+ with ThreadPoolExecutor(max_workers=8) as executor:
|
|
|
+ futures = [
|
|
|
+ executor.submit(generate_single_title, task) for task in task_list
|
|
|
+ ]
|
|
|
+ for future in tqdm(
|
|
|
+ concurrent.futures.as_completed(futures),
|
|
|
+ total=len(task_list),
|
|
|
+ desc="处理进度",
|
|
|
+ ):
|
|
|
+ future.result()
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ main()
|