crawler_hot_point.py

from __future__ import annotations

import asyncio
import json
import traceback
from typing import Dict, List, Tuple

from tqdm.asyncio import tqdm

from applications.api import fetch_deepseek_completion
from applications.crawler.tophub import get_hot_point_content
class CrawlerHotPointConst:
    MAX_PAGE_INDEX = 40

    # classification states stored in hot_point_titles.useful
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    USEFUL_STATUS = 2
    UNUSEFUL_STATUS = 3
    FAILED_STATUS = 99

    # expiration states stored in hot_point_titles.status
    NOT_EXPIRED_STATUS = 1
    EXPIRED_STATUS = 2

    # batch size for each LLM classification round
    PROCESS_TITLE_BATCH_SIZE = 300

    # platforms to skip while crawling
    IGNORE_PLATFORMS = {
        "中国日报", "每日珠宝杂志", "iBag包包", "ZAKER", "NASA 🌍", "wikiHow 中文",
        "China Daily", "微信 ‧ 游戏", "Yahoo News",
    }
class CrawlerHotPointMapper(CrawlerHotPointConst):
    def __init__(self, pool, log_client, trace_id):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    async def save_articles(self, articles: List[Tuple]) -> int:
        """Insert titles and links, skipping rows that already exist."""
        query = """
            INSERT IGNORE INTO hot_point_titles
                (title, platform, link)
            VALUES (%s, %s, %s);
        """
        return await self.pool.async_save(query=query, params=articles, batch=True)
    async def update_useful_status(
        self, article_id: int, origin_status: int, new_status: int
    ) -> int:
        """Update an article's usefulness status, guarded by its current status."""
        query = """
            UPDATE hot_point_titles
            SET useful = %s
            WHERE id = %s AND useful = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, article_id, origin_status)
        )
    async def set_as_expired(self, article_id: int) -> int:
        """Mark an article as expired."""
        query = """
            UPDATE hot_point_titles
            SET status = %s
            WHERE id = %s;
        """
        return await self.pool.async_save(
            query=query, params=(self.EXPIRED_STATUS, article_id)
        )
    async def fetch_init_articles(self) -> List[Dict]:
        """Fetch articles that have not yet been classified by the LLM."""
        query = """
            SELECT id, title FROM hot_point_titles
            WHERE status = %s AND useful = %s
            ORDER BY id LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.NOT_EXPIRED_STATUS, self.INIT_STATUS, self.PROCESS_TITLE_BATCH_SIZE),
        )
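
# The queries above imply a table roughly shaped like the sketch below.
# This is inferred from the SQL in this mapper, not the project's actual DDL;
# column types and the unique key backing INSERT IGNORE are assumptions.
#
#   CREATE TABLE hot_point_titles (
#       id       BIGINT AUTO_INCREMENT PRIMARY KEY,
#       title    VARCHAR(512),
#       platform VARCHAR(64),
#       link     VARCHAR(1024),
#       useful   TINYINT DEFAULT 0,   -- INIT/PROCESSING/USEFUL/UNUSEFUL/FAILED
#       status   TINYINT DEFAULT 1,   -- NOT_EXPIRED/EXPIRED
#       UNIQUE KEY uniq_link (link)   -- assumed dedup key
#   );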
class CrawlerHotPointTask(CrawlerHotPointMapper):
    CLASSIFY_PROMPT = """
    You are a content-analysis assistant that identifies, from trending-list titles,
    "silver" content that people aged 55 and above are likely to enjoy or follow.
    Silver content typically covers health, elderly care, retirement life, age-related
    diseases, social security, intergenerational relationships, curious anecdotes,
    celebrity stories, social events, and similar topics.
    1. **Task**: Scan all titles and select timely news items that are highly relevant
       to silver content. Judge relevance by whether a title directly or indirectly
       mentions elderly-related topics, or is likely to attract the interest of people
       aged 55 and above. Return the matching ids.
    2. If a title involves sensitive figures, filter it out as usual.
    3. Note that selected items must be newsworthy events.
    4. **Output format**: Return JSON containing only the ids of titles suitable for
       elderly topics, structured as
    {
        "IDS": [1, 2, 3, ...]
    }
    Now, please process the titles and ids I provide.
    """
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)
    def process_raw_data(self, response_data):
        """Flatten the raw crawler response into (title, platform, link) tuples."""
        articles = []
        for item in response_data["data"]["data"]:
            platform = item["source"]
            if platform in self.IGNORE_PLATFORMS:
                continue
            for article in item["rankList"]:
                title = article["title"]
                link = article["link"]
                articles.append((title, platform, link))
        return articles
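
    # process_raw_data expects a payload shaped like the sketch below (inferred
    # from the key accesses above; field names beyond source/rankList/title/link
    # are not guaranteed by this file):
    #
    #   {"data": {"data": [
    #       {"source": "微博", "rankList": [
    #           {"title": "...", "link": "https://..."},
    #       ]},
    #   ]}}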
    async def crawl_hot_titles(self):
        """Crawl hot-point titles page by page and persist them."""
        for page in tqdm(range(1, self.MAX_PAGE_INDEX)):
            try:
                raw_data = await get_hot_point_content(page_index=page)
                articles = self.process_raw_data(raw_data)
                await self.save_articles(articles)
            except Exception as e:
                print(f"crawl_hot_titles error on page {page}: {e}")
                print(traceback.format_exc())
    async def classify_articles_by_llm(self):
        """Classify hot-point events with the LLM: are they of interest to elderly readers?"""
        infos = await self.fetch_init_articles()
        # serialize rows as JSON so the model sees clean {"id": ..., "title": ...} objects
        prompt = f"{self.CLASSIFY_PROMPT}\n{json.dumps(infos, ensure_ascii=False)}"
        print(prompt)
        response = fetch_deepseek_completion(
            prompt=prompt, model="default", output_type="json"
        )
        print(response)
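
# Minimal usage sketch (assumptions: `my_pool` implements the async_save /
# async_fetch interface used above and `my_log_client` is whatever logger the
# mapper expects; neither is constructed in this file):
#
#   async def main():
#       task = CrawlerHotPointTask(pool=my_pool, log_client=my_log_client, trace_id="local-debug")
#       await task.crawl_hot_titles()
#       await task.classify_articles_by_llm()
#
#   asyncio.run(main())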