|
|
@@ -0,0 +1,289 @@
|
|
|
+import json
|
|
|
+from collections import defaultdict
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
+from examples.demand.data_query_tools import get_changwen_weight
|
|
|
+from examples.demand.db_manager import DatabaseManager
|
|
|
+from examples.demand.models import TopicPatternElement, TopicPatternExecution
|
|
|
+from examples.demand.pattern_builds.pattern_service import run_mining
|
|
|
+
|
|
|
# Shared database manager; prepare_by_json_score opens/closes sessions from it.
db = DatabaseManager()
|
|
|
+
|
|
|
+
|
|
|
+def _safe_float(value):
|
|
|
+ if value is None:
|
|
|
+ return 0.0
|
|
|
+ try:
|
|
|
+ return float(value)
|
|
|
+ except (TypeError, ValueError):
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+
|
|
|
+def _build_category_scores(name_scores, name_paths, name_post_ids):
|
|
|
+ node_scores = defaultdict(float)
|
|
|
+ node_post_ids = defaultdict(set)
|
|
|
+ for name, score in name_scores.items():
|
|
|
+ paths = name_paths.get(name, set())
|
|
|
+ post_ids = name_post_ids.get(name, set())
|
|
|
+ for category_path in paths:
|
|
|
+ if not category_path:
|
|
|
+ continue
|
|
|
+ nodes = [segment.strip() for segment in category_path.split(">") if segment.strip()]
|
|
|
+ for idx in range(len(nodes)):
|
|
|
+ prefix = ">".join(nodes[: idx + 1])
|
|
|
+ node_scores[prefix] += score
|
|
|
+ if post_ids:
|
|
|
+ node_post_ids[prefix].update(post_ids)
|
|
|
+ return node_scores, node_post_ids
|
|
|
+
|
|
|
+
|
|
|
+def _write_json(path, payload):
|
|
|
+ with open(path, "w", encoding="utf-8") as f:
|
|
|
+ json.dump(payload, f, ensure_ascii=False, indent=2)
|
|
|
+
|
|
|
+
|
|
|
+def _normalize_post_id(value):
|
|
|
+ """
|
|
|
+ 统一 post_id/videoid 格式,降低源数据格式差异导致的匹配失败。
|
|
|
+ """
|
|
|
+ if value is None:
|
|
|
+ return ""
|
|
|
+ s = str(value).strip()
|
|
|
+ if not s:
|
|
|
+ return ""
|
|
|
+ if s.endswith(".0"):
|
|
|
+ s = s[:-2]
|
|
|
+ return s
|
|
|
+
|
|
|
+
|
|
|
+def _extract_digits(value: str) -> str:
|
|
|
+ if not value:
|
|
|
+ return ""
|
|
|
+ return "".join(ch for ch in value if ch.isdigit())
|
|
|
+
|
|
|
+
|
|
|
def _build_score_by_videoid(cluster_name: str):
    """Load ``<cluster_name>.json`` (next to this module) and map each
    normalized videoid to its play-through score.

    score = 推荐realplay / 推荐曝光数, or 0.0 when exposure is missing or
    not positive. Entries that are not dicts, lack a videoid, or carry a
    non-dict ``ext_data`` are skipped.

    Raises:
        ValueError: If the file's top-level JSON value is not a list.
    """
    json_path = Path(__file__).parent / f"{cluster_name}.json"
    with open(json_path, "r", encoding="utf-8") as handle:
        records = json.load(handle)

    if not isinstance(records, list):
        raise ValueError(f"数据格式错误,期望数组: {json_path}")

    scores = {}
    for record in records:
        if not isinstance(record, dict):
            continue
        raw_id = record.get("videoid")
        if raw_id is None:
            continue
        extras = record.get("ext_data") or {}
        if not isinstance(extras, dict):
            continue
        videoid = _normalize_post_id(raw_id)
        if not videoid:
            continue
        plays = _safe_float(extras.get("推荐realplay"))
        shown = _safe_float(extras.get("推荐曝光数"))
        scores[videoid] = plays / shown if shown > 0 else 0.0
    return scores
|
|
|
+
|
|
|
+
|
|
|
def filter_low_exposure_records(
    cluster_name: str = None,
    min_exposure: float = 1000,
):
    """Drop records whose 推荐曝光数 (recommendation exposure) falls below
    *min_exposure* and rewrite ``<cluster_name>.json`` in place.

    Args:
        cluster_name: Base name of the JSON file next to this module.
            Must be provided; a ``ValueError`` is raised when missing so we
            never try to open a literal ``"None.json"``.
        min_exposure: Filter threshold; records with exposure below it are
            removed. Defaults to 1000.

    Returns:
        A summary dict with the file path, before/after/removed counts and
        the threshold applied.

    Raises:
        ValueError: If *cluster_name* is falsy or the JSON payload is not a list.
    """
    if not cluster_name:
        # Guard against the None default: without it the f-string below
        # would silently build and read a "None.json" path.
        raise ValueError("cluster_name 不能为空")

    json_path = Path(__file__).parent / f"{cluster_name}.json"
    with open(json_path, "r", encoding="utf-8") as f:
        payload = json.load(f)

    if not isinstance(payload, list):
        raise ValueError(f"数据格式错误,期望数组: {json_path}")

    threshold = float(min_exposure)
    filtered = []
    for item in payload:
        if not isinstance(item, dict):
            continue
        ext_data = item.get("ext_data")
        # Missing or non-dict ext_data counts as zero exposure → dropped.
        exposure = _safe_float(ext_data.get("推荐曝光数")) if isinstance(ext_data, dict) else 0.0
        if exposure >= threshold:
            filtered.append(item)

    # Reuse the shared writer so the on-disk format matches the other outputs.
    _write_json(json_path, filtered)

    return {
        "file": str(json_path),
        "before_count": len(payload),
        "after_count": len(filtered),
        "removed_count": len(payload) - len(filtered),
        "min_exposure": threshold,
    }
|
|
|
+
|
|
|
+
|
|
|
def changwen_data_prepare(cluster_name) -> int:
    """Collect the unique videoids from ``<cluster_name>.json`` and kick off
    a pattern-mining run for them.

    Returns:
        The execution id produced by ``run_mining``.

    Raises:
        ValueError: If the JSON payload is not a list, or no usable videoid
            is found in the file.
    """
    json_path = Path(__file__).parent / f"{cluster_name}.json"
    with open(json_path, "r", encoding="utf-8") as handle:
        payload = json.load(handle)

    if not isinstance(payload, list):
        raise ValueError(f"数据格式错误,期望数组: {json_path}")

    # Dict keys double as an insertion-ordered "seen" set, so the same video
    # is never mined twice while the original ordering is preserved.
    seen = {}
    for entry in payload:
        if not isinstance(entry, dict):
            continue
        raw_id = entry.get("videoid")
        if raw_id is None:
            continue
        video_id = str(raw_id).strip()
        if video_id:
            seen.setdefault(video_id, None)

    video_ids = list(seen)
    if not video_ids:
        raise ValueError(f"未在文件中解析到有效 videoid: {json_path}")

    return run_mining(post_ids=video_ids, cluster_name=cluster_name)
|
|
|
+
|
|
|
+
|
|
|
def prepare_by_json_score(execution_id: int, cluster_name: str = "奇观妙技有乾坤"):
    """
    Keep the same output structure as prepare.py, but source the score from
    the cluster JSON instead:
        score = 推荐realplay / 推荐曝光数

    Writes per-element-type element/category JSON files under
    ``data/<execution_id>/`` and returns a summary dict with the file paths
    and score-matching statistics.

    Raises:
        ValueError: If *execution_id* does not exist in the database.
    """
    session = db.get_session()
    try:
        execution = session.query(TopicPatternExecution).filter(
            TopicPatternExecution.id == execution_id
        ).first()
        if not execution:
            raise ValueError(f"execution_id 不存在: {execution_id}")

        # videoid -> realplay/exposure ratio, loaded from <cluster_name>.json.
        score_by_post_id = _build_score_by_videoid(cluster_name)
        rows = session.query(TopicPatternElement).filter(
            TopicPatternElement.execution_id == execution_id
        ).all()
        if not rows:
            return {"message": "没有可处理的数据", "execution_id": execution_id}

        # Bucket rows by element type ("实质"/"形式"/"意图" = substance/form/
        # intent); rows with any other type are ignored below.
        grouped = {
            "实质": {"name_post_ids": defaultdict(set), "name_paths": defaultdict(set)},
            "形式": {"name_post_ids": defaultdict(set), "name_paths": defaultdict(set)},
            "意图": {"name_post_ids": defaultdict(set), "name_paths": defaultdict(set)},
        }

        for r in rows:
            element_type = (r.element_type or "").strip()
            if element_type not in grouped:
                continue
            name = (r.name or "").strip()
            if not name:
                continue
            if r.post_id:
                grouped[element_type]["name_post_ids"][name].add(str(r.post_id))
            if r.category_path:
                grouped[element_type]["name_paths"][name].add(r.category_path.strip())

        output_dir = Path(__file__).parent / "data" / str(execution_id)
        output_dir.mkdir(parents=True, exist_ok=True)
        # NOTE(review): stats count (name, post_id) pairs, so a post shared
        # by several names is counted once per name, not once overall.
        match_stats = {
            "post_ids_total": 0,
            "post_ids_scored_direct": 0,
            "post_ids_scored_by_digits": 0,
            "post_ids_missing_score": 0,
        }
        summary = {
            "execution_id": execution_id,
            "merge_leve2": execution.merge_leve2,
            "files": {},
            "score_match_stats": match_stats,
        }

        for element_type, data in grouped.items():
            name_post_ids = data["name_post_ids"]
            name_paths = data["name_paths"]

            # Per-name score = mean score of that name's posts; unmatched
            # posts contribute 0.0 rather than being skipped.
            name_scores = {}
            for name, post_ids in name_post_ids.items():
                scores = []
                for raw_pid in post_ids:
                    match_stats["post_ids_total"] += 1
                    pid = _normalize_post_id(raw_pid)
                    score = score_by_post_id.get(pid)
                    if score is not None:
                        match_stats["post_ids_scored_direct"] += 1
                        scores.append(_safe_float(score))
                        continue

                    # Fallback: when post_id carries a prefix/suffix, try
                    # matching the videoid by its digit characters only.
                    digits_pid = _extract_digits(pid)
                    if digits_pid and digits_pid in score_by_post_id:
                        match_stats["post_ids_scored_by_digits"] += 1
                        scores.append(_safe_float(score_by_post_id[digits_pid]))
                    else:
                        match_stats["post_ids_missing_score"] += 1
                        scores.append(0.0)
                name_scores[name] = (sum(scores) / len(scores)) if scores else 0.0

            raw_elements = []
            for name, score in name_scores.items():
                post_ids_set = name_post_ids.get(name, set())
                raw_elements.append(
                    {
                        "name": name,
                        "score": round(score, 6),
                        "post_ids_count": len(post_ids_set),
                        "category_paths": sorted(list(name_paths.get(name, set()))),
                    }
                )

            # Sort by score descending, then name for a stable order.
            element_payload = sorted(raw_elements, key=lambda x: (-x["score"], x["name"]))
            category_scores, category_post_ids = _build_category_scores(
                name_scores, name_paths, name_post_ids
            )
            category_payload = sorted(
                [
                    {
                        "category_path": path,
                        "category": path.split(">")[-1].strip() if path else "",
                        "score": round(score, 6),
                        "post_ids_count": len(category_post_ids.get(path, set())),
                    }
                    for path, score in category_scores.items()
                ],
                key=lambda x: x["score"],
                reverse=True,
            )

            element_file = output_dir / f"{element_type}_元素.json"
            category_file = output_dir / f"{element_type}_分类.json"
            _write_json(element_file, element_payload)
            _write_json(category_file, category_payload)
            summary["files"][f"{element_type}_元素"] = str(element_file)
            summary["files"][f"{element_type}_分类"] = str(category_file)

        return summary
    finally:
        session.close()
|
|
|
+
|
|
|
def changwen_prepare(cluster_name):
    """Run the full pipeline for one cluster: fetch weight data, drop
    low-exposure records, mine patterns, then emit the score files for the
    resulting execution.

    Returns:
        The mining execution id.
    """
    get_changwen_weight(cluster_name)
    filter_low_exposure_records(cluster_name=cluster_name)
    execution_id = changwen_data_prepare(cluster_name)
    print(f"execution_id={execution_id}")
    summary = prepare_by_json_score(execution_id, cluster_name)
    print(summary)
    return execution_id
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
    # Ad-hoc entry point: run the full pipeline for a single cluster.
    cluster_name = '小阳看天下'
    execution_id = changwen_prepare(cluster_name=cluster_name)
    print(execution_id)
|
|
|
+
|