from __future__ import annotations

import asyncio
import traceback
from typing import Dict, List, Tuple

from tqdm.asyncio import tqdm

from applications.api import fetch_deepseek_completion
from applications.crawler.tophub import get_hot_point_content


class CrawlerHotPointConst:
    MAX_PAGE_INDEX = 40

    # states of the `useful` column (LLM classification result)
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    USEFUL_STATUS = 2
    NOT_USEFUL_STATUS = 3
    FAILED_STATUS = 99

    # states of the `status` column (article lifecycle)
    NOT_EXPIRED_STATUS = 1
    EXPIRED_STATUS = 2

    # how many titles to classify per LLM batch
    PROCESS_TITLE_BATCH_SIZE = 200

    # platforms to skip during crawling
    IGNORE_PLATFORMS = {
        "中国日报", "每日珠宝杂志", "iBag包包", "ZAKER", "NASA 🌍", "wikiHow 中文",
        "China Daily", "微信 ‧ 游戏", "Yahoo News", "北京天文馆", "本地宝",
    }


class CrawlerHotPointBase(CrawlerHotPointConst):
    CLASSIFY_PROMPT = """
你是一个内容分析助手,专门从热榜标题中识别出55岁以上老年人可能喜欢或关注的银发内容。
银发内容通常涉及健康、养老、退休生活、老年疾病、社会保障、代际关系、奇闻趣事、名人故事、社会事件等主题。
不要出现政治、当代国家领导人等敏感事件,以及各个地方的垂直新闻信息。

1. **任务描述**:
扫描所有标题,筛选出与银发内容高度相关的时效性新闻信息。相关性判断基于标题是否直接或间接提及老年人相关话题,或可能吸引55岁以上人群的兴趣。返回适合的 id。
如果遇到敏感人物,正常过滤。请注意,一定要是爆炸性新闻事件,事件会迅速引发社会各界关注。
请严格判断标题是否适合老年群体。

2. **输出格式**:输出结果为 JSON,只需要返回适合老年人话题的 id,结构为
{
    "IDS": [1, 2, 3, ...]
}

现在,请处理我输入的标题 && id, please think step by step.
"""

    @staticmethod
    def format_input_articles(fetch_response: List[Dict]) -> str:
        """
        Format articles as one "id, title" pair per line,
        e.g. [{"id": 1, "title": "foo"}] -> "1, foo" plus a trailing newline.
        """
        return "".join(f"{item['id']}, {item['title']}\n" for item in fetch_response)


class CrawlerHotPointMapper(CrawlerHotPointBase):
    """Data-access layer for the hot_point_titles table."""

    def __init__(self, pool, log_client, trace_id):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    async def save_articles(self, articles: List[Tuple]) -> int:
        """Bulk-insert (title, platform, link) rows, ignoring duplicates."""
        query = """
            INSERT IGNORE INTO hot_point_titles
                (title, platform, link)
            VALUES (%s, %s, %s);
        """
        return await self.pool.async_save(query=query, params=articles, batch=True)

    async def update_useful_status(
        self, article_id: int, origin_status: int, new_status: int
    ) -> int:
        """
        Transition an article's `useful` state. The WHERE clause checks the
        current state, so a stale or concurrent update is a no-op.
        """
        query = """
            UPDATE hot_point_titles
            SET useful = %s
            WHERE id = %s AND useful = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, article_id, origin_status)
        )

    async def set_as_processing(self, title_ids: List[int]) -> int:
        """Mark a batch of titles as currently being classified."""
        query = """
            UPDATE hot_point_titles
            SET useful = %s
            WHERE id IN %s;
        """
        return await self.pool.async_save(
            query=query, params=(self.PROCESSING_STATUS, tuple(title_ids))
        )

    async def set_as_failed(self, title_ids: List[int]) -> int:
        """Mark a batch of titles as failed."""
        query = """
            UPDATE hot_point_titles
            SET useful = %s
            WHERE id IN %s;
        """
        return await self.pool.async_save(
            query=query, params=(self.FAILED_STATUS, tuple(title_ids))
        )

    async def set_as_expired(self, article_id: int) -> int:
        """Mark an article as expired."""
        query = """
            UPDATE hot_point_titles
            SET status = %s
            WHERE id = %s;
        """
        return await self.pool.async_save(
            query=query, params=(self.EXPIRED_STATUS, article_id)
        )

    async def fetch_init_articles(self) -> List[Dict]:
        """Fetch unexpired titles that have not yet been classified by the LLM."""
        query = """
            SELECT id, title FROM hot_point_titles
            WHERE status = %s AND useful = %s
            ORDER BY id LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.NOT_EXPIRED_STATUS, self.INIT_STATUS, self.PROCESS_TITLE_BATCH_SIZE),
        )


class CrawlerHotPointTask(CrawlerHotPointMapper):
    """Crawl hot-list titles and classify them for readers aged 55+."""

    def process_raw_data(self, response_data: Dict) -> List[Tuple]:
        """Flatten the crawler response into (title, platform, link) tuples."""
        articles = []
        for item in response_data["data"]["data"]:
            platform = item["source"]
            if platform in self.IGNORE_PLATFORMS:
                continue
            for article in item["rankList"]:
                articles.append((article["title"], platform, article["link"]))
        return articles

    async def crawl_hot_titles(self):
        """Crawl each hot-list page and persist its titles."""
        for page in tqdm(range(1, self.MAX_PAGE_INDEX)):
            try:
                raw_data = await get_hot_point_content(page_index=page)
                articles = self.process_raw_data(raw_data)
                await self.save_articles(articles)
            except Exception as e:
                # a single bad page should not abort the whole crawl
                print(f"crawl_hot_titles error on page {page}: {e}\n{traceback.format_exc()}")

    async def classify_articles_by_llm(self):
        """
        Classify pending titles with the LLM, deciding whether each hot-list
        event matches the interests of readers aged 55 and above.
        """
        infos = await self.fetch_init_articles()
        if not infos:
            return

        # claim the batch up front so concurrent runs do not re-process it
        title_ids = [item["id"] for item in infos]
        await self.set_as_processing(title_ids)

        prompt = f"{self.CLASSIFY_PROMPT}\n{self.format_input_articles(infos)}"
        # fetch_deepseek_completion is assumed to be a blocking call; run it in
        # a worker thread so the event loop is not stalled for the duration
        response = await asyncio.to_thread(
            fetch_deepseek_completion,
            prompt=prompt,
            model="DeepSeek-R1",
            output_type="json",
        )
        if not response:
            await self.set_as_failed(title_ids)
            return

        useful_ids = set(response.get("IDS", []))
        for item in tqdm(infos):
            article_id = item["id"]
            new_status = (
                self.USEFUL_STATUS if article_id in useful_ids else self.NOT_USEFUL_STATUS
            )
            await self.update_useful_status(article_id, self.PROCESSING_STATUS, new_status)
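

# Minimal driver sketch. `ConnectionPool`, `LogClient`, and their import paths
# are hypothetical stand-ins for whatever the application actually wires in;
# the only contract assumed is the async_fetch/async_save interface used by
# CrawlerHotPointMapper above.
#
# import uuid
# from applications.db import ConnectionPool   # hypothetical
# from applications.log import LogClient       # hypothetical
#
# async def main():
#     pool = ConnectionPool()
#     task = CrawlerHotPointTask(pool, LogClient(), trace_id=str(uuid.uuid4()))
#     await task.crawl_hot_titles()          # step 1: ingest hot-list titles
#     await task.classify_articles_by_llm()  # step 2: LLM classification
#
# if __name__ == "__main__":
#     asyncio.run(main())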