"""Demand quality judgement: LLM scoring of event sense and senior (elderly) fit.

Pipeline (two sequential LLM calls; neither scoring stage truncates the
candidate set of the other):

1. Once the record's WeChat index passes its threshold, build the demands to
   score: the combination of matched feature points, each individually matched
   element, and each matched phrase.
2. Run event-sense LLM scoring over all candidates (currently disabled).
3. Run senior-fit LLM scoring over the same full candidate set.
4. Filtering happens later, at export-table / ODPS write time: titles are kept
   based on WeChat index + inspiration/purpose-point match, and senior fit must
   pass (event sense is currently disabled).
"""

from __future__ import annotations

import json
import math
import re
import time
from typing import Any

from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
from app.hot_content.demand_export import ITEM_TYPE_ELEMENT, ITEM_TYPE_PHRASE
from app.hot_content.exceptions import HotContentFlowError

TYPE_FEATURE_POINT = "特征点"
TYPE_PHRASE = "短语"

# Typographic quotes that LLMs sometimes emit in place of ASCII JSON quotes.
# Spelled as escapes so the mapping cannot be lost to editor/encoding
# normalisation (a mangled literal silently turns the repair into a no-op).
_QUOTE_TRANSLATION = str.maketrans(
    {
        "\u201c": '"',  # LEFT DOUBLE QUOTATION MARK
        "\u201d": '"',  # RIGHT DOUBLE QUOTATION MARK
        "\u2018": "'",  # LEFT SINGLE QUOTATION MARK
        "\u2019": "'",  # RIGHT SINGLE QUOTATION MARK
    }
)

# A comma directly before a closing brace/bracket (invalid in strict JSON).
_TRAILING_COMMA_RE = re.compile(r",\s*([}\]])")

# Numeric literal accepted as a score by the regex fallback parser.
_SCORE_NUMBER = r"([0-9]+(?:\.[0-9]+)?)"

_EVENT_SENSE_SYSTEM_PROMPT = """
你是一个事件表达精确度评估专家。

# 任务
我会提供若干短语或词组组合(可以是特征词的拼接)。
请逐项判断:该短语/词组能否准确表达出一个具体的事件。
表达越确切、事件越具体,得分越高。

# 评分标准(0-10)
9-10: 精准指向某一具体事件,无歧义,可直接还原事件内容
7-8: 大体可判断是某类事件,但存在少量歧义或信息不完整
4-6: 有一定事件指向,但过于泛化,无法锁定具体事件
1-3: 偏属性/概念描述,几乎无法对应具体事件
0: 完全无法表达任何具体事件

# 评估维度(综合考量)
- 主体明确性:是否点出了事件涉及的人/物/组织
- 动作/结果明确性:是否体现了发生了什么
- 时空限定性:是否暗示了特定时间或地点
- 可还原性:仅凭该短语,能否在脑中重建出事件场景

# 输出格式
严格输出 JSON,禁止输出任何其他内容。
"""

# NOTE(review): the whitelist below puts "社会问题·经济形势·农村农业" and
# "民生生活·教育议题" at a 7-point floor, while the mid-penalty list drops
# "农村农业/教育议题" to 2-4. The prompt only exempts the "博弈策略" /
# "认知门槛高" / "泛人群" signals for whitelisted categories, so the model sees
# conflicting rules for these two topics. Confirm the intended score.
_SENIOR_FIT_SYSTEM_PROMPT = """
# 角色
你是一名严格的中老年内容适老性评分专家,专门评估词组/短语对中国50岁以上中老年用户的吸引力与相关性。你的判断基于严格的用户画像,而非主观感受。你会识别并拒绝一切看似"老年"实则属于年轻群体、中产焦虑、高认知门槛或语义模糊的伪适老词组。

# 核心任务
对输入的每个词组/短语,输出一个0-10的适老性评分,并给出简短判断依据。

---

# 一、基础定义(严格遵守,不可修改)

## 用户画像:中国50岁以上中老年人

### 认知特点
- 追求"确定性"和"安全感",偏好简单直白,拒绝烧脑与推理
- 不关注新事物、抽象宏观经济、复杂金融博弈、枯燥行政程序
- 对网络梗、亚文化、职场黑话不敏感甚至反感

### 文化背景
- 成长于上世纪50-70年代,传统观念根深蒂固
- 深受儒家文化影响,强烈的孝道观念与集体主义倾向
- 处于"安享期"而非"奋斗期"
- 关注"保命"(三高/心脏/防骗)而非"塑形"(减肥/发际线)
- 关注"存量财产安全"而非"增量资产博弈"

### 情感需求
- 核心情感:安逸、从容、被尊重
- 偏好:正能量、民族自豪感、家庭温情、传统文化、同龄人故事
- 反感:贩卖焦虑、激烈矛盾冲突、血腥暴力、悲惨负面内容

### 场景偏好
- 接受:菜市场、公园、家庭、医院、老友聚会、怀旧场景
- 排斥:写字楼、夜店、高端消费场所、极限运动

---

# 二、高优品类强制评分白名单(最低分不得低于7分)
以下品类为业务侧已确认的高优内容,评分最低保底7分。
原黑名单中的"博弈策略""认知门槛高""泛人群"等扣分信号对本表品类完全失效。

## 满分区间(9-10分)
- 健康养生·老年健康(含老年专属标注时满分)
- 生活技巧·安全防护·反诈防骗
- 公共管理·医疗卫生·医保报销
- 公共管理·补贴福利·老年群体补贴
- 爱国情感·民族情感
- 政治事件·领袖纪念
- 民生政策·惠民政策(明确老年受益时满分,泛人群降为8)

## 高分区间(8-9分)
- 公共管理·治理监督·反腐
- 国家实力·国际地位
- 政策制度·国家统一
- 外交事件·外交访问
- 处世智慧·生存策略·经验总结
- 处世智慧·生存策略·生活指导

## 中高分区间(7-8分)
- 处世智慧·价值取向·处世哲学
- 民生政策·免费福利政策
- 公共管理·补贴福利·生活服务补贴
- 文化概念·文化传承
- 时政评议·社会评议·社会公正
- 军事谋略·战略运筹·战略方案
- 社会问题·国家安全事件
- 国际政治·外交立场
- 国际政治·双边关系(中美关系等)
- 时政评议·国际关系·中美关系
- 社会问题·国际问题·两岸议题
- 地标景观·交通枢纽
- 花卉风格·高饱和花卉

## 保底7分(区间锁定,不上浮)
- 外交事件·博弈手段
- 政治运作·政治博弈
- 军事安全·能源安全
- 公共管理·政策法规·行业规则
- 社会问题·经济形势·农村农业
- 民生生活·教育议题

---

# 三、高优品类内部评分细则

## 细则1:叙事方式决定区间上限
同一高优品类,叙事方式决定最终得分位置:
- 叙事/成就型(XX圆满完成、我国XX取得胜利)→ 取区间上限
- 评议/立场型(分析XX走向、表达XX立场)→ 取区间中位
- 策略/博弈型(对抗手段、如何反制)→ 取区间下限(最低7分)

## 细则2:老年专属加成
品类内容含"老年""中老年""50岁以上"等明确专属信号 → 区间内+1分(不超过10分)

## 细则3:焦虑化叙事微降(高优品类内仍适用)
使用"危机""崩溃""末日""警惕"等词渲染负面不确定性 → 区间内-1分(不低于7分保底)

---

# 四、低分黑名单信号

## 强制低分信号(命中任一,评分不超过3)
- 职场类:升职、副业、内卷、打工人、绩效、裁员
- 年轻文化:网络梗、二次元、潮流、追星、发际线、颜值
- 金融投资:炒股、基金、加密货币、理财产品、资产配置
- 房产相关:买房、贷款、学区房、房价涨跌
- 健身塑形:减肥、健身、马甲线、体脂率、增肌
- 科技数码:手机评测、AI工具、电脑配置、游戏硬件
- 高消费场景:奢侈品、出境游、米其林、高端健身房
- 情绪贩卖:焦虑、内耗、emo、迷茫、躺平
- 模糊悬念:无具体信息的"千万别做这件事"类表达

## 中度扣分信号(命中使评分下浮1-2分)
- 内容偏泛人群,缺乏老年专属场景(如免费福利政策未明确针对老年人)
- 认知门槛较高,需要背景知识才能理解(如军事专业术语密集)
- 表达方式年轻化,但内容本身不排斥老年人
- **焦虑化叙事**:即使话题本身适老,若使用"危机""崩溃""警惕""崩盘"等词渲染不确定性,触发此扣分信号——老年用户偏好"确定性"叙事,排斥焦虑化包装
- 农村农业/教育议题:属泛人群内容,老年专属性弱,基础降至2-4分

---

# 五、评分标准(0-10)
- 9-10:高度契合中老年用户核心关注点、老年专属场景或强情感诉求(防骗、老年健康、养老金、民族自豪)
- 7-8:对中老年用户有较强吸引力或实用价值,场景清晰(处世智慧、传统文化、家庭亲情、叙事型时政)
- 5-6:有一定相关性,但中老年专属属性一般,泛人群居多(评议型时政、泛人群惠民、文化传承)
- 3-4:偏年轻群体或认知门槛偏高,老年性弱
- 1-2:明显面向年轻群体,中老年用户几乎不感兴趣
- 0:与中老年用户完全无关,或存在强烈排斥信号

---

# 六、输出规则
严格输出 JSON 对象(含 items 数组),禁止输出 JSON 之外的任何内容(无前缀、无解释、无markdown格式)。
reason 字段请用中文表述,不要使用英文双引号 "。
"""


def _normalize_demand_key(value: str) -> str:
    """Return *value* with all whitespace removed (used for fuzzy key matching)."""
    return "".join(str(value or "").split())


def _dedupe_texts(texts: list[str]) -> list[str]:
    """Strip, drop empties and de-duplicate *texts*, preserving first-seen order.

    Two texts count as duplicates when they are equal either verbatim or after
    removing all whitespace.
    """
    deduped: list[str] = []
    seen: set[str] = set()
    for raw in texts:
        text = str(raw).strip()
        if not text:
            continue
        keys = {text, _normalize_demand_key(text)}
        if keys & seen:
            continue
        seen.update(keys)
        deduped.append(text)
    return deduped


def _has_matched_demand(row: dict[str, Any]) -> bool:
    """Return True when the export row carries a non-blank ``matched_demand``."""
    return bool(str(row.get("matched_demand") or "").strip())


def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
    """Return the highest ``wxindex_latest_score`` across rows (0.0 if none usable).

    Unparseable values and NaN are ignored; NaN would otherwise make ``max``
    depend on row order because NaN comparisons are always False.
    """
    scores: list[float] = []
    for row in export_rows:
        try:
            score = float(row.get("wxindex_latest_score") or 0)
        except (TypeError, ValueError):
            continue
        if not math.isnan(score):
            scores.append(score)
    return max(scores, default=0.0)


def passes_wxindex_gate(
    export_rows: list[dict[str, Any]],
    *,
    wxindex_threshold: float,
) -> bool:
    """Return whether the record-level WeChat index reaches *wxindex_threshold*.

    Used to decide whether the record enters quality judgement at all.
    """
    return _record_wxindex_score(export_rows) >= wxindex_threshold


def _repair_json_text(text: str) -> str:
    """Apply best-effort fixes for common LLM JSON mistakes.

    Removes trailing commas before ``}``/``]`` and replaces typographic quotes
    with their ASCII equivalents. Only used after a strict parse has failed.
    """
    repaired = _TRAILING_COMMA_RE.sub(r"\1", text.strip())
    return repaired.translate(_QUOTE_TRANSLATION)


def _extract_score_demands_fallback(
    raw: str,
    candidates: list[dict[str, str]],
) -> dict[str, Any] | None:
    """Leniently extract per-candidate scores from text that is not valid JSON.

    For each candidate, regex-search *raw* for its ``demand_text`` followed by
    a numeric ``score`` (and, optionally, a ``reason``).

    Returns:
        ``{"source": str, "items": [...]}`` using the candidate's own type and
        text, or ``None`` when no candidate could be matched.
    """
    items: list[dict[str, Any]] = []
    for candidate in candidates:
        demand_type = str(candidate.get("demand_type") or "").strip()
        demand_text = str(candidate.get("demand_text") or "").strip()
        if not demand_type or not demand_text:
            continue
        escaped_text = re.escape(demand_text)
        escaped_type = re.escape(demand_type)
        score_patterns = (
            # "demand_type" then "demand_text" then "score", adjacent.
            (
                rf'"demand_type"\s*:\s*"{escaped_type}"\s*,\s*'
                rf'"demand_text"\s*:\s*"{escaped_text}"\s*,\s*'
                rf'"score"\s*:\s*{_SCORE_NUMBER}'
            ),
            # "demand_text" then "demand_type" then "score", adjacent.
            (
                rf'"demand_text"\s*:\s*"{escaped_text}"\s*,\s*'
                rf'"demand_type"\s*:\s*"{escaped_type}"\s*,\s*'
                rf'"score"\s*:\s*{_SCORE_NUMBER}'
            ),
            # Loosest: the first "score" within 400 chars after the text.
            (
                rf'"demand_text"\s*:\s*"{escaped_text}"'
                rf'[\s\S]{{0,400}}?"score"\s*:\s*{_SCORE_NUMBER}'
            ),
        )
        score_value: float | None = None
        for pattern in score_patterns:
            match = re.search(pattern, raw)
            if match:
                score_value = _normalize_score(match.group(1))
                break
        if score_value is None:
            continue

        reason = ""
        reason_match = re.search(
            rf'"demand_text"\s*:\s*"{escaped_text}"'
            rf'[\s\S]{{0,600}}?"reason"\s*:\s*"((?:[^"\\]|\\.)*)"',
            raw,
        )
        if reason_match:
            reason = (
                reason_match.group(1)
                .replace('\\"', '"')
                .replace("\\n", "\n")
                .replace("\\t", "\t")
            )
        items.append(
            {
                "demand_type": demand_type,
                "demand_text": demand_text,
                "score": score_value,
                "reason": reason,
            }
        )
    if not items:
        return None
    source_match = re.search(r'"source"\s*:\s*"((?:[^"\\]|\\.)*)"', raw)
    return {
        "source": source_match.group(1) if source_match else "",
        "items": items,
    }


def _extract_json_object(
    text: str,
    *,
    candidates: list[dict[str, str]] | None = None,
) -> dict[str, Any]:
    """Parse an LLM response into a JSON object, as leniently as needed.

    Strategy:
        1. Strip a surrounding Markdown code fence, if any.
        2. Try ``json.loads`` on the whole text and on the outermost
           ``{...}`` span, each as-is and after :func:`_repair_json_text`.
        3. If *candidates* are given, fall back to regex extraction per
           candidate via :func:`_extract_score_demands_fallback`.

    Raises:
        HotContentFlowError: if no strategy produced a dict.
    """
    raw = text.strip()
    if raw.startswith("```"):
        raw = re.sub(r"^```(?:json)?\s*", "", raw)
        raw = re.sub(r"\s*```$", "", raw)
    blocks = [raw]
    match = re.search(r"\{[\s\S]*\}", raw)
    if match:
        blocks.append(match.group(0))

    for block in blocks:
        for candidate_text in (block, _repair_json_text(block)):
            try:
                parsed = json.loads(candidate_text)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed

    if candidates:
        for block in blocks:
            fallback = _extract_score_demands_fallback(block, candidates)
            if fallback:
                return fallback
    raise HotContentFlowError("llm output is not json object")


def _candidate_key(demand_type: str, demand_text: str) -> tuple[str, str]:
    """Build the matching key: stripped type plus whitespace-free text."""
    return demand_type.strip(), _normalize_demand_key(demand_text)


def build_matched_element_texts(export_rows: list[dict[str, Any]]) -> list[str]:
    """Return de-duplicated texts of element rows that have a matched demand."""
    return _dedupe_texts(
        [
            str(row.get("item_text") or "").strip()
            for row in export_rows
            if str(row.get("item_type") or "") == ITEM_TYPE_ELEMENT
            and _has_matched_demand(row)
        ]
    )


def build_feature_combo_text(export_rows: list[dict[str, Any]]) -> str:
    """Join all matched element texts with single spaces into one combo demand."""
    return " ".join(build_matched_element_texts(export_rows))


def _append_feature_point_candidate(
    candidates: list[dict[str, str]],
    seen: set[tuple[str, str]],
    demand_text: str,
) -> None:
    """Append a feature-point candidate unless blank or already in *seen* (mutates both)."""
    text = str(demand_text or "").strip()
    if not text:
        return
    key = _candidate_key(TYPE_FEATURE_POINT, text)
    if key in seen:
        return
    seen.add(key)
    candidates.append(
        {
            "demand_type": TYPE_FEATURE_POINT,
            "demand_text": text,
        }
    )


def build_quality_candidates(
    export_rows: list[dict[str, Any]],
    *,
    wxindex_threshold: float,
) -> list[dict[str, str]]:
    """Build the demands to score, if the record passes the WeChat-index gate.

    Three kinds of candidates are produced, in this order and de-duplicated by
    :func:`_candidate_key`:

    1. the feature-point combination (all matched element texts joined),
    2. each matched element as its own feature point,
    3. each phrase row that has a matched demand.

    Returns an empty list when the gate is not passed.
    """
    if not passes_wxindex_gate(export_rows, wxindex_threshold=wxindex_threshold):
        return []

    candidates: list[dict[str, str]] = []
    seen: set[tuple[str, str]] = set()

    feature_combo = build_feature_combo_text(export_rows)
    _append_feature_point_candidate(candidates, seen, feature_combo)
    for element_text in build_matched_element_texts(export_rows):
        _append_feature_point_candidate(candidates, seen, element_text)

    for row in export_rows:
        if str(row.get("item_type") or "") != ITEM_TYPE_PHRASE:
            continue
        if not _has_matched_demand(row):
            continue
        phrase_text = str(row.get("item_text") or "").strip()
        if not phrase_text:
            continue
        key = _candidate_key(TYPE_PHRASE, phrase_text)
        if key in seen:
            continue
        seen.add(key)
        candidates.append(
            {
                "demand_type": TYPE_PHRASE,
                "demand_text": phrase_text,
            }
        )
    return candidates


def _normalize_score(value: Any) -> float | None:
    """Coerce an LLM-provided score to a float clamped into [0, 10].

    Returns ``None`` when the value is not a usable number:
        * anything ``float()`` rejects;
        * booleans (JSON ``true``/``false`` are not scores);
        * NaN. ``json.loads`` accepts a bare ``NaN`` token, and NaN would
          silently fail every threshold comparison downstream.
    """
    if isinstance(value, bool):
        return None
    try:
        score = float(value)
    except (TypeError, ValueError):
        return None
    if math.isnan(score):
        return None
    return min(max(score, 0.0), 10.0)


def _build_score_lookup(
    result_json: dict[str, Any] | None,
) -> dict[tuple[str, str], dict[str, Any]]:
    """Index the ``items`` of a scoring result by :func:`_candidate_key`.

    Malformed results or items are skipped; a later item with the same key
    overwrites an earlier one.
    """
    lookup: dict[tuple[str, str], dict[str, Any]] = {}
    if not isinstance(result_json, dict):
        return lookup
    items = result_json.get("items") or []
    if not isinstance(items, list):
        return lookup
    for item in items:
        if not isinstance(item, dict):
            continue
        demand_type = str(item.get("demand_type") or "").strip()
        demand_text = str(item.get("demand_text") or "").strip()
        if not demand_type or not demand_text:
            continue
        lookup[_candidate_key(demand_type, demand_text)] = item
    return lookup


def _scores_for_key(
    key: tuple[str, str],
    event_lookup: dict[tuple[str, str], dict[str, Any]],
    senior_lookup: dict[tuple[str, str], dict[str, Any]],
) -> tuple[float | None, float | None]:
    """Return the normalised (event, senior) scores for *key* from prebuilt lookups."""
    event_item = event_lookup.get(key)
    senior_item = senior_lookup.get(key)
    event_score = _normalize_score(event_item.get("score")) if event_item else None
    senior_score = _normalize_score(senior_item.get("score")) if senior_item else None
    return event_score, senior_score


def lookup_quality_scores(
    *,
    demand_type: str,
    demand_text: str,
    event_sense_json: dict[str, Any] | None,
    senior_fit_json: dict[str, Any] | None,
) -> tuple[float | None, float | None]:
    """Return ``(event_sense_score, senior_fit_score)`` for one demand.

    Either element is ``None`` when that result has no usable score for it.
    """
    return _scores_for_key(
        _candidate_key(demand_type, demand_text),
        _build_score_lookup(event_sense_json),
        _build_score_lookup(senior_fit_json),
    )


def quality_passed(
    *,
    demand_type: str,
    demand_text: str,
    event_sense_json: dict[str, Any] | None,
    senior_fit_json: dict[str, Any] | None,
    event_threshold: float,
    senior_threshold: float,
) -> bool:
    """Return whether a demand passes quality judgement.

    While event-sense scoring is disabled, only the senior-fit score is
    checked, and *event_threshold* is accepted but unused. A missing score
    counts as a failure.
    """
    event_score, senior_score = lookup_quality_scores(
        demand_type=demand_type,
        demand_text=demand_text,
        event_sense_json=event_sense_json,
        senior_fit_json=senior_fit_json,
    )
    # TODO: event-sense judgement is temporarily disabled. To restore it,
    # uncomment the lines below and delete the senior-only check after them.
    # if event_score is None or senior_score is None:
    #     return False
    # return event_score >= event_threshold and senior_score >= senior_threshold
    if senior_score is None:
        return False
    return senior_score >= senior_threshold


def attach_quality_scores_to_export_rows(
    export_rows: list[dict[str, Any]],
    *,
    event_sense_json: dict[str, Any] | None,
    senior_fit_json: dict[str, Any] | None,
) -> list[dict[str, Any]]:
    """Return shallow copies of *export_rows* with quality scores attached.

    Each copy gains ``event_sense_score`` and ``senior_fit_score``. Element
    rows with a matched demand are looked up as feature points and phrase rows
    as phrases. Every other row gets ``None`` for both. The input rows are not
    mutated.
    """
    # Build the lookups once instead of once per row.
    event_lookup = _build_score_lookup(event_sense_json)
    senior_lookup = _build_score_lookup(senior_fit_json)

    rows: list[dict[str, Any]] = []
    for row in export_rows:
        item_type = str(row.get("item_type") or "")
        item_text = str(row.get("item_text") or "").strip()
        demand_type: str | None = None
        if item_type == ITEM_TYPE_ELEMENT and item_text and _has_matched_demand(row):
            demand_type = TYPE_FEATURE_POINT
        elif item_type == ITEM_TYPE_PHRASE and item_text:
            demand_type = TYPE_PHRASE

        if demand_type is None:
            event_score, senior_score = None, None
        else:
            event_score, senior_score = _scores_for_key(
                _candidate_key(demand_type, item_text),
                event_lookup,
                senior_lookup,
            )
        rows.append(
            {
                **row,
                "event_sense_score": event_score,
                "senior_fit_score": senior_score,
            }
        )
    return rows


def _normalize_llm_items(
    parsed: dict[str, Any],
    candidates: list[dict[str, str]],
) -> list[dict[str, Any]]:
    """Keep only LLM items that correspond to a candidate and carry a valid score.

    Items are matched to candidates by :func:`_candidate_key`, and the
    candidate's own type/text is stored, so minor whitespace drift in the LLM
    echo does not leak into results. The first item with a usable score wins
    per candidate. An earlier duplicate with an invalid score does not block a
    later valid one.
    """
    candidate_lookup = {
        _candidate_key(item["demand_type"], item["demand_text"]): item
        for item in candidates
    }
    raw_items = parsed.get("items") or []
    if not isinstance(raw_items, list):
        raw_items = []

    items: list[dict[str, Any]] = []
    scored: set[tuple[str, str]] = set()
    for raw in raw_items:
        if not isinstance(raw, dict):
            continue
        demand_type = str(raw.get("demand_type") or "").strip()
        demand_text = str(raw.get("demand_text") or "").strip()
        if not demand_type or not demand_text:
            continue
        key = _candidate_key(demand_type, demand_text)
        candidate = candidate_lookup.get(key)
        if candidate is None or key in scored:
            continue
        score = _normalize_score(raw.get("score"))
        if score is None:
            continue
        scored.add(key)
        items.append(
            {
                "demand_type": str(candidate["demand_type"]).strip(),
                "demand_text": str(candidate["demand_text"]).strip(),
                "score": score,
                "reason": str(raw.get("reason") or "").strip(),
            }
        )
    return items


def _llm_score_demands(
    *,
    channel_content_id: str,
    candidates: list[dict[str, str]],
    system_prompt: str,
    model: str,
    max_attempts: int,
    retry_sleep_seconds: float,
    max_tokens: int,
    score_field: str,
) -> dict[str, Any]:
    """Score *candidates* with one LLM prompt, retrying on call/parse failures.

    Returns:
        ``{"source", "score_field", "items"}``, or ``{"source", "items": []}``
        without calling the LLM when there are no candidates.

    Raises:
        HotContentFlowError: after ``max(max_attempts, 1)`` failed attempts,
            chained from the last error.
    """
    if not candidates:
        return {"source": channel_content_id, "items": []}

    user_payload = {
        "source": channel_content_id,
        "demands": candidates,
        "output_schema": {
            "source": "string",
            "items": [
                {
                    "demand_type": "string, 特征点 or 短语",
                    "demand_text": "string, must match one demand in demands",
                    "score": "number, 0-10",
                    "reason": "string",
                }
            ],
        },
        "constraints": [
            "仅对给定 demands 逐项评分,不得新增或遗漏",
            "score 为 0-10 的数字,越大表示越符合判断标准",
            "demand_type 与 demand_text 必须与输入完全一致",
            "reason 字段请用中文表述,不要使用英文双引号 \"",
            "仅输出 JSON 对象,不要 markdown 代码块",
        ],
    }
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": json.dumps(user_payload, ensure_ascii=False)},
    ]

    total_attempts = max(max_attempts, 1)
    last_error: Exception | None = None
    for attempt in range(1, total_attempts + 1):
        try:
            resp = create_chat_completion(
                messages,
                model=model or None,
                temperature=0,
                max_tokens=max(max_tokens, 1),
            )
            parsed = _extract_json_object(
                str(resp.get("content") or ""),
                candidates=candidates,
            )
            return {
                "source": channel_content_id,
                "score_field": score_field,
                "items": _normalize_llm_items(parsed, candidates),
            }
        except (OpenRouterCallError, HotContentFlowError) as exc:
            last_error = exc
            if attempt < total_attempts:
                time.sleep(max(retry_sleep_seconds, 0))
    raise HotContentFlowError(
        f"llm {score_field} scoring failed for {channel_content_id}: {last_error}"
    ) from last_error


def llm_score_event_sense(
    *,
    channel_content_id: str,
    candidates: list[dict[str, str]],
    model: str,
    max_attempts: int,
    retry_sleep_seconds: float,
    max_tokens: int,
) -> dict[str, Any]:
    """Score how precisely each candidate expresses a concrete event (0-10)."""
    return _llm_score_demands(
        channel_content_id=channel_content_id,
        candidates=candidates,
        system_prompt=_EVENT_SENSE_SYSTEM_PROMPT,
        model=model,
        max_attempts=max_attempts,
        retry_sleep_seconds=retry_sleep_seconds,
        max_tokens=max_tokens,
        score_field="event_sense",
    )


def llm_score_senior_fit(
    *,
    channel_content_id: str,
    candidates: list[dict[str, str]],
    model: str,
    max_attempts: int,
    retry_sleep_seconds: float,
    max_tokens: int,
) -> dict[str, Any]:
    """Score how well each candidate fits Chinese users aged 50+ (0-10)."""
    return _llm_score_demands(
        channel_content_id=channel_content_id,
        candidates=candidates,
        system_prompt=_SENIOR_FIT_SYSTEM_PROMPT,
        model=model,
        max_attempts=max_attempts,
        retry_sleep_seconds=retry_sleep_seconds,
        max_tokens=max_tokens,
        score_field="senior_fit",
    )


def filter_candidates_by_event_sense(
    candidates: list[dict[str, str]],
    event_sense_json: dict[str, Any],
    *,
    event_threshold: float,
) -> list[dict[str, str]]:
    """Return the candidates whose event-sense score is at least *event_threshold*.

    Candidates without a usable score are dropped; input order is preserved.
    """
    lookup = _build_score_lookup(event_sense_json)
    passed: list[dict[str, str]] = []
    for candidate in candidates:
        key = _candidate_key(candidate["demand_type"], candidate["demand_text"])
        item = lookup.get(key)
        score = _normalize_score(item.get("score")) if item else None
        if score is not None and score >= event_threshold:
            passed.append(candidate)
    return passed


def run_demand_quality_pipeline(
    *,
    channel_content_id: str,
    export_rows: list[dict[str, Any]],
    wxindex_threshold: float,
    event_threshold: float,
    senior_threshold: float,
    model: str,
    max_attempts: int,
    retry_sleep_seconds: float,
    max_tokens: int,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Run event-sense then senior-fit scoring over the full candidate set.

    Returns ``(event_sense_json, senior_fit_json)``. When the WeChat-index gate
    fails or there are no candidates, both are empty results. Event-sense
    scoring is currently stubbed out (see TODO), so its result has no items.
    """
    candidates = build_quality_candidates(
        export_rows,
        wxindex_threshold=wxindex_threshold,
    )
    if not candidates:
        return (
            {"source": channel_content_id, "items": []},
            {"source": channel_content_id, "items": []},
        )

    llm_kwargs = {
        "channel_content_id": channel_content_id,
        "candidates": candidates,
        "model": model,
        "max_attempts": max_attempts,
        "retry_sleep_seconds": retry_sleep_seconds,
        "max_tokens": max_tokens,
    }
    # TODO: event-sense judgement is temporarily disabled. To restore it,
    # uncomment the lines below and delete the stub result.
    # event_sense_json = llm_score_event_sense(**llm_kwargs)
    # event_sense_json["threshold"] = event_threshold
    event_sense_json = {
        "source": channel_content_id,
        "items": [],
        "threshold": event_threshold,
    }

    senior_fit_json = llm_score_senior_fit(**llm_kwargs)
    senior_fit_json["threshold"] = senior_threshold
    return event_sense_json, senior_fit_json