crawler_hot_point.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. from __future__ import annotations
  2. import asyncio
  3. import json
  4. import traceback
  5. from typing import Dict, List, Tuple
  6. from tqdm.asyncio import tqdm
  7. from applications.api import fetch_deepseek_completion
  8. from applications.crawler.tophub import get_hot_point_content
  9. class CrawlerHotPointConst:
  10. MAX_PAGE_INDEX = 40
  11. INIT_STATUS = 0
  12. PROCESSING_STATUS = 1
  13. USEFUL_STATUS = 2
  14. NOT_USEFUL_STATUS = 3
  15. FAILED_STATUS = 99
  16. NOT_EXPIRED_STATUS = 1
  17. EXPIRED_STATUS = 2
  18. # batch
  19. PROCESS_TITLE_BATCH_SIZE = 200
  20. # ignore platforms
  21. IGNORE_PLATFORMS = {
  22. "中国日报", "每日珠宝杂志", "iBag包包", "ZAKER", "NASA 🌍", "wikiHow 中文",
  23. "China Daily", "微信 ‧ 游戏", "Yahoo News", "北京天文馆", "本地宝", "BBC"
  24. }
  25. class CrawlerHotPointBase(CrawlerHotPointConst):
  26. CLASSIFY_PROMPT = """
  27. # 角色
  28. 你是一名**银发内容分析助手**,从“热榜标题”中识别**55岁以上人群**(以下简称“老年群体”)**高度相关且具时效性的新闻**。
  29. ---
  30. ## 受众与主题边界
  31. ### ✅ 优先主题(示例)
  32. - 健康与疾病(慢性病、体检、用药与不良反应、流行病预警)
  33. - 养老与退休生活(养老院/社区养老、照护、适老化改造)
  34. - 社会保障(养老金、医保、社保政策解读与变动)
  35. - 代际关系与家庭(子女赡养、祖辈照护、家庭矛盾化解)
  36. - 老年安全(诈骗预警、交通/居家安全、金融风险)
  37. - 名人与社会事件(名人离世/健康突发、涉及老年群体的公共事件)
  38. - 轻松话题(奇闻趣事、正能量故事)——**仅在具备明显大众传播度或话题性时**纳入
  39. ### ❌ 硬性排除
  40. - 涉及**政治、当代国家领导人**、涉敏议题的内容
  41. - **地方性/行业性**的垂直资讯(如县域通知、专业圈层动态)除非**已引发全国性关注**
  42. - 明显软文、广告、纯知识科普/生活技巧(非新闻、非事件)
  43. - 低可信度谣言、医疗偏方、夸大疗效信息
  44. ---
  45. ## “爆炸性新闻”判定(需同时满足 2 条及以上)
  46. 1. **突发性**:短时间内发生/曝光(如猝发事故、紧急通报、名人健康/离世)
  47. 2. **广泛影响**:对全国/大范围群体或老年群体利益有实质影响(如养老金发放异常/系统性风险)
  48. 3. **强关注度**:社会各界/主流媒体/权威机构广泛讨论或发声
  49. 4. **信息强度**:事实明确、细节冲击性强(非捕风捉影)
  50. ---
  51. ## 输入格式
  52. - 你将收到若干条记录,每条包含:`id`(整数或字符串)、`title`(标题,字符串)。
  53. - 标题可能包含噪声或修辞,请基于**标题文字本身**进行判断。
  54. ---
  55. ## 决策流程(严格按序执行)
  56. 1. **敏感过滤**:若标题含政治/当代国家领导人等**敏感**要素 ⇒ **淘汰**
  57. 2. **相关性评估(老年群体)**:
  58. - 直接相关关键词(“养老金/医保/养老院/老人/阿尔茨海默/骨折/退役/退休”等) ⇒ 强相关
  59. - 间接相关但**明显触达老年痛点/关注点**(“养老诈骗”“养老院安全事故”“名人离世/老年疾病”) ⇒ 中到强相关
  60. - 仅猎奇但**无老年相关指向** ⇒ 淘汰
  61. 3. **时效性与“爆炸性”判定**:
  62. - 需符合“爆炸性新闻”判定中≥2项;
  63. - 纯常识/科普/日常提示无“事件性” ⇒ 淘汰
  64. 4. **全国性/大众化筛**:
  65. - 明显仅限**地方部门内部通知/小范围** ⇒ 淘汰(除非引发全国性关注)
  66. 5. **最终选择规则**:
  67. - 同一事件多标题重复,**只保留更具体、信息密度更高**的那条
  68. - 若无法明确其是否适合老年群体或是否“爆炸性”,**从严不选**
  69. ---
  70. ## 打分与阈值(用于自检,不输出分数)
  71. | 维度 | 分值 | 判定标准 |
  72. |------|------|-----------|
  73. | 相关性 | 0–3 | 直接老年主题=3;间接但明显相关=2;弱相关=1;无关=0 |
  74. | 爆炸性 | 0–2 | 满足条目数 0–2 |
  75. | 覆盖面 | 0–1 | 全国性/大众化=1;地方垂直=0 |
  76. **入选条件**:相关性 ≥ 2 且 爆炸性 ≥ 1 且 覆盖面 ≥ 0.5
  77. ---
  78. ## 输出
  79. 仅输出 JSON,**不得包含任何解释或多余文本**:
  80. ```json
  81. {
  82. "IDS": [1, 2, 3]
  83. }
  84. """
  85. FAMOUS_PERSON_PROMPT = """
  86. ## 输入格式
  87. - 你将收到若干条记录,每条包含:`id`(整数或字符串)、`title`(标题,字符串)。
  88. - 标题可能包含噪声或修辞,请基于**标题文字本身**进行判断。
  89. ## 任务说明
  90. - 请从输入的标题中,找出极高的**名人**热度的标题,返回其id列表;
  91. - 注意一定得是名人, 不要出现娱乐明星;
  92. - 注意一定得是老年人感兴趣的名人事件。
  93. please think step by step
  94. ## 输出
  95. 仅输出 JSON,**不得包含任何解释或多余文本**:
  96. ```json
  97. {
  98. "IDS": [1, 2, 3]
  99. }
  100. """
  101. @staticmethod
  102. def format_input_articles(fetch_response: List[Dict]) -> str:
  103. """
  104. 格式化输入文章为字符串,每个文章占一行,格式为:id, title
  105. """
  106. output_string = ""
  107. for item in fetch_response:
  108. output_string += f"{item['id']}, {item['title']}\n"
  109. return output_string
  110. class CrawlerHotPointMapper(CrawlerHotPointBase):
  111. def __init__(self, pool, log_client, trace_id):
  112. self.pool = pool
  113. self.log_client = log_client
  114. self.trace_id = trace_id
  115. async def save_articles(self, articles: List[Tuple]) -> int:
  116. """插入标题 && Link"""
  117. query = """
  118. INSERT IGNORE INTO hot_point_titles
  119. (title, platform, link)
  120. VALUES (%s, %s, %s);
  121. """
  122. return await self.pool.async_save(query=query, params=articles, batch=True)
  123. async def update_useful_status(
  124. self, article_id: int, origin_status: int, new_status: int
  125. ) -> int:
  126. """
  127. 更新文章状态
  128. """
  129. query = """
  130. UPDATE hot_point_titles
  131. SET useful = %s
  132. WHERE id = %s AND useful = %s;
  133. """
  134. return await self.pool.async_save(
  135. query=query, params=(new_status, article_id, origin_status)
  136. )
  137. async def set_as_processing(self, title_ids: List[int]) -> int:
  138. query = """
  139. UPDATE hot_point_titles
  140. SET useful = %s
  141. WHERE id IN %s;"""
  142. return await self.pool.async_save(
  143. query=query, params=(self.PROCESSING_STATUS, tuple(title_ids))
  144. )
  145. async def set_as_failed(self, title_ids: List[int]) -> int:
  146. """
  147. 设置文章为失败
  148. """
  149. query = """
  150. UPDATE hot_point_titles
  151. SET useful = %s
  152. WHERE id IN %s;
  153. """
  154. return await self.pool.async_save(
  155. query=query, params=(self.FAILED_STATUS, tuple(title_ids))
  156. )
  157. async def set_as_expired(self, article_id: int) -> int:
  158. """
  159. 设置文章为过期
  160. """
  161. query = """
  162. UPDATE hot_point_titles
  163. SET status = %s
  164. WHERE id = %s;
  165. """
  166. return await self.pool.async_save(
  167. query=query, params=(self.EXPIRED_STATUS, article_id)
  168. )
  169. async def fetch_init_articles(self) -> List[Dict]:
  170. """
  171. 获取未经过 LLM 判处处理的事件
  172. """
  173. query = """
  174. SELECT id, title FROM hot_point_titles WHERE status = %s AND useful = %s
  175. ORDER BY id Limit %s;
  176. """
  177. return await self.pool.async_fetch(query=query, params=(self.NOT_EXPIRED_STATUS, self.INIT_STATUS, self.PROCESS_TITLE_BATCH_SIZE))
  178. class CrawlerHotPointTask(CrawlerHotPointMapper):
  179. def __init__(self, pool, log_client, trace_id):
  180. super().__init__(pool, log_client, trace_id)
  181. def process_raw_data(self, response_data):
  182. """
  183. 处理原始数据
  184. """
  185. articles = []
  186. for item in response_data['data']['data']:
  187. platform = item["source"]
  188. if platform in self.IGNORE_PLATFORMS:
  189. continue
  190. for article in item["rankList"][:5]:
  191. title = article["title"]
  192. link = article["link"]
  193. articles.append((title, platform, link))
  194. return articles
  195. async def crawl_hot_titles(self):
  196. """
  197. 爬取热点标题
  198. """
  199. for page in tqdm(range(1, self.MAX_PAGE_INDEX)):
  200. try:
  201. raw_data = await get_hot_point_content(page_index=page)
  202. articles = self.process_raw_data(raw_data)
  203. await self.save_articles(articles)
  204. except Exception as e:
  205. print(f"crawl_hot_titles error: {e}")
  206. async def classify_articles_by_llm(self):
  207. """
  208. 用大模型进行分类,判断热点事件是否符合老年人的兴趣爱好
  209. """
  210. infos = await self.fetch_init_articles()
  211. # acquire lock
  212. if not infos:
  213. return
  214. title_ids = [item["id"] for item in infos]
  215. await self.set_as_processing(title_ids)
  216. # prompt = f"{self.CLASSIFY_PROMPT}\n{self.format_input_articles(infos)}"
  217. prompt = f"{self.FAMOUS_PERSON_PROMPT}\n{self.format_input_articles(infos)}"
  218. response = fetch_deepseek_completion(
  219. prompt=prompt, model="DeepSeek-R1", output_type="json"
  220. )
  221. if not response:
  222. w = await self.set_as_failed([item["id"] for item in infos])
  223. print(w)
  224. return
  225. ids = set(response.get("IDS", []))
  226. for item in tqdm(infos):
  227. id_ = item["id"]
  228. if id_ in ids:
  229. await self.update_useful_status(id_, self.PROCESSING_STATUS, self.USEFUL_STATUS)
  230. else:
  231. await self.update_useful_status(id_, self.PROCESSING_STATUS, self.NOT_USEFUL_STATUS)