import json
from collections import defaultdict
from pathlib import Path

from examples.piaoquan_demand.data_query_tools import get_rov_by_merge_leve2_and_video_ids
from examples.piaoquan_demand.db_manager import DatabaseManager
from examples.piaoquan_demand.models import TopicPatternElement, TopicPatternExecution

db = DatabaseManager()


def _safe_float(value):
    """Coerce *value* to float; return 0.0 for None or unparseable input."""
    if value is None:
        return 0.0
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def _build_category_scores(name_scores, name_paths, name_post_ids):
    """Accumulate category-path node weights.

    Rules:
    - A name's score is contributed to every node (prefix) on its path.
    - When a name has several paths, each path receives the full score.

    Args:
        name_scores: mapping of name -> numeric score.
        name_paths: mapping of name -> set of ">"-delimited category paths.
        name_post_ids: mapping of name -> set of post ids covered by the name.

    Returns:
        (node_scores, node_post_ids): per-prefix accumulated score and the
        union of post ids that contributed to each prefix.
    """
    node_scores = defaultdict(float)
    node_post_ids = defaultdict(set)
    for name, score in name_scores.items():
        paths = name_paths.get(name, set())
        post_ids = name_post_ids.get(name, set())
        for category_path in paths:
            if not category_path:
                continue
            nodes = [segment.strip() for segment in category_path.split(">") if segment.strip()]
            for idx in range(len(nodes)):
                # Every prefix "a", "a>b", "a>b>c" of the path gets the score.
                prefix = ">".join(nodes[: idx + 1])
                node_scores[prefix] += score
                if post_ids:
                    node_post_ids[prefix].update(post_ids)
    return node_scores, node_post_ids


def _write_json(path, payload):
    """Write *payload* as pretty-printed UTF-8 JSON to *path*."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)


def prepare(execution_id):
    """Aggregate element scores for one execution and dump them to JSON files.

    For each element type ("实质"/"形式"/"意图") this computes a per-name
    average ROV score and per-category-node weights, writes one element file
    and one category file per type under data/<execution_id>/, and returns a
    summary dict mapping logical names to the written file paths.

    Raises:
        ValueError: if *execution_id* does not exist.
    """
    session = db.get_session()
    try:
        execution = session.query(TopicPatternExecution).filter(
            TopicPatternExecution.id == execution_id
        ).first()
        if not execution:
            raise ValueError(f"execution_id 不存在: {execution_id}")
        merge_leve2 = execution.merge_leve2
        rows = session.query(TopicPatternElement).filter(
            TopicPatternElement.execution_id == execution_id
        ).all()
        if not rows:
            return {"message": "没有可处理的数据", "execution_id": execution_id}

        # 1) Fetch ROV for the de-duplicated post ids (sorted for a stable query).
        all_post_ids = sorted({r.post_id for r in rows if r.post_id})
        rov_by_post_id = (
            get_rov_by_merge_leve2_and_video_ids(merge_leve2, all_post_ids)
            if all_post_ids
            else {}
        )

        # 2) Group rows by element_type; collect each name's post ids and paths.
        grouped = {
            "实质": {"name_post_ids": defaultdict(set), "name_paths": defaultdict(set)},
            "形式": {"name_post_ids": defaultdict(set), "name_paths": defaultdict(set)},
            "意图": {"name_post_ids": defaultdict(set), "name_paths": defaultdict(set)},
        }
        for r in rows:
            element_type = (r.element_type or "").strip()
            if element_type not in grouped:
                continue
            name = (r.name or "").strip()
            if not name:
                continue
            if r.post_id:
                grouped[element_type]["name_post_ids"][name].add(r.post_id)
            if r.category_path:
                grouped[element_type]["name_paths"][name].add(r.category_path.strip())

        output_dir = Path(__file__).parent / "data" / str(execution_id)
        output_dir.mkdir(parents=True, exist_ok=True)
        summary = {"execution_id": execution_id, "merge_leve2": merge_leve2, "files": {}}

        for element_type, data in grouped.items():
            name_post_ids = data["name_post_ids"]
            name_paths = data["name_paths"]

            # Average ROV per name over its (deduplicated) post ids.
            name_scores = {}
            for name, post_ids in name_post_ids.items():
                rovs = [_safe_float(rov_by_post_id.get(pid, 0.0)) for pid in post_ids]
                name_scores[name] = sum(rovs) / len(rovs) if rovs else 0.0

            raw_elements = []
            for name, score in name_scores.items():
                post_ids_set = name_post_ids.get(name, set())
                raw_elements.append(
                    {
                        "name": name,
                        "score": round(score, 6),
                        # Post-ID details are deliberately omitted from the
                        # output file to keep it small and avoid leaking ids.
                        "post_ids_count": len(post_ids_set),
                        "category_paths": sorted(list(name_paths.get(name, set()))),
                    }
                )

            # Sort by (score desc, name asc) so the output is reproducible.
            element_payload = sorted(raw_elements, key=lambda x: (-x["score"], x["name"]))

            # 3) Category-node weights (node score = sum of covering names' scores).
            category_scores, category_post_ids = _build_category_scores(
                name_scores, name_paths, name_post_ids
            )
            # Tie-break by category_path: set/dict iteration order is not
            # deterministic across runs, so sorting by score alone would make
            # tied entries shuffle between runs (element_payload already
            # tie-breaks by name for the same reason).
            category_payload = sorted(
                [
                    {
                        "category_path": path,
                        "category": path.split(">")[-1].strip() if path else "",
                        "score": round(score, 6),
                        "post_ids_count": len(category_post_ids.get(path, set())),
                    }
                    for path, score in category_scores.items()
                ],
                key=lambda x: (-x["score"], x["category_path"]),
            )

            element_file = output_dir / f"{element_type}_元素.json"
            category_file = output_dir / f"{element_type}_分类.json"
            _write_json(element_file, element_payload)
            _write_json(category_file, category_payload)
            summary["files"][f"{element_type}_元素"] = str(element_file)
            summary["files"][f"{element_type}_分类"] = str(category_file)
        return summary
    finally:
        session.close()


if __name__ == "__main__":
    prepare(29)