from __future__ import annotations import asyncio import json import traceback from typing import Dict, List, Tuple from tqdm.asyncio import tqdm from applications.api import fetch_deepseek_completion from applications.crawler.tophub import get_hot_point_content class CrawlerHotPointConst: MAX_PAGE_INDEX = 40 INIT_STATUS = 0 PROCESSING_STATUS = 1 USEFUL_STATUS = 2 NOT_USEFUL_STATUS = 3 FAILED_STATUS = 99 NOT_EXPIRED_STATUS = 1 EXPIRED_STATUS = 2 # batch PROCESS_TITLE_BATCH_SIZE = 200 # ignore platforms IGNORE_PLATFORMS = { "中国日报", "每日珠宝杂志", "iBag包包", "ZAKER", "NASA 🌍", "wikiHow 中文", "China Daily", "微信 ‧ 游戏", "Yahoo News", "北京天文馆", "本地宝", "BBC" } class CrawlerHotPointBase(CrawlerHotPointConst): CLASSIFY_PROMPT = """ # 角色 你是一名**银发内容分析助手**,从“热榜标题”中识别**55岁以上人群**(以下简称“老年群体”)**高度相关且具时效性的新闻**。 --- ## 受众与主题边界 ### ✅ 优先主题(示例) - 健康与疾病(慢性病、体检、用药与不良反应、流行病预警) - 养老与退休生活(养老院/社区养老、照护、适老化改造) - 社会保障(养老金、医保、社保政策解读与变动) - 代际关系与家庭(子女赡养、祖辈照护、家庭矛盾化解) - 老年安全(诈骗预警、交通/居家安全、金融风险) - 名人与社会事件(名人离世/健康突发、涉及老年群体的公共事件) - 轻松话题(奇闻趣事、正能量故事)——**仅在具备明显大众传播度或话题性时**纳入 ### ❌ 硬性排除 - 涉及**政治、当代国家领导人**、涉敏议题的内容 - **地方性/行业性**的垂直资讯(如县域通知、专业圈层动态)除非**已引发全国性关注** - 明显软文、广告、纯知识科普/生活技巧(非新闻、非事件) - 低可信度谣言、医疗偏方、夸大疗效信息 --- ## “爆炸性新闻”判定(需同时满足 2 条及以上) 1. **突发性**:短时间内发生/曝光(如猝发事故、紧急通报、名人健康/离世) 2. **广泛影响**:对全国/大范围群体或老年群体利益有实质影响(如养老金发放异常/系统性风险) 3. **强关注度**:社会各界/主流媒体/权威机构广泛讨论或发声 4. **信息强度**:事实明确、细节冲击性强(非捕风捉影) --- ## 输入格式 - 你将收到若干条记录,每条包含:`id`(整数或字符串)、`title`(标题,字符串)。 - 标题可能包含噪声或修辞,请基于**标题文字本身**进行判断。 --- ## 决策流程(严格按序执行) 1. **敏感过滤**:若标题含政治/当代国家领导人等**敏感**要素 ⇒ **淘汰** 2. **相关性评估(老年群体)**: - 直接相关关键词(“养老金/医保/养老院/老人/阿尔茨海默/骨折/退役/退休”等) ⇒ 强相关 - 间接相关但**明显触达老年痛点/关注点**(“养老诈骗”“养老院安全事故”“名人离世/老年疾病”) ⇒ 中到强相关 - 仅猎奇但**无老年相关指向** ⇒ 淘汰 3. **时效性与“爆炸性”判定**: - 需符合“爆炸性新闻”判定中≥2项; - 纯常识/科普/日常提示无“事件性” ⇒ 淘汰 4. **全国性/大众化筛**: - 明显仅限**地方部门内部通知/小范围** ⇒ 淘汰(除非引发全国性关注) 5. **最终选择规则**: - 同一事件多标题重复,**只保留更具体、信息密度更高**的那条 - 若无法明确其是否适合老年群体或是否“爆炸性”,**从严不选** --- ## 打分与阈值(用于自检,不输出分数) | 维度 | 分值 | 判定标准 | |------|------|-----------| | 相关性 | 0–3 | 直接老年主题=3;间接但明显相关=2;弱相关=1;无关=0 | | 爆炸性 | 0–2 | 满足条目数 0–2 | | 覆盖面 | 0–1 | 全国性/大众化=1;地方垂直=0 | **入选条件**:相关性 ≥ 2 且 爆炸性 ≥ 1 且 覆盖面 ≥ 0.5 --- ## 输出 仅输出 JSON,**不得包含任何解释或多余文本**: ```json { "IDS": [1, 2, 3] } """ FAMOUS_PERSON_PROMPT = """ ## 输入格式 - 你将收到若干条记录,每条包含:`id`(整数或字符串)、`title`(标题,字符串)。 - 标题可能包含噪声或修辞,请基于**标题文字本身**进行判断。 ## 任务说明 - 请从输入的标题中,找出极高的**名人**热度的标题,返回其id列表; - 注意一定得是名人。 - 注意一定得是老年人感兴趣的名人。 ## 输出 仅输出 JSON,**不得包含任何解释或多余文本**: ```json { "IDS": [1, 2, 3] } """ @staticmethod def format_input_articles(fetch_response: List[Dict]) -> str: """ 格式化输入文章为字符串,每个文章占一行,格式为:id, title """ output_string = "" for item in fetch_response: output_string += f"{item['id']}, {item['title']}\n" return output_string class CrawlerHotPointMapper(CrawlerHotPointBase): def __init__(self, pool, log_client, trace_id): self.pool = pool self.log_client = log_client self.trace_id = trace_id async def save_articles(self, articles: List[Tuple]) -> int: """插入标题 && Link""" query = """ INSERT IGNORE INTO hot_point_titles (title, platform, link) VALUES (%s, %s, %s); """ return await self.pool.async_save(query=query, params=articles, batch=True) async def update_useful_status( self, article_id: int, origin_status: int, new_status: int ) -> int: """ 更新文章状态 """ query = """ UPDATE hot_point_titles SET useful = %s WHERE id = %s AND useful = %s; """ return await self.pool.async_save( query=query, params=(new_status, article_id, origin_status) ) async def set_as_processing(self, title_ids: List[int]) -> int: query = """ UPDATE hot_point_titles SET useful = %s WHERE id IN %s;""" return await self.pool.async_save( query=query, params=(self.PROCESSING_STATUS, tuple(title_ids)) ) async def set_as_failed(self, title_ids: List[int]) -> int: """ 设置文章为失败 """ query = """ UPDATE hot_point_titles SET useful = %s WHERE id IN %s; """ return await self.pool.async_save( query=query, params=(self.FAILED_STATUS, tuple(title_ids)) ) async def set_as_expired(self, article_id: int) -> int: """ 设置文章为过期 """ query = """ UPDATE hot_point_titles SET status = %s WHERE id = %s; """ return await self.pool.async_save( query=query, params=(self.EXPIRED_STATUS, article_id) ) async def fetch_init_articles(self) -> List[Dict]: """ 获取未经过 LLM 判处处理的事件 """ query = """ SELECT id, title FROM hot_point_titles WHERE status = %s AND useful = %s ORDER BY id Limit %s; """ return await self.pool.async_fetch(query=query, params=(self.NOT_EXPIRED_STATUS, self.INIT_STATUS, self.PROCESS_TITLE_BATCH_SIZE)) class CrawlerHotPointTask(CrawlerHotPointMapper): def __init__(self, pool, log_client, trace_id): super().__init__(pool, log_client, trace_id) def process_raw_data(self, response_data): """ 处理原始数据 """ articles = [] for item in response_data['data']['data']: platform = item["source"] if platform in self.IGNORE_PLATFORMS: continue for article in item["rankList"]: title = article["title"] link = article["link"] articles.append((title, platform, link)) return articles async def crawl_hot_titles(self): """ 爬取热点标题 """ for page in tqdm(range(1, self.MAX_PAGE_INDEX)): try: raw_data = await get_hot_point_content(page_index=page) articles = self.process_raw_data(raw_data) await self.save_articles(articles) except Exception as e: print(f"crawl_hot_titles error: {e}") async def classify_articles_by_llm(self): """ 用大模型进行分类,判断热点事件是否符合老年人的兴趣爱好 """ infos = await self.fetch_init_articles() # acquire lock if not infos: return title_ids = [item["id"] for item in infos] await self.set_as_processing(title_ids) # prompt = f"{self.CLASSIFY_PROMPT}\n{self.format_input_articles(infos)}" prompt = f"{self.FAMOUS_PERSON_PROMPT}\n{self.format_input_articles(infos)}" response = fetch_deepseek_completion( prompt=prompt, model="DeepSeek-V3", output_type="json" ) if not response: w = await self.set_as_failed([item["id"] for item in infos]) print(w) return ids = set(response.get("IDS", [])) for item in tqdm(infos): id_ = item["id"] if id_ in ids: await self.update_useful_status(id_, self.PROCESSING_STATUS, self.USEFUL_STATUS) else: await self.update_useful_status(id_, self.PROCESSING_STATUS, self.NOT_USEFUL_STATUS)