- from __future__ import annotations
- import asyncio
- import json
- import traceback
- from typing import Dict, List, Tuple
- from tqdm.asyncio import tqdm
- from applications.api import fetch_deepseek_completion
- from applications.crawler.tophub import get_hot_point_content
class CrawlerHotPointConst:
    """Shared constants for the hot-point crawling / classification pipeline."""

    # Pages are crawled for page_index in range(1, MAX_PAGE_INDEX).
    MAX_PAGE_INDEX = 40

    # Values stored in the `useful` column of hot_point_titles:
    INIT_STATUS = 0        # freshly crawled, awaiting LLM classification
    PROCESSING_STATUS = 1  # claimed by a running classification batch
    USEFUL_STATUS = 2      # LLM judged relevant
    NOT_USEFUL_STATUS = 3  # LLM judged not relevant
    FAILED_STATUS = 99     # LLM call returned nothing for this batch

    # Values stored in the `status` (expiry) column of hot_point_titles:
    NOT_EXPIRED_STATUS = 1
    EXPIRED_STATUS = 2

    # batch: max number of titles sent to the LLM per classification round
    PROCESS_TITLE_BATCH_SIZE = 200

    # ignore platforms: sources skipped entirely while crawling
    IGNORE_PLATFORMS = {
        "中国日报", "每日珠宝杂志", "iBag包包", "ZAKER", "NASA 🌍", "wikiHow 中文",
        "China Daily", "微信 ‧ 游戏", "Yahoo News", "北京天文馆", "本地宝", "BBC"
    }
class CrawlerHotPointBase(CrawlerHotPointConst):
    """Prompt templates and input formatting shared by the hot-point tasks."""

    # Broad screen: senior-relevant, time-sensitive "explosive" news titles.
    CLASSIFY_PROMPT = """
# 角色
你是一名**银发内容分析助手**,从“热榜标题”中识别**55岁以上人群**(以下简称“老年群体”)**高度相关且具时效性的新闻**。
---
## 受众与主题边界
### ✅ 优先主题(示例)
健康与疾病(慢性病、体检、用药与不良反应、流行病预警)
养老与退休生活(养老院/社区养老、照护、适老化改造)
社会保障(养老金、医保、社保政策解读与变动)
代际关系与家庭(子女赡养、祖辈照护、家庭矛盾化解)
老年安全(诈骗预警、交通/居家安全、金融风险)
名人与社会事件(名人离世/健康突发、涉及老年群体的公共事件)
轻松话题(奇闻趣事、正能量故事)——**仅在具备明显大众传播度或话题性时**纳入
### ❌ 硬性排除
涉及**政治、当代国家领导人**、涉敏议题的内容
**地方性/行业性**的垂直资讯(如县域通知、专业圈层动态)除非**已引发全国性关注**
明显软文、广告、纯知识科普/生活技巧(非新闻、非事件)
低可信度谣言、医疗偏方、夸大疗效信息
---
## “爆炸性新闻”判定(需同时满足 2 条及以上)
1. **突发性**:短时间内发生/曝光(如猝发事故、紧急通报、名人健康/离世)
2. **广泛影响**:对全国/大范围群体或老年群体利益有实质影响(如养老金发放异常/系统性风险)
3. **强关注度**:社会各界/主流媒体/权威机构广泛讨论或发声
4. **信息强度**:事实明确、细节冲击性强(非捕风捉影)
---
## 输入格式
- 你将收到若干条记录,每条包含:`id`(整数或字符串)、`title`(标题,字符串)。
- 标题可能包含噪声或修辞,请基于**标题文字本身**进行判断。
---
## 决策流程(严格按序执行)
1. **敏感过滤**:若标题含政治/当代国家领导人等**敏感**要素 ⇒ **淘汰**
2. **相关性评估(老年群体)**:
   - 直接相关关键词(“养老金/医保/养老院/老人/阿尔茨海默/骨折/退役/退休”等) ⇒ 强相关
   - 间接相关但**明显触达老年痛点/关注点**(“养老诈骗”“养老院安全事故”“名人离世/老年疾病”) ⇒ 中到强相关
   - 仅猎奇但**无老年相关指向** ⇒ 淘汰
3. **时效性与“爆炸性”判定**:
   - 需符合“爆炸性新闻”判定中≥2项;
   - 纯常识/科普/日常提示无“事件性” ⇒ 淘汰
4. **全国性/大众化筛**:
   - 明显仅限**地方部门内部通知/小范围** ⇒ 淘汰(除非引发全国性关注)
5. **最终选择规则**:
   - 同一事件多标题重复,**只保留更具体、信息密度更高**的那条
   - 若无法明确其是否适合老年群体或是否“爆炸性”,**从严不选**
---
## 打分与阈值(用于自检,不输出分数)
| 维度 | 分值 | 判定标准 |
|------|------|-----------|
| 相关性 | 0–3 | 直接老年主题=3;间接但明显相关=2;弱相关=1;无关=0 |
| 爆炸性 | 0–2 | 满足条目数 0–2 |
| 覆盖面 | 0–1 | 全国性/大众化=1;地方垂直=0 |
**入选条件**:相关性 ≥ 2 且 爆炸性 ≥ 1 且 覆盖面 ≥ 0.5
---
## 输出
仅输出 JSON,**不得包含任何解释或多余文本**:
```json
{
    "IDS": [1, 2, 3]
}
```
"""

    # Narrow screen: titles about famous people (not entertainment stars)
    # that seniors would care about.
    FAMOUS_PERSON_PROMPT = """
## 输入格式
- 你将收到若干条记录,每条包含:`id`(整数或字符串)、`title`(标题,字符串)。
- 标题可能包含噪声或修辞,请基于**标题文字本身**进行判断。
## 任务说明
- 请从输入的标题中,找出极高的**名人**热度的标题,返回其id列表;
- 注意一定得是名人, 不要出现娱乐明星;
- 注意一定得是老年人感兴趣的名人事件。
please think step by step
## 输出
仅输出 JSON,**不得包含任何解释或多余文本**:
```json
{
    "IDS": [1, 2, 3]
}
```
"""

    @staticmethod
    def format_input_articles(fetch_response: List[Dict]) -> str:
        """Render records as one ``<id>, <title>`` line per article.

        :param fetch_response: rows carrying at least ``id`` and ``title`` keys
        :return: newline-terminated lines, one per record
        """
        # str.join avoids quadratic += string concatenation on large batches.
        return "".join(
            f"{item['id']}, {item['title']}\n" for item in fetch_response
        )
class CrawlerHotPointMapper(CrawlerHotPointBase):
    """Async DB accessors for the ``hot_point_titles`` table."""

    def __init__(self, pool, log_client, trace_id):
        # pool must expose async_save / async_fetch (the only methods used here)
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    async def save_articles(self, articles: List[Tuple]) -> int:
        """Bulk-insert (title, platform, link) rows, silently skipping duplicates.

        :param articles: tuples of (title, platform, link)
        :return: affected-row count reported by the pool
        """
        query = """
            INSERT IGNORE INTO hot_point_titles
                (title, platform, link)
            VALUES (%s, %s, %s);
        """
        return await self.pool.async_save(query=query, params=articles, batch=True)

    async def update_useful_status(
        self, article_id: int, origin_status: int, new_status: int
    ) -> int:
        """Transition one title's ``useful`` flag, guarded by its current value.

        The ``AND useful = %s`` predicate makes this a compare-and-set, so
        concurrent workers cannot overwrite each other's transitions.
        """
        query = """
            UPDATE hot_point_titles
            SET useful = %s
            WHERE id = %s AND useful = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, article_id, origin_status)
        )

    async def set_as_processing(self, title_ids: List[int]) -> int:
        """Mark a batch of titles as claimed for classification.

        Returns 0 for an empty batch instead of issuing invalid ``IN ()`` SQL.
        """
        if not title_ids:
            return 0
        query = """
            UPDATE hot_point_titles
            SET useful = %s
            WHERE id IN %s;
        """
        return await self.pool.async_save(
            query=query, params=(self.PROCESSING_STATUS, tuple(title_ids))
        )

    async def set_as_failed(self, title_ids: List[int]) -> int:
        """Mark a batch of titles as failed (LLM returned nothing).

        Returns 0 for an empty batch instead of issuing invalid ``IN ()`` SQL.
        """
        if not title_ids:
            return 0
        query = """
            UPDATE hot_point_titles
            SET useful = %s
            WHERE id IN %s;
        """
        return await self.pool.async_save(
            query=query, params=(self.FAILED_STATUS, tuple(title_ids))
        )

    async def set_as_expired(self, article_id: int) -> int:
        """Mark one title as expired (writes the ``status`` column, not ``useful``)."""
        query = """
            UPDATE hot_point_titles
            SET status = %s
            WHERE id = %s;
        """
        return await self.pool.async_save(
            query=query, params=(self.EXPIRED_STATUS, article_id)
        )

    async def fetch_init_articles(self) -> List[Dict]:
        """Fetch the next batch of unexpired titles not yet seen by the LLM.

        :return: up to PROCESS_TITLE_BATCH_SIZE rows of {id, title}, id-ascending
        """
        query = """
            SELECT id, title FROM hot_point_titles WHERE status = %s AND useful = %s
            ORDER BY id Limit %s;
        """
        return await self.pool.async_fetch(
            query=query,
            params=(
                self.NOT_EXPIRED_STATUS,
                self.INIT_STATUS,
                self.PROCESS_TITLE_BATCH_SIZE,
            ),
        )
class CrawlerHotPointTask(CrawlerHotPointMapper):
    """Crawl hot-list titles and screen them with an LLM for senior relevance."""

    # NOTE: no __init__ needed — CrawlerHotPointMapper.__init__ is inherited as-is.

    def process_raw_data(self, response_data) -> List[Tuple]:
        """Flatten the crawler payload into (title, platform, link) tuples.

        Sources in IGNORE_PLATFORMS are skipped; only the top 5 entries of
        each platform's rank list are kept.
        """
        articles = []
        for item in response_data["data"]["data"]:
            platform = item["source"]
            if platform in self.IGNORE_PLATFORMS:
                continue
            for article in item["rankList"][:5]:
                articles.append((article["title"], platform, article["link"]))
        return articles

    async def crawl_hot_titles(self):
        """Crawl pages 1..MAX_PAGE_INDEX-1 of the hot list and persist titles.

        Best-effort: a failure on one page is reported and the loop continues.
        """
        for page in tqdm(range(1, self.MAX_PAGE_INDEX)):
            try:
                raw_data = await get_hot_point_content(page_index=page)
                articles = self.process_raw_data(raw_data)
                await self.save_articles(articles)
            except Exception as e:
                print(f"crawl_hot_titles error: {e}")

    async def classify_articles_by_llm(self):
        """Classify one pending batch of titles via the LLM and persist verdicts."""
        infos = await self.fetch_init_articles()
        if not infos:
            return
        title_ids = [item["id"] for item in infos]
        # Claim the batch first so a concurrent run does not reprocess it.
        await self.set_as_processing(title_ids)
        # NOTE(review): CLASSIFY_PROMPT is the broader senior-news screen;
        # the famous-person screen is the one currently active.
        prompt = f"{self.FAMOUS_PERSON_PROMPT}\n{self.format_input_articles(infos)}"
        # NOTE(review): called without await — confirm fetch_deepseek_completion
        # is synchronous; if it returns a coroutine this request never runs.
        response = fetch_deepseek_completion(
            prompt=prompt, model="DeepSeek-R1", output_type="json"
        )
        if not response:
            await self.set_as_failed(title_ids)
            return
        # The prompt allows ids to come back as ints or strings; normalize to
        # int so they compare equal to the DB's integer ids, dropping junk.
        selected_ids = set()
        for raw_id in response.get("IDS", []):
            try:
                selected_ids.add(int(raw_id))
            except (TypeError, ValueError):
                continue
        for item in tqdm(infos):
            id_ = item["id"]
            new_status = (
                self.USEFUL_STATUS if id_ in selected_ids else self.NOT_USEFUL_STATUS
            )
            await self.update_useful_status(id_, self.PROCESSING_STATUS, new_status)