| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574 |
- """需求质量判断:事件性、老年性 LLM 评分。
- 流程(串行两次 LLM,评分阶段互不截断):
- 1. 微信指数达标后,构建待评需求(特征点组合 + 有匹配的短语)
- 2. 对全部待评需求执行事件性 LLM 评分
- 3. 对同一批全部待评需求执行老年性 LLM 评分
- 4. 导出表 / ODPS 写入时再过滤:标题保留(微信指数 + 灵感/目的点匹配)+ 事件性、老年性双达标
- """
- from __future__ import annotations
- import json
- import re
- import time
- from typing import Any
- from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
- from app.hot_content.demand_export import ITEM_TYPE_ELEMENT, ITEM_TYPE_PHRASE
- from app.hot_content.exceptions import HotContentFlowError
- TYPE_FEATURE_POINT = "特征点"
- TYPE_PHRASE = "短语"
- def _normalize_demand_key(value: str) -> str:
- return "".join(str(value or "").split())
- def _dedupe_texts(texts: list[str]) -> list[str]:
- deduped: list[str] = []
- seen: set[str] = set()
- for raw in texts:
- text = str(raw).strip()
- if not text:
- continue
- keys = {text, _normalize_demand_key(text)}
- if keys & seen:
- continue
- seen.update(keys)
- deduped.append(text)
- return deduped
- def _has_matched_demand(row: dict[str, Any]) -> bool:
- return bool(str(row.get("matched_demand") or "").strip())
- def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
- scores: list[float] = []
- for row in export_rows:
- try:
- scores.append(float(row.get("wxindex_latest_score") or 0))
- except (TypeError, ValueError):
- continue
- return max(scores) if scores else 0.0
- def passes_wxindex_gate(
- export_rows: list[dict[str, Any]],
- *,
- wxindex_threshold: float,
- ) -> bool:
- """记录级微信指数是否达标,用于决定是否进入质量判断。"""
- return _record_wxindex_score(export_rows) >= wxindex_threshold
- def _extract_json_object(text: str) -> dict[str, Any]:
- raw = text.strip()
- if raw.startswith("```"):
- raw = re.sub(r"^```(?:json)?\s*", "", raw)
- raw = re.sub(r"\s*```$", "", raw)
- try:
- parsed = json.loads(raw)
- if isinstance(parsed, dict):
- return parsed
- except json.JSONDecodeError:
- pass
- match = re.search(r"\{[\s\S]*\}", raw)
- if not match:
- raise HotContentFlowError("llm output is not json object")
- try:
- parsed = json.loads(match.group(0))
- except json.JSONDecodeError as exc:
- raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
- if not isinstance(parsed, dict):
- raise HotContentFlowError("llm output is not json object")
- return parsed
- def _candidate_key(demand_type: str, demand_text: str) -> tuple[str, str]:
- return demand_type.strip(), _normalize_demand_key(demand_text)
- def build_feature_combo_text(export_rows: list[dict[str, Any]]) -> str:
- element_texts = _dedupe_texts(
- [
- str(row.get("item_text") or "").strip()
- for row in export_rows
- if str(row.get("item_type") or "") == ITEM_TYPE_ELEMENT
- and _has_matched_demand(row)
- ]
- )
- return " ".join(element_texts)
- def build_quality_candidates(
- export_rows: list[dict[str, Any]],
- *,
- wxindex_threshold: float,
- ) -> list[dict[str, str]]:
- """微信指数达标时,构建特征点组合与短语两类待评需求。"""
- if not passes_wxindex_gate(export_rows, wxindex_threshold=wxindex_threshold):
- return []
- candidates: list[dict[str, str]] = []
- seen: set[tuple[str, str]] = set()
- feature_combo = build_feature_combo_text(export_rows)
- if feature_combo:
- key = _candidate_key(TYPE_FEATURE_POINT, feature_combo)
- if key not in seen:
- seen.add(key)
- candidates.append(
- {
- "demand_type": TYPE_FEATURE_POINT,
- "demand_text": feature_combo,
- }
- )
- for row in export_rows:
- if str(row.get("item_type") or "") != ITEM_TYPE_PHRASE:
- continue
- if not _has_matched_demand(row):
- continue
- phrase_text = str(row.get("item_text") or "").strip()
- if not phrase_text:
- continue
- key = _candidate_key(TYPE_PHRASE, phrase_text)
- if key in seen:
- continue
- seen.add(key)
- candidates.append(
- {
- "demand_type": TYPE_PHRASE,
- "demand_text": phrase_text,
- }
- )
- return candidates
- def _normalize_score(value: Any) -> float | None:
- try:
- score = float(value)
- except (TypeError, ValueError):
- return None
- if score < 0:
- return 0.0
- if score > 10:
- return 10.0
- return score
- def _build_score_lookup(result_json: dict[str, Any] | None) -> dict[tuple[str, str], dict[str, Any]]:
- lookup: dict[tuple[str, str], dict[str, Any]] = {}
- if not isinstance(result_json, dict):
- return lookup
- items = result_json.get("items") or []
- if not isinstance(items, list):
- return lookup
- for item in items:
- if not isinstance(item, dict):
- continue
- demand_type = str(item.get("demand_type") or "").strip()
- demand_text = str(item.get("demand_text") or "").strip()
- if not demand_type or not demand_text:
- continue
- lookup[_candidate_key(demand_type, demand_text)] = item
- return lookup
- def lookup_quality_scores(
- *,
- demand_type: str,
- demand_text: str,
- event_sense_json: dict[str, Any] | None,
- senior_fit_json: dict[str, Any] | None,
- ) -> tuple[float | None, float | None]:
- key = _candidate_key(demand_type, demand_text)
- event_item = _build_score_lookup(event_sense_json).get(key)
- senior_item = _build_score_lookup(senior_fit_json).get(key)
- event_score = _normalize_score(event_item.get("score")) if event_item else None
- senior_score = _normalize_score(senior_item.get("score")) if senior_item else None
- return event_score, senior_score
- def quality_passed(
- *,
- demand_type: str,
- demand_text: str,
- event_sense_json: dict[str, Any] | None,
- senior_fit_json: dict[str, Any] | None,
- event_threshold: float,
- senior_threshold: float,
- ) -> bool:
- event_score, senior_score = lookup_quality_scores(
- demand_type=demand_type,
- demand_text=demand_text,
- event_sense_json=event_sense_json,
- senior_fit_json=senior_fit_json,
- )
- if event_score is None or senior_score is None:
- return False
- return event_score >= event_threshold and senior_score >= senior_threshold
- def attach_quality_scores_to_export_rows(
- export_rows: list[dict[str, Any]],
- *,
- event_sense_json: dict[str, Any] | None,
- senior_fit_json: dict[str, Any] | None,
- ) -> list[dict[str, Any]]:
- feature_combo = build_feature_combo_text(export_rows)
- rows: list[dict[str, Any]] = []
- for row in export_rows:
- item_type = str(row.get("item_type") or "")
- item_text = str(row.get("item_text") or "").strip()
- if item_type == ITEM_TYPE_ELEMENT and feature_combo:
- event_score, senior_score = lookup_quality_scores(
- demand_type=TYPE_FEATURE_POINT,
- demand_text=feature_combo,
- event_sense_json=event_sense_json,
- senior_fit_json=senior_fit_json,
- )
- elif item_type == ITEM_TYPE_PHRASE and item_text:
- event_score, senior_score = lookup_quality_scores(
- demand_type=TYPE_PHRASE,
- demand_text=item_text,
- event_sense_json=event_sense_json,
- senior_fit_json=senior_fit_json,
- )
- else:
- event_score, senior_score = None, None
- rows.append(
- {
- **row,
- "event_sense_score": event_score,
- "senior_fit_score": senior_score,
- }
- )
- return rows
- def _normalize_llm_items(
- parsed: dict[str, Any],
- candidates: list[dict[str, str]],
- ) -> list[dict[str, Any]]:
- candidate_lookup = {
- _candidate_key(item["demand_type"], item["demand_text"]): item
- for item in candidates
- }
- raw_items = parsed.get("items") or []
- if not isinstance(raw_items, list):
- raw_items = []
- items: list[dict[str, Any]] = []
- seen: set[tuple[str, str]] = set()
- for raw in raw_items:
- if not isinstance(raw, dict):
- continue
- demand_type = str(raw.get("demand_type") or "").strip()
- demand_text = str(raw.get("demand_text") or "").strip()
- if not demand_type or not demand_text:
- continue
- key = _candidate_key(demand_type, demand_text)
- if key not in candidate_lookup or key in seen:
- continue
- seen.add(key)
- score = _normalize_score(raw.get("score"))
- if score is None:
- continue
- items.append(
- {
- "demand_type": demand_type,
- "demand_text": demand_text,
- "score": score,
- "reason": str(raw.get("reason") or "").strip(),
- }
- )
- return items
- def _llm_score_demands(
- *,
- channel_content_id: str,
- candidates: list[dict[str, str]],
- system_prompt: str,
- model: str,
- max_attempts: int,
- retry_sleep_seconds: float,
- max_tokens: int,
- score_field: str,
- ) -> dict[str, Any]:
- if not candidates:
- return {"source": channel_content_id, "items": []}
- user_payload = {
- "source": channel_content_id,
- "demands": candidates,
- "output_schema": {
- "source": "string",
- "items": [
- {
- "demand_type": "string, 特征点 or 短语",
- "demand_text": "string, must match one demand in demands",
- "score": "number, 0-10",
- "reason": "string",
- }
- ],
- },
- "constraints": [
- "仅对给定 demands 逐项评分,不得新增或遗漏",
- "score 为 0-10 的数字,越大表示越符合判断标准",
- "demand_type 与 demand_text 必须与输入完全一致",
- "仅输出 JSON 对象,不要 markdown 代码块",
- ],
- }
- last_error: Exception | None = None
- for attempt in range(1, max(max_attempts, 1) + 1):
- try:
- resp = create_chat_completion(
- [
- {"role": "system", "content": system_prompt},
- {
- "role": "user",
- "content": json.dumps(user_payload, ensure_ascii=False),
- },
- ],
- model=model or None,
- temperature=0,
- max_tokens=max(max_tokens, 1),
- )
- parsed = _extract_json_object(str(resp.get("content") or ""))
- parsed.setdefault("source", channel_content_id)
- items = _normalize_llm_items(parsed, candidates)
- return {
- "source": channel_content_id,
- "score_field": score_field,
- "items": items,
- }
- except (OpenRouterCallError, HotContentFlowError) as exc:
- last_error = exc
- if attempt < max(max_attempts, 1):
- time.sleep(max(retry_sleep_seconds, 0))
- raise HotContentFlowError(
- f"llm {score_field} scoring failed for {channel_content_id}: {last_error}"
- ) from last_error
- def llm_score_event_sense(
- *,
- channel_content_id: str,
- candidates: list[dict[str, str]],
- model: str,
- max_attempts: int,
- retry_sleep_seconds: float,
- max_tokens: int,
- ) -> dict[str, Any]:
- system_prompt = """
- 你是一个事件表达精确度评估专家。
- # 任务
- 我会提供若干短语或词组组合(可以是特征词的拼接)。
- 请逐项判断:该短语/词组能否准确表达出一个具体的事件。
- 表达越确切、事件越具体,得分越高。
- # 评分标准(0-10)
- 9-10:
- 精准指向某一具体事件,无歧义,可直接还原事件内容
- 7-8:
- 大体可判断是某类事件,但存在少量歧义或信息不完整
- 4-6:
- 有一定事件指向,但过于泛化,无法锁定具体事件
- 1-3:
- 偏属性/概念描述,几乎无法对应具体事件
- 0:
- 完全无法表达任何具体事件
- # 评估维度(综合考量)
- - 主体明确性:是否点出了事件涉及的人/物/组织
- - 动作/结果明确性:是否体现了发生了什么
- - 时空限定性:是否暗示了特定时间或地点
- - 可还原性:仅凭该短语,能否在脑中重建出事件场景
- # 输出格式
- 严格输出 JSON,禁止输出任何其他内容。
- """
- return _llm_score_demands(
- channel_content_id=channel_content_id,
- candidates=candidates,
- system_prompt=system_prompt,
- model=model,
- max_attempts=max_attempts,
- retry_sleep_seconds=retry_sleep_seconds,
- max_tokens=max_tokens,
- score_field="event_sense",
- )
- def llm_score_senior_fit(
- *,
- channel_content_id: str,
- candidates: list[dict[str, str]],
- model: str,
- max_attempts: int,
- retry_sleep_seconds: float,
- max_tokens: int,
- ) -> dict[str, Any]:
- system_prompt = """
- # 角色
- 你是一名严格的中老年内容适老性评分专家,专门评估词组/短语对中国50岁以上中老年用户的吸引力与相关性。你的判断基于严格的用户画像,而非主观感受。你会识别并拒绝一切看似"老年"实则属于年轻群体、中产焦虑、高认知门槛或语义模糊的伪适老词组。
- # 核心任务
- 对输入的每个词组/短语,输出一个0-10的适老性评分,并给出简短判断依据。
- ---
- # 一、基础定义(严格遵守,不可修改)
- ## 用户画像:中国50岁以上中老年人
- ### 认知特点
- - 追求"确定性"和"安全感",偏好简单直白,拒绝烧脑与推理
- - 不关注新事物、抽象宏观经济、复杂金融博弈、枯燥行政程序
- - 对网络梗、亚文化、职场黑话不敏感甚至反感
- ### 文化背景
- - 成长于上世纪50-70年代,传统观念根深蒂固
- - 深受儒家文化影响,强烈的孝道观念与集体主义倾向
- - 处于"安享期"而非"奋斗期"
- - 关注"保命"(三高/心脏/防骗)而非"塑形"(减肥/发际线)
- - 关注"存量财产安全"而非"增量资产博弈"
- ### 情感需求
- - 核心情感:安逸、从容、被尊重
- - 偏好:正能量、民族自豪感、家庭温情、传统文化、同龄人故事
- - 反感:贩卖焦虑、激烈矛盾冲突、血腥暴力、悲惨负面内容
- ### 场景偏好
- - 接受:菜市场、公园、家庭、医院、老友聚会、怀旧场景
- - 排斥:写字楼、夜店、高端消费场所、极限运动
- ---
- # 二、适老品类白名单(命中即有基础分)
- 以下品类的词组,具备"适老性基础系数",可在此基础上评估具体程度:
- - 国家力量/民族自豪:阅兵、基建、外交胜利、撤侨、领土主权、中国强大
- - 健康养生:三高管理、心脑血管、养生食疗、长寿、防病(严格排除减肥/塑形/医美/脱发)
- - 防骗安全:电信诈骗、保健品骗局、新型骗局案例(极高优先级)
- - 惠民政策:养老金、医保报销、现金支付保障、物价、天气预警
- - 怀旧时光:70年代及以前老照片、经典老歌、老电影、童年记忆
- - 家庭亲情:隔辈亲、孝道、家庭互助(排除婆媳恶斗/剧烈伦理冲突)
- - 传统文化:节气、民俗、戏曲、国学、非遗
- - 正能量:见义勇为、拾金不昧、平凡善举、反腐倡廉
- - 人文科普:文化/历史/人文/健康等社科知识(非科技/自然猎奇)
- - 自然惊奇:自然奇观、动物趣闻(排除血腥/恐怖/猎奇阴暗)
- ---
- # 三、低分黑名单信号
- ## 强制低分信号(命中任一,评分不超过3)
- - 职场类:升职、副业、内卷、打工人、绩效、裁员
- - 年轻文化:网络梗、二次元、潮流、追星、发际线、颜值
- - 金融投资:炒股、基金、加密货币、理财产品、资产配置
- - 房产相关:买房、贷款、学区房、房价涨跌
- - 健身塑形:减肥、健身、马甲线、体脂率、增肌
- - 科技数码:手机评测、AI工具、电脑配置、游戏硬件
- - 高消费场景:奢侈品、出境游、米其林、高端健身房
- - 情绪贩卖:焦虑、内耗、emo、迷茫、躺平
- - 模糊悬念:无具体信息的"千万别做这件事"类表达
- ## 中度扣分信号(命中使评分下浮1-2分)
- - 内容偏泛人群,缺乏老年专属场景
- - 认知门槛较高,需要背景知识才能理解
- - 表达方式年轻化,但内容本身不排斥老年人
- ---
- # 四、评分标准(0-10)
- - 9-10:高度契合中老年用户核心关注点、典型生活场景或强情感诉求
- - 7-8:对中老年用户有较强吸引力或实用价值,场景清晰
- - 5-6:有一定相关性,但中老年专属属性一般,泛人群居多
- - 3-4:偏年轻群体或泛人群,老年性弱,中老年用户兴趣低
- - 1-2:明显面向年轻群体,中老年用户几乎不感兴趣
- - 0:与中老年用户完全无关,或存在强烈排斥信号
- ---
- # 五、输出规则
- 严格输出 JSON 对象(含 items 数组),禁止输出 JSON 之外的任何内容(无前缀、无解释、无markdown格式)。
- """
- return _llm_score_demands(
- channel_content_id=channel_content_id,
- candidates=candidates,
- system_prompt=system_prompt,
- model=model,
- max_attempts=max_attempts,
- retry_sleep_seconds=retry_sleep_seconds,
- max_tokens=max_tokens,
- score_field="senior_fit",
- )
- def filter_candidates_by_event_sense(
- candidates: list[dict[str, str]],
- event_sense_json: dict[str, Any],
- *,
- event_threshold: float,
- ) -> list[dict[str, str]]:
- lookup = _build_score_lookup(event_sense_json)
- passed: list[dict[str, str]] = []
- for candidate in candidates:
- key = _candidate_key(candidate["demand_type"], candidate["demand_text"])
- item = lookup.get(key)
- score = _normalize_score(item.get("score")) if item else None
- if score is not None and score >= event_threshold:
- passed.append(candidate)
- return passed
- def run_demand_quality_pipeline(
- *,
- channel_content_id: str,
- export_rows: list[dict[str, Any]],
- wxindex_threshold: float,
- event_threshold: float,
- senior_threshold: float,
- model: str,
- max_attempts: int,
- retry_sleep_seconds: float,
- max_tokens: int,
- ) -> tuple[dict[str, Any], dict[str, Any]]:
- """微信指数达标的需求:串行执行事件性、老年性 LLM,均对全量候选评分。"""
- candidates = build_quality_candidates(
- export_rows,
- wxindex_threshold=wxindex_threshold,
- )
- if not candidates:
- return {"source": channel_content_id, "items": []}, {"source": channel_content_id, "items": []}
- llm_kwargs = {
- "channel_content_id": channel_content_id,
- "candidates": candidates,
- "model": model,
- "max_attempts": max_attempts,
- "retry_sleep_seconds": retry_sleep_seconds,
- "max_tokens": max_tokens,
- }
- event_sense_json = llm_score_event_sense(**llm_kwargs)
- event_sense_json["threshold"] = event_threshold
- senior_fit_json = llm_score_senior_fit(**llm_kwargs)
- senior_fit_json["threshold"] = senior_threshold
- return event_sense_json, senior_fit_json
|