ソースを参照

增加缺少的包

xueyiming 3 週間 前
コミット
6fe2bfea6c
1 ファイル変更574 行追加0 行削除
  1. 574 0
      app/hot_content/demand_quality.py

+ 574 - 0
app/hot_content/demand_quality.py

@@ -0,0 +1,574 @@
+"""需求质量判断:事件性、老年性 LLM 评分。
+
+流程(串行两次 LLM,评分阶段互不截断):
+1. 微信指数达标后,构建待评需求(特征点组合 + 有匹配的短语)
+2. 对全部待评需求执行事件性 LLM 评分
+3. 对同一批全部待评需求执行老年性 LLM 评分
+4. 导出表 / ODPS 写入时再过滤:标题保留(微信指数 + 灵感/目的点匹配)+ 事件性、老年性双达标
+"""
+
+from __future__ import annotations
+
+import json
+import re
+import time
+from typing import Any
+
+from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
+from app.hot_content.demand_export import ITEM_TYPE_ELEMENT, ITEM_TYPE_PHRASE
+from app.hot_content.exceptions import HotContentFlowError
+
+TYPE_FEATURE_POINT = "特征点"
+TYPE_PHRASE = "短语"
+
+
+def _normalize_demand_key(value: str) -> str:
+    return "".join(str(value or "").split())
+
+
+def _dedupe_texts(texts: list[str]) -> list[str]:
+    deduped: list[str] = []
+    seen: set[str] = set()
+    for raw in texts:
+        text = str(raw).strip()
+        if not text:
+            continue
+        keys = {text, _normalize_demand_key(text)}
+        if keys & seen:
+            continue
+        seen.update(keys)
+        deduped.append(text)
+    return deduped
+
+
+def _has_matched_demand(row: dict[str, Any]) -> bool:
+    return bool(str(row.get("matched_demand") or "").strip())
+
+
+def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
+    scores: list[float] = []
+    for row in export_rows:
+        try:
+            scores.append(float(row.get("wxindex_latest_score") or 0))
+        except (TypeError, ValueError):
+            continue
+    return max(scores) if scores else 0.0
+
+
+def passes_wxindex_gate(
+    export_rows: list[dict[str, Any]],
+    *,
+    wxindex_threshold: float,
+) -> bool:
+    """记录级微信指数是否达标,用于决定是否进入质量判断。"""
+    return _record_wxindex_score(export_rows) >= wxindex_threshold
+
+
+def _extract_json_object(text: str) -> dict[str, Any]:
+    raw = text.strip()
+    if raw.startswith("```"):
+        raw = re.sub(r"^```(?:json)?\s*", "", raw)
+        raw = re.sub(r"\s*```$", "", raw)
+    try:
+        parsed = json.loads(raw)
+        if isinstance(parsed, dict):
+            return parsed
+    except json.JSONDecodeError:
+        pass
+    match = re.search(r"\{[\s\S]*\}", raw)
+    if not match:
+        raise HotContentFlowError("llm output is not json object")
+    try:
+        parsed = json.loads(match.group(0))
+    except json.JSONDecodeError as exc:
+        raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
+    if not isinstance(parsed, dict):
+        raise HotContentFlowError("llm output is not json object")
+    return parsed
+
+
+def _candidate_key(demand_type: str, demand_text: str) -> tuple[str, str]:
+    return demand_type.strip(), _normalize_demand_key(demand_text)
+
+
+def build_feature_combo_text(export_rows: list[dict[str, Any]]) -> str:
+    element_texts = _dedupe_texts(
+        [
+            str(row.get("item_text") or "").strip()
+            for row in export_rows
+            if str(row.get("item_type") or "") == ITEM_TYPE_ELEMENT
+            and _has_matched_demand(row)
+        ]
+    )
+    return " ".join(element_texts)
+
+
+def build_quality_candidates(
+    export_rows: list[dict[str, Any]],
+    *,
+    wxindex_threshold: float,
+) -> list[dict[str, str]]:
+    """微信指数达标时,构建特征点组合与短语两类待评需求。"""
+    if not passes_wxindex_gate(export_rows, wxindex_threshold=wxindex_threshold):
+        return []
+
+    candidates: list[dict[str, str]] = []
+    seen: set[tuple[str, str]] = set()
+
+    feature_combo = build_feature_combo_text(export_rows)
+    if feature_combo:
+        key = _candidate_key(TYPE_FEATURE_POINT, feature_combo)
+        if key not in seen:
+            seen.add(key)
+            candidates.append(
+                {
+                    "demand_type": TYPE_FEATURE_POINT,
+                    "demand_text": feature_combo,
+                }
+            )
+
+    for row in export_rows:
+        if str(row.get("item_type") or "") != ITEM_TYPE_PHRASE:
+            continue
+        if not _has_matched_demand(row):
+            continue
+        phrase_text = str(row.get("item_text") or "").strip()
+        if not phrase_text:
+            continue
+        key = _candidate_key(TYPE_PHRASE, phrase_text)
+        if key in seen:
+            continue
+        seen.add(key)
+        candidates.append(
+            {
+                "demand_type": TYPE_PHRASE,
+                "demand_text": phrase_text,
+            }
+        )
+    return candidates
+
+
+def _normalize_score(value: Any) -> float | None:
+    try:
+        score = float(value)
+    except (TypeError, ValueError):
+        return None
+    if score < 0:
+        return 0.0
+    if score > 10:
+        return 10.0
+    return score
+
+
+def _build_score_lookup(result_json: dict[str, Any] | None) -> dict[tuple[str, str], dict[str, Any]]:
+    lookup: dict[tuple[str, str], dict[str, Any]] = {}
+    if not isinstance(result_json, dict):
+        return lookup
+    items = result_json.get("items") or []
+    if not isinstance(items, list):
+        return lookup
+    for item in items:
+        if not isinstance(item, dict):
+            continue
+        demand_type = str(item.get("demand_type") or "").strip()
+        demand_text = str(item.get("demand_text") or "").strip()
+        if not demand_type or not demand_text:
+            continue
+        lookup[_candidate_key(demand_type, demand_text)] = item
+    return lookup
+
+
+def lookup_quality_scores(
+    *,
+    demand_type: str,
+    demand_text: str,
+    event_sense_json: dict[str, Any] | None,
+    senior_fit_json: dict[str, Any] | None,
+) -> tuple[float | None, float | None]:
+    key = _candidate_key(demand_type, demand_text)
+    event_item = _build_score_lookup(event_sense_json).get(key)
+    senior_item = _build_score_lookup(senior_fit_json).get(key)
+    event_score = _normalize_score(event_item.get("score")) if event_item else None
+    senior_score = _normalize_score(senior_item.get("score")) if senior_item else None
+    return event_score, senior_score
+
+
+def quality_passed(
+    *,
+    demand_type: str,
+    demand_text: str,
+    event_sense_json: dict[str, Any] | None,
+    senior_fit_json: dict[str, Any] | None,
+    event_threshold: float,
+    senior_threshold: float,
+) -> bool:
+    event_score, senior_score = lookup_quality_scores(
+        demand_type=demand_type,
+        demand_text=demand_text,
+        event_sense_json=event_sense_json,
+        senior_fit_json=senior_fit_json,
+    )
+    if event_score is None or senior_score is None:
+        return False
+    return event_score >= event_threshold and senior_score >= senior_threshold
+
+
+def attach_quality_scores_to_export_rows(
+    export_rows: list[dict[str, Any]],
+    *,
+    event_sense_json: dict[str, Any] | None,
+    senior_fit_json: dict[str, Any] | None,
+) -> list[dict[str, Any]]:
+    feature_combo = build_feature_combo_text(export_rows)
+    rows: list[dict[str, Any]] = []
+    for row in export_rows:
+        item_type = str(row.get("item_type") or "")
+        item_text = str(row.get("item_text") or "").strip()
+        if item_type == ITEM_TYPE_ELEMENT and feature_combo:
+            event_score, senior_score = lookup_quality_scores(
+                demand_type=TYPE_FEATURE_POINT,
+                demand_text=feature_combo,
+                event_sense_json=event_sense_json,
+                senior_fit_json=senior_fit_json,
+            )
+        elif item_type == ITEM_TYPE_PHRASE and item_text:
+            event_score, senior_score = lookup_quality_scores(
+                demand_type=TYPE_PHRASE,
+                demand_text=item_text,
+                event_sense_json=event_sense_json,
+                senior_fit_json=senior_fit_json,
+            )
+        else:
+            event_score, senior_score = None, None
+        rows.append(
+            {
+                **row,
+                "event_sense_score": event_score,
+                "senior_fit_score": senior_score,
+            }
+        )
+    return rows
+
+
+def _normalize_llm_items(
+    parsed: dict[str, Any],
+    candidates: list[dict[str, str]],
+) -> list[dict[str, Any]]:
+    candidate_lookup = {
+        _candidate_key(item["demand_type"], item["demand_text"]): item
+        for item in candidates
+    }
+    raw_items = parsed.get("items") or []
+    if not isinstance(raw_items, list):
+        raw_items = []
+
+    items: list[dict[str, Any]] = []
+    seen: set[tuple[str, str]] = set()
+    for raw in raw_items:
+        if not isinstance(raw, dict):
+            continue
+        demand_type = str(raw.get("demand_type") or "").strip()
+        demand_text = str(raw.get("demand_text") or "").strip()
+        if not demand_type or not demand_text:
+            continue
+        key = _candidate_key(demand_type, demand_text)
+        if key not in candidate_lookup or key in seen:
+            continue
+        seen.add(key)
+        score = _normalize_score(raw.get("score"))
+        if score is None:
+            continue
+        items.append(
+            {
+                "demand_type": demand_type,
+                "demand_text": demand_text,
+                "score": score,
+                "reason": str(raw.get("reason") or "").strip(),
+            }
+        )
+    return items
+
+
+def _llm_score_demands(
+    *,
+    channel_content_id: str,
+    candidates: list[dict[str, str]],
+    system_prompt: str,
+    model: str,
+    max_attempts: int,
+    retry_sleep_seconds: float,
+    max_tokens: int,
+    score_field: str,
+) -> dict[str, Any]:
+    if not candidates:
+        return {"source": channel_content_id, "items": []}
+
+    user_payload = {
+        "source": channel_content_id,
+        "demands": candidates,
+        "output_schema": {
+            "source": "string",
+            "items": [
+                {
+                    "demand_type": "string, 特征点 or 短语",
+                    "demand_text": "string, must match one demand in demands",
+                    "score": "number, 0-10",
+                    "reason": "string",
+                }
+            ],
+        },
+        "constraints": [
+            "仅对给定 demands 逐项评分,不得新增或遗漏",
+            "score 为 0-10 的数字,越大表示越符合判断标准",
+            "demand_type 与 demand_text 必须与输入完全一致",
+            "仅输出 JSON 对象,不要 markdown 代码块",
+        ],
+    }
+
+    last_error: Exception | None = None
+    for attempt in range(1, max(max_attempts, 1) + 1):
+        try:
+            resp = create_chat_completion(
+                [
+                    {"role": "system", "content": system_prompt},
+                    {
+                        "role": "user",
+                        "content": json.dumps(user_payload, ensure_ascii=False),
+                    },
+                ],
+                model=model or None,
+                temperature=0,
+                max_tokens=max(max_tokens, 1),
+            )
+            parsed = _extract_json_object(str(resp.get("content") or ""))
+            parsed.setdefault("source", channel_content_id)
+            items = _normalize_llm_items(parsed, candidates)
+            return {
+                "source": channel_content_id,
+                "score_field": score_field,
+                "items": items,
+            }
+        except (OpenRouterCallError, HotContentFlowError) as exc:
+            last_error = exc
+            if attempt < max(max_attempts, 1):
+                time.sleep(max(retry_sleep_seconds, 0))
+    raise HotContentFlowError(
+        f"llm {score_field} scoring failed for {channel_content_id}: {last_error}"
+    ) from last_error
+
+
+def llm_score_event_sense(
+    *,
+    channel_content_id: str,
+    candidates: list[dict[str, str]],
+    model: str,
+    max_attempts: int,
+    retry_sleep_seconds: float,
+    max_tokens: int,
+) -> dict[str, Any]:
+    system_prompt = """
+你是一个事件表达精确度评估专家。
+# 任务
+我会提供若干短语或词组组合(可以是特征词的拼接)。
+请逐项判断:该短语/词组能否准确表达出一个具体的事件。
+表达越确切、事件越具体,得分越高。
+# 评分标准(0-10)
+9-10:
+精准指向某一具体事件,无歧义,可直接还原事件内容
+7-8:
+大体可判断是某类事件,但存在少量歧义或信息不完整
+4-6:
+有一定事件指向,但过于泛化,无法锁定具体事件
+1-3:
+偏属性/概念描述,几乎无法对应具体事件
+0:
+完全无法表达任何具体事件
+# 评估维度(综合考量)
+- 主体明确性:是否点出了事件涉及的人/物/组织
+- 动作/结果明确性:是否体现了发生了什么
+- 时空限定性:是否暗示了特定时间或地点
+- 可还原性:仅凭该短语,能否在脑中重建出事件场景
+# 输出格式
+严格输出 JSON,禁止输出任何其他内容。
+    """
+    return _llm_score_demands(
+        channel_content_id=channel_content_id,
+        candidates=candidates,
+        system_prompt=system_prompt,
+        model=model,
+        max_attempts=max_attempts,
+        retry_sleep_seconds=retry_sleep_seconds,
+        max_tokens=max_tokens,
+        score_field="event_sense",
+    )
+
+
+def llm_score_senior_fit(
+    *,
+    channel_content_id: str,
+    candidates: list[dict[str, str]],
+    model: str,
+    max_attempts: int,
+    retry_sleep_seconds: float,
+    max_tokens: int,
+) -> dict[str, Any]:
+    system_prompt = """
+# 角色
+
+你是一名严格的中老年内容适老性评分专家,专门评估词组/短语对中国50岁以上中老年用户的吸引力与相关性。你的判断基于严格的用户画像,而非主观感受。你会识别并拒绝一切看似"老年"实则属于年轻群体、中产焦虑、高认知门槛或语义模糊的伪适老词组。
+
+# 核心任务
+
+对输入的每个词组/短语,输出一个0-10的适老性评分,并给出简短判断依据。
+
+---
+
+# 一、基础定义(严格遵守,不可修改)
+
+## 用户画像:中国50岁以上中老年人
+
+### 认知特点
+
+- 追求"确定性"和"安全感",偏好简单直白,拒绝烧脑与推理
+- 不关注新事物、抽象宏观经济、复杂金融博弈、枯燥行政程序
+- 对网络梗、亚文化、职场黑话不敏感甚至反感
+
+### 文化背景
+
+- 成长于上世纪50-70年代,传统观念根深蒂固
+- 深受儒家文化影响,强烈的孝道观念与集体主义倾向
+- 处于"安享期"而非"奋斗期"
+- 关注"保命"(三高/心脏/防骗)而非"塑形"(减肥/发际线)
+- 关注"存量财产安全"而非"增量资产博弈"
+
+### 情感需求
+
+- 核心情感:安逸、从容、被尊重
+- 偏好:正能量、民族自豪感、家庭温情、传统文化、同龄人故事
+- 反感:贩卖焦虑、激烈矛盾冲突、血腥暴力、悲惨负面内容
+
+### 场景偏好
+
+- 接受:菜市场、公园、家庭、医院、老友聚会、怀旧场景
+- 排斥:写字楼、夜店、高端消费场所、极限运动
+
+---
+
+# 二、适老品类白名单(命中即有基础分)
+
+以下品类的词组,具备"适老性基础系数",可在此基础上评估具体程度:
+
+- 国家力量/民族自豪:阅兵、基建、外交胜利、撤侨、领土主权、中国强大
+- 健康养生:三高管理、心脑血管、养生食疗、长寿、防病(严格排除减肥/塑形/医美/脱发)
+- 防骗安全:电信诈骗、保健品骗局、新型骗局案例(极高优先级)
+- 惠民政策:养老金、医保报销、现金支付保障、物价、天气预警
+- 怀旧时光:70年代及以前老照片、经典老歌、老电影、童年记忆
+- 家庭亲情:隔辈亲、孝道、家庭互助(排除婆媳恶斗/剧烈伦理冲突)
+- 传统文化:节气、民俗、戏曲、国学、非遗
+- 正能量:见义勇为、拾金不昧、平凡善举、反腐倡廉
+- 人文科普:文化/历史/人文/健康等社科知识(非科技/自然猎奇)
+- 自然惊奇:自然奇观、动物趣闻(排除血腥/恐怖/猎奇阴暗)
+
+---
+
+# 三、低分黑名单信号
+
+## 强制低分信号(命中任一,评分不超过3)
+
+- 职场类:升职、副业、内卷、打工人、绩效、裁员
+- 年轻文化:网络梗、二次元、潮流、追星、发际线、颜值
+- 金融投资:炒股、基金、加密货币、理财产品、资产配置
+- 房产相关:买房、贷款、学区房、房价涨跌
+- 健身塑形:减肥、健身、马甲线、体脂率、增肌
+- 科技数码:手机评测、AI工具、电脑配置、游戏硬件
+- 高消费场景:奢侈品、出境游、米其林、高端健身房
+- 情绪贩卖:焦虑、内耗、emo、迷茫、躺平
+- 模糊悬念:无具体信息的"千万别做这件事"类表达
+
+## 中度扣分信号(命中使评分下浮1-2分)
+
+- 内容偏泛人群,缺乏老年专属场景
+- 认知门槛较高,需要背景知识才能理解
+- 表达方式年轻化,但内容本身不排斥老年人
+
+---
+
+# 四、评分标准(0-10)
+
+- 9-10:高度契合中老年用户核心关注点、典型生活场景或强情感诉求
+- 7-8:对中老年用户有较强吸引力或实用价值,场景清晰
+- 5-6:有一定相关性,但中老年专属属性一般,泛人群居多
+- 3-4:偏年轻群体或泛人群,老年性弱,中老年用户兴趣低
+- 1-2:明显面向年轻群体,中老年用户几乎不感兴趣
+- 0:与中老年用户完全无关,或存在强烈排斥信号
+
+---
+
+# 五、输出规则
+
+严格输出 JSON 对象(含 items 数组),禁止输出 JSON 之外的任何内容(无前缀、无解释、无markdown格式)。
+    """
+    return _llm_score_demands(
+        channel_content_id=channel_content_id,
+        candidates=candidates,
+        system_prompt=system_prompt,
+        model=model,
+        max_attempts=max_attempts,
+        retry_sleep_seconds=retry_sleep_seconds,
+        max_tokens=max_tokens,
+        score_field="senior_fit",
+    )
+
+
+def filter_candidates_by_event_sense(
+    candidates: list[dict[str, str]],
+    event_sense_json: dict[str, Any],
+    *,
+    event_threshold: float,
+) -> list[dict[str, str]]:
+    lookup = _build_score_lookup(event_sense_json)
+    passed: list[dict[str, str]] = []
+    for candidate in candidates:
+        key = _candidate_key(candidate["demand_type"], candidate["demand_text"])
+        item = lookup.get(key)
+        score = _normalize_score(item.get("score")) if item else None
+        if score is not None and score >= event_threshold:
+            passed.append(candidate)
+    return passed
+
+
+def run_demand_quality_pipeline(
+    *,
+    channel_content_id: str,
+    export_rows: list[dict[str, Any]],
+    wxindex_threshold: float,
+    event_threshold: float,
+    senior_threshold: float,
+    model: str,
+    max_attempts: int,
+    retry_sleep_seconds: float,
+    max_tokens: int,
+) -> tuple[dict[str, Any], dict[str, Any]]:
+    """微信指数达标的需求:串行执行事件性、老年性 LLM,均对全量候选评分。"""
+    candidates = build_quality_candidates(
+        export_rows,
+        wxindex_threshold=wxindex_threshold,
+    )
+    if not candidates:
+        return {"source": channel_content_id, "items": []}, {"source": channel_content_id, "items": []}
+
+    llm_kwargs = {
+        "channel_content_id": channel_content_id,
+        "candidates": candidates,
+        "model": model,
+        "max_attempts": max_attempts,
+        "retry_sleep_seconds": retry_sleep_seconds,
+        "max_tokens": max_tokens,
+    }
+
+    event_sense_json = llm_score_event_sense(**llm_kwargs)
+    event_sense_json["threshold"] = event_threshold
+
+    senior_fit_json = llm_score_senior_fit(**llm_kwargs)
+    senior_fit_json["threshold"] = senior_threshold
+    return event_sense_json, senior_fit_json