demand_quality.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574
  1. """需求质量判断:事件性、老年性 LLM 评分。
  2. 流程(串行两次 LLM,评分阶段互不截断):
  3. 1. 微信指数达标后,构建待评需求(特征点组合 + 有匹配的短语)
  4. 2. 对全部待评需求执行事件性 LLM 评分
  5. 3. 对同一批全部待评需求执行老年性 LLM 评分
  6. 4. 导出表 / ODPS 写入时再过滤:标题保留(微信指数 + 灵感/目的点匹配)+ 事件性、老年性双达标
  7. """
from __future__ import annotations

import json
import math
import re
import time
from typing import Any

from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
from app.hot_content.demand_export import ITEM_TYPE_ELEMENT, ITEM_TYPE_PHRASE
from app.hot_content.exceptions import HotContentFlowError
  16. TYPE_FEATURE_POINT = "特征点"
  17. TYPE_PHRASE = "短语"
  18. def _normalize_demand_key(value: str) -> str:
  19. return "".join(str(value or "").split())
  20. def _dedupe_texts(texts: list[str]) -> list[str]:
  21. deduped: list[str] = []
  22. seen: set[str] = set()
  23. for raw in texts:
  24. text = str(raw).strip()
  25. if not text:
  26. continue
  27. keys = {text, _normalize_demand_key(text)}
  28. if keys & seen:
  29. continue
  30. seen.update(keys)
  31. deduped.append(text)
  32. return deduped
  33. def _has_matched_demand(row: dict[str, Any]) -> bool:
  34. return bool(str(row.get("matched_demand") or "").strip())
  35. def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
  36. scores: list[float] = []
  37. for row in export_rows:
  38. try:
  39. scores.append(float(row.get("wxindex_latest_score") or 0))
  40. except (TypeError, ValueError):
  41. continue
  42. return max(scores) if scores else 0.0
  43. def passes_wxindex_gate(
  44. export_rows: list[dict[str, Any]],
  45. *,
  46. wxindex_threshold: float,
  47. ) -> bool:
  48. """记录级微信指数是否达标,用于决定是否进入质量判断。"""
  49. return _record_wxindex_score(export_rows) >= wxindex_threshold
  50. def _extract_json_object(text: str) -> dict[str, Any]:
  51. raw = text.strip()
  52. if raw.startswith("```"):
  53. raw = re.sub(r"^```(?:json)?\s*", "", raw)
  54. raw = re.sub(r"\s*```$", "", raw)
  55. try:
  56. parsed = json.loads(raw)
  57. if isinstance(parsed, dict):
  58. return parsed
  59. except json.JSONDecodeError:
  60. pass
  61. match = re.search(r"\{[\s\S]*\}", raw)
  62. if not match:
  63. raise HotContentFlowError("llm output is not json object")
  64. try:
  65. parsed = json.loads(match.group(0))
  66. except json.JSONDecodeError as exc:
  67. raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
  68. if not isinstance(parsed, dict):
  69. raise HotContentFlowError("llm output is not json object")
  70. return parsed
  71. def _candidate_key(demand_type: str, demand_text: str) -> tuple[str, str]:
  72. return demand_type.strip(), _normalize_demand_key(demand_text)
  73. def build_feature_combo_text(export_rows: list[dict[str, Any]]) -> str:
  74. element_texts = _dedupe_texts(
  75. [
  76. str(row.get("item_text") or "").strip()
  77. for row in export_rows
  78. if str(row.get("item_type") or "") == ITEM_TYPE_ELEMENT
  79. and _has_matched_demand(row)
  80. ]
  81. )
  82. return " ".join(element_texts)
  83. def build_quality_candidates(
  84. export_rows: list[dict[str, Any]],
  85. *,
  86. wxindex_threshold: float,
  87. ) -> list[dict[str, str]]:
  88. """微信指数达标时,构建特征点组合与短语两类待评需求。"""
  89. if not passes_wxindex_gate(export_rows, wxindex_threshold=wxindex_threshold):
  90. return []
  91. candidates: list[dict[str, str]] = []
  92. seen: set[tuple[str, str]] = set()
  93. feature_combo = build_feature_combo_text(export_rows)
  94. if feature_combo:
  95. key = _candidate_key(TYPE_FEATURE_POINT, feature_combo)
  96. if key not in seen:
  97. seen.add(key)
  98. candidates.append(
  99. {
  100. "demand_type": TYPE_FEATURE_POINT,
  101. "demand_text": feature_combo,
  102. }
  103. )
  104. for row in export_rows:
  105. if str(row.get("item_type") or "") != ITEM_TYPE_PHRASE:
  106. continue
  107. if not _has_matched_demand(row):
  108. continue
  109. phrase_text = str(row.get("item_text") or "").strip()
  110. if not phrase_text:
  111. continue
  112. key = _candidate_key(TYPE_PHRASE, phrase_text)
  113. if key in seen:
  114. continue
  115. seen.add(key)
  116. candidates.append(
  117. {
  118. "demand_type": TYPE_PHRASE,
  119. "demand_text": phrase_text,
  120. }
  121. )
  122. return candidates
  123. def _normalize_score(value: Any) -> float | None:
  124. try:
  125. score = float(value)
  126. except (TypeError, ValueError):
  127. return None
  128. if score < 0:
  129. return 0.0
  130. if score > 10:
  131. return 10.0
  132. return score
  133. def _build_score_lookup(result_json: dict[str, Any] | None) -> dict[tuple[str, str], dict[str, Any]]:
  134. lookup: dict[tuple[str, str], dict[str, Any]] = {}
  135. if not isinstance(result_json, dict):
  136. return lookup
  137. items = result_json.get("items") or []
  138. if not isinstance(items, list):
  139. return lookup
  140. for item in items:
  141. if not isinstance(item, dict):
  142. continue
  143. demand_type = str(item.get("demand_type") or "").strip()
  144. demand_text = str(item.get("demand_text") or "").strip()
  145. if not demand_type or not demand_text:
  146. continue
  147. lookup[_candidate_key(demand_type, demand_text)] = item
  148. return lookup
  149. def lookup_quality_scores(
  150. *,
  151. demand_type: str,
  152. demand_text: str,
  153. event_sense_json: dict[str, Any] | None,
  154. senior_fit_json: dict[str, Any] | None,
  155. ) -> tuple[float | None, float | None]:
  156. key = _candidate_key(demand_type, demand_text)
  157. event_item = _build_score_lookup(event_sense_json).get(key)
  158. senior_item = _build_score_lookup(senior_fit_json).get(key)
  159. event_score = _normalize_score(event_item.get("score")) if event_item else None
  160. senior_score = _normalize_score(senior_item.get("score")) if senior_item else None
  161. return event_score, senior_score
  162. def quality_passed(
  163. *,
  164. demand_type: str,
  165. demand_text: str,
  166. event_sense_json: dict[str, Any] | None,
  167. senior_fit_json: dict[str, Any] | None,
  168. event_threshold: float,
  169. senior_threshold: float,
  170. ) -> bool:
  171. event_score, senior_score = lookup_quality_scores(
  172. demand_type=demand_type,
  173. demand_text=demand_text,
  174. event_sense_json=event_sense_json,
  175. senior_fit_json=senior_fit_json,
  176. )
  177. if event_score is None or senior_score is None:
  178. return False
  179. return event_score >= event_threshold and senior_score >= senior_threshold
  180. def attach_quality_scores_to_export_rows(
  181. export_rows: list[dict[str, Any]],
  182. *,
  183. event_sense_json: dict[str, Any] | None,
  184. senior_fit_json: dict[str, Any] | None,
  185. ) -> list[dict[str, Any]]:
  186. feature_combo = build_feature_combo_text(export_rows)
  187. rows: list[dict[str, Any]] = []
  188. for row in export_rows:
  189. item_type = str(row.get("item_type") or "")
  190. item_text = str(row.get("item_text") or "").strip()
  191. if item_type == ITEM_TYPE_ELEMENT and feature_combo:
  192. event_score, senior_score = lookup_quality_scores(
  193. demand_type=TYPE_FEATURE_POINT,
  194. demand_text=feature_combo,
  195. event_sense_json=event_sense_json,
  196. senior_fit_json=senior_fit_json,
  197. )
  198. elif item_type == ITEM_TYPE_PHRASE and item_text:
  199. event_score, senior_score = lookup_quality_scores(
  200. demand_type=TYPE_PHRASE,
  201. demand_text=item_text,
  202. event_sense_json=event_sense_json,
  203. senior_fit_json=senior_fit_json,
  204. )
  205. else:
  206. event_score, senior_score = None, None
  207. rows.append(
  208. {
  209. **row,
  210. "event_sense_score": event_score,
  211. "senior_fit_score": senior_score,
  212. }
  213. )
  214. return rows
  215. def _normalize_llm_items(
  216. parsed: dict[str, Any],
  217. candidates: list[dict[str, str]],
  218. ) -> list[dict[str, Any]]:
  219. candidate_lookup = {
  220. _candidate_key(item["demand_type"], item["demand_text"]): item
  221. for item in candidates
  222. }
  223. raw_items = parsed.get("items") or []
  224. if not isinstance(raw_items, list):
  225. raw_items = []
  226. items: list[dict[str, Any]] = []
  227. seen: set[tuple[str, str]] = set()
  228. for raw in raw_items:
  229. if not isinstance(raw, dict):
  230. continue
  231. demand_type = str(raw.get("demand_type") or "").strip()
  232. demand_text = str(raw.get("demand_text") or "").strip()
  233. if not demand_type or not demand_text:
  234. continue
  235. key = _candidate_key(demand_type, demand_text)
  236. if key not in candidate_lookup or key in seen:
  237. continue
  238. seen.add(key)
  239. score = _normalize_score(raw.get("score"))
  240. if score is None:
  241. continue
  242. items.append(
  243. {
  244. "demand_type": demand_type,
  245. "demand_text": demand_text,
  246. "score": score,
  247. "reason": str(raw.get("reason") or "").strip(),
  248. }
  249. )
  250. return items
  251. def _llm_score_demands(
  252. *,
  253. channel_content_id: str,
  254. candidates: list[dict[str, str]],
  255. system_prompt: str,
  256. model: str,
  257. max_attempts: int,
  258. retry_sleep_seconds: float,
  259. max_tokens: int,
  260. score_field: str,
  261. ) -> dict[str, Any]:
  262. if not candidates:
  263. return {"source": channel_content_id, "items": []}
  264. user_payload = {
  265. "source": channel_content_id,
  266. "demands": candidates,
  267. "output_schema": {
  268. "source": "string",
  269. "items": [
  270. {
  271. "demand_type": "string, 特征点 or 短语",
  272. "demand_text": "string, must match one demand in demands",
  273. "score": "number, 0-10",
  274. "reason": "string",
  275. }
  276. ],
  277. },
  278. "constraints": [
  279. "仅对给定 demands 逐项评分,不得新增或遗漏",
  280. "score 为 0-10 的数字,越大表示越符合判断标准",
  281. "demand_type 与 demand_text 必须与输入完全一致",
  282. "仅输出 JSON 对象,不要 markdown 代码块",
  283. ],
  284. }
  285. last_error: Exception | None = None
  286. for attempt in range(1, max(max_attempts, 1) + 1):
  287. try:
  288. resp = create_chat_completion(
  289. [
  290. {"role": "system", "content": system_prompt},
  291. {
  292. "role": "user",
  293. "content": json.dumps(user_payload, ensure_ascii=False),
  294. },
  295. ],
  296. model=model or None,
  297. temperature=0,
  298. max_tokens=max(max_tokens, 1),
  299. )
  300. parsed = _extract_json_object(str(resp.get("content") or ""))
  301. parsed.setdefault("source", channel_content_id)
  302. items = _normalize_llm_items(parsed, candidates)
  303. return {
  304. "source": channel_content_id,
  305. "score_field": score_field,
  306. "items": items,
  307. }
  308. except (OpenRouterCallError, HotContentFlowError) as exc:
  309. last_error = exc
  310. if attempt < max(max_attempts, 1):
  311. time.sleep(max(retry_sleep_seconds, 0))
  312. raise HotContentFlowError(
  313. f"llm {score_field} scoring failed for {channel_content_id}: {last_error}"
  314. ) from last_error
  315. def llm_score_event_sense(
  316. *,
  317. channel_content_id: str,
  318. candidates: list[dict[str, str]],
  319. model: str,
  320. max_attempts: int,
  321. retry_sleep_seconds: float,
  322. max_tokens: int,
  323. ) -> dict[str, Any]:
  324. system_prompt = """
  325. 你是一个事件表达精确度评估专家。
  326. # 任务
  327. 我会提供若干短语或词组组合(可以是特征词的拼接)。
  328. 请逐项判断:该短语/词组能否准确表达出一个具体的事件。
  329. 表达越确切、事件越具体,得分越高。
  330. # 评分标准(0-10)
  331. 9-10:
  332. 精准指向某一具体事件,无歧义,可直接还原事件内容
  333. 7-8:
  334. 大体可判断是某类事件,但存在少量歧义或信息不完整
  335. 4-6:
  336. 有一定事件指向,但过于泛化,无法锁定具体事件
  337. 1-3:
  338. 偏属性/概念描述,几乎无法对应具体事件
  339. 0:
  340. 完全无法表达任何具体事件
  341. # 评估维度(综合考量)
  342. - 主体明确性:是否点出了事件涉及的人/物/组织
  343. - 动作/结果明确性:是否体现了发生了什么
  344. - 时空限定性:是否暗示了特定时间或地点
  345. - 可还原性:仅凭该短语,能否在脑中重建出事件场景
  346. # 输出格式
  347. 严格输出 JSON,禁止输出任何其他内容。
  348. """
  349. return _llm_score_demands(
  350. channel_content_id=channel_content_id,
  351. candidates=candidates,
  352. system_prompt=system_prompt,
  353. model=model,
  354. max_attempts=max_attempts,
  355. retry_sleep_seconds=retry_sleep_seconds,
  356. max_tokens=max_tokens,
  357. score_field="event_sense",
  358. )
  359. def llm_score_senior_fit(
  360. *,
  361. channel_content_id: str,
  362. candidates: list[dict[str, str]],
  363. model: str,
  364. max_attempts: int,
  365. retry_sleep_seconds: float,
  366. max_tokens: int,
  367. ) -> dict[str, Any]:
  368. system_prompt = """
  369. # 角色
  370. 你是一名严格的中老年内容适老性评分专家,专门评估词组/短语对中国50岁以上中老年用户的吸引力与相关性。你的判断基于严格的用户画像,而非主观感受。你会识别并拒绝一切看似"老年"实则属于年轻群体、中产焦虑、高认知门槛或语义模糊的伪适老词组。
  371. # 核心任务
  372. 对输入的每个词组/短语,输出一个0-10的适老性评分,并给出简短判断依据。
  373. ---
  374. # 一、基础定义(严格遵守,不可修改)
  375. ## 用户画像:中国50岁以上中老年人
  376. ### 认知特点
  377. - 追求"确定性"和"安全感",偏好简单直白,拒绝烧脑与推理
  378. - 不关注新事物、抽象宏观经济、复杂金融博弈、枯燥行政程序
  379. - 对网络梗、亚文化、职场黑话不敏感甚至反感
  380. ### 文化背景
  381. - 成长于上世纪50-70年代,传统观念根深蒂固
  382. - 深受儒家文化影响,强烈的孝道观念与集体主义倾向
  383. - 处于"安享期"而非"奋斗期"
  384. - 关注"保命"(三高/心脏/防骗)而非"塑形"(减肥/发际线)
  385. - 关注"存量财产安全"而非"增量资产博弈"
  386. ### 情感需求
  387. - 核心情感:安逸、从容、被尊重
  388. - 偏好:正能量、民族自豪感、家庭温情、传统文化、同龄人故事
  389. - 反感:贩卖焦虑、激烈矛盾冲突、血腥暴力、悲惨负面内容
  390. ### 场景偏好
  391. - 接受:菜市场、公园、家庭、医院、老友聚会、怀旧场景
  392. - 排斥:写字楼、夜店、高端消费场所、极限运动
  393. ---
  394. # 二、适老品类白名单(命中即有基础分)
  395. 以下品类的词组,具备"适老性基础系数",可在此基础上评估具体程度:
  396. - 国家力量/民族自豪:阅兵、基建、外交胜利、撤侨、领土主权、中国强大
  397. - 健康养生:三高管理、心脑血管、养生食疗、长寿、防病(严格排除减肥/塑形/医美/脱发)
  398. - 防骗安全:电信诈骗、保健品骗局、新型骗局案例(极高优先级)
  399. - 惠民政策:养老金、医保报销、现金支付保障、物价、天气预警
  400. - 怀旧时光:70年代及以前老照片、经典老歌、老电影、童年记忆
  401. - 家庭亲情:隔辈亲、孝道、家庭互助(排除婆媳恶斗/剧烈伦理冲突)
  402. - 传统文化:节气、民俗、戏曲、国学、非遗
  403. - 正能量:见义勇为、拾金不昧、平凡善举、反腐倡廉
  404. - 人文科普:文化/历史/人文/健康等社科知识(非科技/自然猎奇)
  405. - 自然惊奇:自然奇观、动物趣闻(排除血腥/恐怖/猎奇阴暗)
  406. ---
  407. # 三、低分黑名单信号
  408. ## 强制低分信号(命中任一,评分不超过3)
  409. - 职场类:升职、副业、内卷、打工人、绩效、裁员
  410. - 年轻文化:网络梗、二次元、潮流、追星、发际线、颜值
  411. - 金融投资:炒股、基金、加密货币、理财产品、资产配置
  412. - 房产相关:买房、贷款、学区房、房价涨跌
  413. - 健身塑形:减肥、健身、马甲线、体脂率、增肌
  414. - 科技数码:手机评测、AI工具、电脑配置、游戏硬件
  415. - 高消费场景:奢侈品、出境游、米其林、高端健身房
  416. - 情绪贩卖:焦虑、内耗、emo、迷茫、躺平
  417. - 模糊悬念:无具体信息的"千万别做这件事"类表达
  418. ## 中度扣分信号(命中使评分下浮1-2分)
  419. - 内容偏泛人群,缺乏老年专属场景
  420. - 认知门槛较高,需要背景知识才能理解
  421. - 表达方式年轻化,但内容本身不排斥老年人
  422. ---
  423. # 四、评分标准(0-10)
  424. - 9-10:高度契合中老年用户核心关注点、典型生活场景或强情感诉求
  425. - 7-8:对中老年用户有较强吸引力或实用价值,场景清晰
  426. - 5-6:有一定相关性,但中老年专属属性一般,泛人群居多
  427. - 3-4:偏年轻群体或泛人群,老年性弱,中老年用户兴趣低
  428. - 1-2:明显面向年轻群体,中老年用户几乎不感兴趣
  429. - 0:与中老年用户完全无关,或存在强烈排斥信号
  430. ---
  431. # 五、输出规则
  432. 严格输出 JSON 对象(含 items 数组),禁止输出 JSON 之外的任何内容(无前缀、无解释、无markdown格式)。
  433. """
  434. return _llm_score_demands(
  435. channel_content_id=channel_content_id,
  436. candidates=candidates,
  437. system_prompt=system_prompt,
  438. model=model,
  439. max_attempts=max_attempts,
  440. retry_sleep_seconds=retry_sleep_seconds,
  441. max_tokens=max_tokens,
  442. score_field="senior_fit",
  443. )
  444. def filter_candidates_by_event_sense(
  445. candidates: list[dict[str, str]],
  446. event_sense_json: dict[str, Any],
  447. *,
  448. event_threshold: float,
  449. ) -> list[dict[str, str]]:
  450. lookup = _build_score_lookup(event_sense_json)
  451. passed: list[dict[str, str]] = []
  452. for candidate in candidates:
  453. key = _candidate_key(candidate["demand_type"], candidate["demand_text"])
  454. item = lookup.get(key)
  455. score = _normalize_score(item.get("score")) if item else None
  456. if score is not None and score >= event_threshold:
  457. passed.append(candidate)
  458. return passed
  459. def run_demand_quality_pipeline(
  460. *,
  461. channel_content_id: str,
  462. export_rows: list[dict[str, Any]],
  463. wxindex_threshold: float,
  464. event_threshold: float,
  465. senior_threshold: float,
  466. model: str,
  467. max_attempts: int,
  468. retry_sleep_seconds: float,
  469. max_tokens: int,
  470. ) -> tuple[dict[str, Any], dict[str, Any]]:
  471. """微信指数达标的需求:串行执行事件性、老年性 LLM,均对全量候选评分。"""
  472. candidates = build_quality_candidates(
  473. export_rows,
  474. wxindex_threshold=wxindex_threshold,
  475. )
  476. if not candidates:
  477. return {"source": channel_content_id, "items": []}, {"source": channel_content_id, "items": []}
  478. llm_kwargs = {
  479. "channel_content_id": channel_content_id,
  480. "candidates": candidates,
  481. "model": model,
  482. "max_attempts": max_attempts,
  483. "retry_sleep_seconds": retry_sleep_seconds,
  484. "max_tokens": max_tokens,
  485. }
  486. event_sense_json = llm_score_event_sense(**llm_kwargs)
  487. event_sense_json["threshold"] = event_threshold
  488. senior_fit_json = llm_score_senior_fit(**llm_kwargs)
  489. senior_fit_json["threshold"] = senior_threshold
  490. return event_sense_json, senior_fit_json