"""Prepare zengzhang cluster data and export element/category scores.

The pipeline reads ``data/zengzhang_data/<cluster_name>.json``, drops
low-exposure records, runs pattern mining on the remaining video ids and
writes per-element-type score files under ``data/<execution_id>/``.
"""
import json
import math
from collections import defaultdict
from pathlib import Path

from examples.demand.data_query_tools import get_changwen_weight, get_zengzhang_weight
from examples.demand.db_manager import DatabaseManager
from examples.demand.models import TopicPatternElement, TopicPatternExecution
from examples.demand.pattern_builds.pattern_service import run_mining

db = DatabaseManager()
ZENGZHANG_DATA_DIR = Path(__file__).parent / "data" / "zengzhang_data"

# Element types that each get their own pair of output files; rows with any
# other element_type are ignored.
_ELEMENT_TYPES = ("实质", "形式", "意图")


def _safe_float(value):
    """Return *value* as a finite float, or 0.0 when that is not possible.

    ``None``, unparsable values, integers too large for a float, and
    non-finite results (NaN / +-inf, which ``json.load`` accepts) all map to
    0.0 so that later sorting and JSON serialisation stay well defined.
    """
    if value is None:
        return 0.0
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    return number if math.isfinite(number) else 0.0


def _build_category_scores(name_scores, name_paths, name_post_ids):
    """Roll element scores up onto every prefix of their category paths.

    Args:
        name_scores: mapping of element name -> score.
        name_paths: mapping of element name -> set of ``"a>b>c"`` paths.
        name_post_ids: mapping of element name -> set of post ids.

    Returns:
        ``(node_scores, node_post_ids)`` keyed by path prefix (``"a"``,
        ``"a>b"``, ``"a>b>c"``): the summed score and the union of post ids.
    """
    node_scores = defaultdict(float)
    node_post_ids = defaultdict(set)
    for name, score in name_scores.items():
        post_ids = name_post_ids.get(name, set())
        # NOTE(review): a name with several paths sharing a prefix adds its
        # score to that prefix once per path - confirm this is intended.
        for category_path in name_paths.get(name, set()):
            if not category_path:
                continue
            prefix = ""
            for segment in category_path.split(">"):
                node = segment.strip()
                if not node:
                    # Blank segments are skipped and do not create a level.
                    continue
                prefix = f"{prefix}>{node}" if prefix else node
                node_scores[prefix] += score
                if post_ids:
                    node_post_ids[prefix].update(post_ids)
    return node_scores, node_post_ids


def _write_json(path, payload):
    """Write *payload* to *path* as indented UTF-8 JSON (non-ASCII kept)."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)


def _cluster_json_path(cluster_name):
    """Return the path of the JSON data file for *cluster_name*."""
    return ZENGZHANG_DATA_DIR / f"{cluster_name}.json"


def _load_json_list(json_path):
    """Load *json_path* and return its content, which must be a JSON array.

    Raises:
        ValueError: if the top-level JSON value is not a list.
    """
    with open(json_path, "r", encoding="utf-8") as f:
        payload = json.load(f)
    if not isinstance(payload, list):
        raise ValueError(f"数据格式错误,期望数组: {json_path}")
    return payload


def _normalize_post_id(value):
    """Normalise a post_id / videoid to a canonical string.

    Reduces match failures caused by format differences in the source data:
    the value is stringified, stripped, and a trailing ``".0"`` is removed.
    ``None`` and blank values yield ``""``.
    """
    if value is None:
        return ""
    s = str(value).strip()
    if not s:
        return ""
    if s.endswith(".0"):
        s = s[:-2]
    return s


def _extract_digits(value: str) -> str:
    """Return only the digit characters of *value* (``""`` if it is empty)."""
    if not value:
        return ""
    return "".join(ch for ch in value if ch.isdigit())


def _build_score_by_videoid(cluster_name: str):
    """Build a ``{normalised videoid: score}`` map from the cluster JSON.

    The score is ``分发realplay_uv / 当日分发曝光uv`` read from each item's
    ``ext_data``; it is 0.0 when the exposure value is not positive. Items
    that are not dicts, lack a usable ``videoid``, or whose ``ext_data`` is
    not a dict are skipped. A later duplicate videoid overwrites an earlier
    one.

    Raises:
        ValueError: if the file's top-level JSON value is not a list.
    """
    payload = _load_json_list(_cluster_json_path(cluster_name))

    score_map = {}
    for item in payload:
        if not isinstance(item, dict):
            continue
        norm_videoid = _normalize_post_id(item.get("videoid"))
        if not norm_videoid:
            continue
        ext_data = item.get("ext_data") or {}
        if not isinstance(ext_data, dict):
            continue
        realplay_uv = _safe_float(ext_data.get("分发realplay_uv"))
        exposure_uv = _safe_float(ext_data.get("当日分发曝光uv"))
        score_map[norm_videoid] = (realplay_uv / exposure_uv) if exposure_uv > 0 else 0.0
    return score_map


def filter_low_exposure_records(
    cluster_name: str = None,
    min_exposure: float = 1000,
):
    """Drop low-exposure records from the cluster JSON and write it back.

    Keeps only dict items whose ``ext_data["推荐曝光数"]`` is at least
    *min_exposure* (default 1000); non-dict items are dropped. The source
    file is overwritten in place.

    Returns:
        A dict with ``file``, ``before_count``, ``after_count``,
        ``removed_count`` and ``min_exposure``.

    Raises:
        ValueError: if the file's top-level JSON value is not a list.
    """
    json_path = _cluster_json_path(cluster_name)
    payload = _load_json_list(json_path)
    threshold = float(min_exposure)

    filtered = []
    for item in payload:
        if not isinstance(item, dict):
            continue
        ext_data = item.get("ext_data") or {}
        exposure = _safe_float(ext_data.get("推荐曝光数")) if isinstance(ext_data, dict) else 0.0
        if exposure >= threshold:
            filtered.append(item)

    _write_json(json_path, filtered)

    return {
        "file": str(json_path),
        "before_count": len(payload),
        "after_count": len(filtered),
        "removed_count": len(payload) - len(filtered),
        "min_exposure": threshold,
    }


def zengzhang_data_prepare(cluster_name) -> int:
    """Run pattern mining on the video ids found in the cluster JSON.

    Returns:
        The value returned by ``run_mining`` (used by callers as the
        execution id).

    Raises:
        ValueError: if the JSON is not a list or holds no usable videoid.
    """
    json_path = _cluster_json_path(cluster_name)
    payload = _load_json_list(json_path)

    video_ids = []
    for item in payload:
        if not isinstance(item, dict):
            continue
        # Same normalisation as the score lookup, so an id such as 123.0 is
        # mined as "123" rather than "123.0".
        video_id = _normalize_post_id(item.get("videoid"))
        if video_id:
            video_ids.append(video_id)

    # De-duplicate while keeping first-seen order, so the same video is not
    # mined twice.
    video_ids = list(dict.fromkeys(video_ids))
    if not video_ids:
        raise ValueError(f"未在文件中解析到有效 videoid: {json_path}")

    return run_mining(post_ids=video_ids, cluster_name=cluster_name, platform='zengzhang')


def _lookup_post_score(raw_pid, score_by_post_id, match_stats):
    """Return the score for *raw_pid* and record how it was matched."""
    match_stats["post_ids_total"] += 1
    pid = _normalize_post_id(raw_pid)
    score = score_by_post_id.get(pid)
    if score is not None:
        match_stats["post_ids_scored_direct"] += 1
        return score

    # Fallback: when the post_id carries a prefix/suffix, try matching the
    # videoid on its digits only.
    digits_pid = _extract_digits(pid)
    if digits_pid and digits_pid in score_by_post_id:
        match_stats["post_ids_scored_by_digits"] += 1
        return score_by_post_id[digits_pid]

    match_stats["post_ids_missing_score"] += 1
    return 0.0


def prepare_by_json_score(execution_id: int, cluster_name: str = ""):
    """Export element and category score files for one mining execution.

    The output structure mirrors prepare.py, but the score comes from the
    cluster JSON: ``score = 分发realplay_uv / 当日分发曝光uv``. An element's
    score is the mean score of its posts (posts without a score count as
    0.0). For each element type two files are written under
    ``data/<execution_id>/``: ``<type>_元素.json`` and ``<type>_分类.json``.

    Returns:
        A summary dict with ``execution_id``, ``merge_leve2``, ``files`` and
        ``score_match_stats``; or a dict with ``message`` and
        ``execution_id`` when the execution has no element rows.

    Raises:
        ValueError: if *execution_id* does not exist, or the cluster JSON is
            not a list.
    """
    session = db.get_session()
    try:
        execution = session.query(TopicPatternExecution).filter(
            TopicPatternExecution.id == execution_id
        ).first()
        if not execution:
            raise ValueError(f"execution_id 不存在: {execution_id}")

        score_by_post_id = _build_score_by_videoid(cluster_name)

        rows = session.query(TopicPatternElement).filter(
            TopicPatternElement.execution_id == execution_id
        ).all()
        if not rows:
            return {"message": "没有可处理的数据", "execution_id": execution_id}

        grouped = {
            element_type: {
                "name_post_ids": defaultdict(set),
                "name_paths": defaultdict(set),
            }
            for element_type in _ELEMENT_TYPES
        }
        for r in rows:
            element_type = (r.element_type or "").strip()
            if element_type not in grouped:
                continue
            name = (r.name or "").strip()
            if not name:
                continue
            if r.post_id:
                grouped[element_type]["name_post_ids"][name].add(str(r.post_id))
            if r.category_path:
                grouped[element_type]["name_paths"][name].add(r.category_path.strip())

        output_dir = Path(__file__).parent / "data" / str(execution_id)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Counters are per (element name, post id) pair, so a post shared by
        # several names is counted once for each of them.
        match_stats = {
            "post_ids_total": 0,
            "post_ids_scored_direct": 0,
            "post_ids_scored_by_digits": 0,
            "post_ids_missing_score": 0,
        }
        summary = {
            "execution_id": execution_id,
            "merge_leve2": execution.merge_leve2,
            "files": {},
            "score_match_stats": match_stats,
        }

        for element_type, data in grouped.items():
            name_post_ids = data["name_post_ids"]
            name_paths = data["name_paths"]

            name_scores = {}
            for name, post_ids in name_post_ids.items():
                scores = [
                    _lookup_post_score(raw_pid, score_by_post_id, match_stats)
                    for raw_pid in post_ids
                ]
                name_scores[name] = (sum(scores) / len(scores)) if scores else 0.0

            raw_elements = [
                {
                    "name": name,
                    "score": round(score, 6),
                    "post_ids_count": len(name_post_ids.get(name, set())),
                    "category_paths": sorted(name_paths.get(name, set())),
                }
                for name, score in name_scores.items()
            ]
            element_payload = sorted(raw_elements, key=lambda x: (-x["score"], x["name"]))

            category_scores, category_post_ids = _build_category_scores(
                name_scores, name_paths, name_post_ids
            )
            # Tie-break on the path: the paths come from set iteration, so
            # sorting on score alone would order equal scores differently
            # from run to run.
            category_payload = sorted(
                [
                    {
                        "category_path": path,
                        "category": path.split(">")[-1].strip() if path else "",
                        "score": round(score, 6),
                        "post_ids_count": len(category_post_ids.get(path, set())),
                    }
                    for path, score in category_scores.items()
                ],
                key=lambda x: (-x["score"], x["category_path"]),
            )

            element_file = output_dir / f"{element_type}_元素.json"
            category_file = output_dir / f"{element_type}_分类.json"
            _write_json(element_file, element_payload)
            _write_json(category_file, category_payload)

            summary["files"][f"{element_type}_元素"] = str(element_file)
            summary["files"][f"{element_type}_分类"] = str(category_file)

        return summary
    finally:
        session.close()


def zengzhang_prepare(cluster_name):
    """Run the full pipeline for *cluster_name* and return the execution id.

    Steps: ``get_zengzhang_weight`` (presumably refreshes the cluster JSON -
    confirm against its implementation), low-exposure filtering, mining,
    then score export.
    """
    get_zengzhang_weight(cluster_name)
    filter_low_exposure_records(cluster_name=cluster_name)
    execution_id = zengzhang_data_prepare(cluster_name)
    # NOTE(review): the original indentation was lost; the score export is
    # assumed to sit under the same guard as the print, since it raises for
    # an unknown execution id.
    if execution_id:
        print(f"execution_id={execution_id}")
        print(prepare_by_json_score(execution_id, cluster_name))
    return execution_id


if __name__ == "__main__":
    # Other cluster names noted here: 奇观妙技有乾坤, 青史铁事漫谈
    cluster_name = '新余'
    execution_id = zengzhang_prepare(cluster_name=cluster_name)
    print(execution_id)