from __future__ import annotations

import traceback
from typing import Dict, List, Tuple

from tqdm.asyncio import tqdm

from applications.api import fetch_deepseek_completion
from applications.crawler.tophub import get_hot_point_content


class CrawlerHotPointConst:
    MAX_PAGE_INDEX = 40

    # useful column: LLM classification lifecycle
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    USEFUL_STATUS = 2
    NOT_USEFUL_STATUS = 3
    FAILED_STATUS = 99

    # status column: freshness
    NOT_EXPIRED_STATUS = 1
    EXPIRED_STATUS = 2

    # batch size for each LLM classification round
    PROCESS_TITLE_BATCH_SIZE = 500

    # platforms whose rank lists are skipped during crawling
    IGNORE_PLATFORMS = {
        "中国日报",
        "每日珠宝杂志",
        "iBag包包",
        "ZAKER",
        "NASA 🌍",
        "wikiHow 中文",
        "China Daily",
        "微信 ‧ 游戏",
        "Yahoo News",
        "北京天文馆",
        "本地宝",
    }


class CrawlerHotPointBase(CrawlerHotPointConst):
    CLASSIFY_PROMPT = """
You are a content-analysis assistant that identifies, from hot-list titles, "silver" content that people aged 55 and above are likely to enjoy or follow.
Silver content typically covers health, elder care, retirement life, geriatric diseases, social security, intergenerational relationships, curious anecdotes, celebrity stories, social events, and similar topics.
Do not include politics or sensitive subjects such as current national leaders.
1. **Task**: scan all titles and select the time-sensitive news items highly relevant to silver content. Judge relevance by whether a title directly or indirectly touches on topics about older people, or is likely to attract the interest of people aged 55+. Return the ids of the suitable titles.
   Filter out sensitive figures as usual. Note that each item must be a news event; judge strictly whether a title suits an older audience.
2. **Output format**: return JSON containing only the ids of titles suitable for older readers, with the structure { "IDS": [1, 2, 3, ...] }
Now, please process the titles && ids I provide. Please think step by step.
"""

    @staticmethod
    def format_input_articles(fetch_response: List[Dict]) -> str:
        """Format the input articles as one "id, title" pair per line."""
        output_string = ""
        for item in fetch_response:
            output_string += f"{item['id']}, {item['title']}\n"
        return output_string


class CrawlerHotPointMapper(CrawlerHotPointBase):
    def __init__(self, pool, log_client, trace_id):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    async def save_articles(self, articles: List[Tuple]) -> int:
        """Insert titles && links."""
        query = """
            INSERT IGNORE INTO hot_point_titles (title, platform, link)
            VALUES (%s, %s, %s);
        """
        return await self.pool.async_save(query=query, params=articles, batch=True)

    async def update_useful_status(
        self, article_id: int, origin_status: int, new_status: int
    ) -> int:
        """Update an article's useful status (compare-and-set against the old status)."""
        query = """
            UPDATE hot_point_titles
            SET useful = %s
            WHERE id = %s AND useful = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, article_id, origin_status)
        )

    async def set_as_processing(self, title_ids: List[int]) -> int:
        """Mark a batch of titles as processing (serves as a claim/lock)."""
        query = """
            UPDATE hot_point_titles
            SET useful = %s
            WHERE id IN %s;
        """
        return await self.pool.async_save(
            query=query, params=(self.PROCESSING_STATUS, tuple(title_ids))
        )

    async def set_as_failed(self, title_ids: List[int]) -> int:
        """Mark a batch of titles as failed."""
        query = """
            UPDATE hot_point_titles
            SET useful = %s
            WHERE id IN %s;
        """
        return await self.pool.async_save(
            query=query, params=(self.FAILED_STATUS, tuple(title_ids))
        )

    async def set_as_expired(self, article_id: int) -> int:
        """Mark an article as expired."""
        query = """
            UPDATE hot_point_titles
            SET status = %s
            WHERE id = %s;
        """
        return await self.pool.async_save(
            query=query, params=(self.EXPIRED_STATUS, article_id)
        )

    async def fetch_init_articles(self) -> List[Dict]:
        """Fetch events that have not yet been classified by the LLM."""
        query = """
            SELECT id, title
            FROM hot_point_titles
            WHERE status = %s AND useful = %s
            ORDER BY id
            LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=query,
            params=(
                self.NOT_EXPIRED_STATUS,
                self.INIT_STATUS,
                self.PROCESS_TITLE_BATCH_SIZE,
            ),
        )


class CrawlerHotPointTask(CrawlerHotPointMapper):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    def process_raw_data(self, response_data):
        """Flatten the raw tophub payload into (title, platform, link) tuples."""
        articles = []
        for item in response_data["data"]["data"]:
            platform = item["source"]
            if platform in self.IGNORE_PLATFORMS:
                continue
            for article in item["rankList"]:
                title = article["title"]
                link = article["link"]
                articles.append((title, platform, link))
        return articles
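    # A minimal sketch of the payload shape `process_raw_data` expects from
    # `get_hot_point_content` (inferred from the parsing above; the real
    # tophub response may carry additional fields):
    #
    #     {
    #         "data": {
    #             "data": [
    #                 {
    #                     "source": "微博",
    #                     "rankList": [
    #                         {"title": "...", "link": "https://..."},
    #                     ],
    #                 },
    #             ]
    #         }
    #     }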
    async def crawl_hot_titles(self):
        """Crawl hot-list titles page by page and persist them."""
        for page in tqdm(range(1, self.MAX_PAGE_INDEX)):
            try:
                raw_data = await get_hot_point_content(page_index=page)
                articles = self.process_raw_data(raw_data)
                await self.save_articles(articles)
            except Exception as e:
                print(f"crawl_hot_titles error: {e}")
                print(traceback.format_exc())

    async def classify_articles_by_llm(self):
        """Use the LLM to decide whether each hot event suits older readers' interests."""
        infos = await self.fetch_init_articles()
        if not infos:
            # Nothing to classify; also avoids an invalid `IN ()` clause below.
            return

        # acquire lock
        title_ids = [item["id"] for item in infos]
        await self.set_as_processing(title_ids)

        prompt = f"{self.CLASSIFY_PROMPT}\n{self.format_input_articles(infos)}"
        response = fetch_deepseek_completion(
            prompt=prompt, model="DeepSeek-R1", output_type="json"
        )
        if not response:
            affected = await self.set_as_failed(title_ids)
            print(affected)
            return

        ids = set(response.get("IDS", []))
        for item in tqdm(infos):
            id_ = item["id"]
            if id_ in ids:
                await self.update_useful_status(
                    id_, self.PROCESSING_STATUS, self.USEFUL_STATUS
                )
            else:
                await self.update_useful_status(
                    id_, self.PROCESSING_STATUS, self.NOT_USEFUL_STATUS
                )
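
# A minimal usage sketch, assuming `pool` is this project's async MySQL
# wrapper (exposing `async_save` / `async_fetch` as used above) and
# `log_client` matches the constructor signature; both come from elsewhere
# in the codebase, so the names below are placeholders:
#
#     import asyncio
#
#     async def main():
#         task = CrawlerHotPointTask(pool, log_client, trace_id="local-test")
#         await task.crawl_hot_titles()          # step 1: persist titles
#         await task.classify_articles_by_llm()  # step 2: LLM relevance pass
#
#     asyncio.run(main())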