| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
import json
import math
from collections import defaultdict
from pathlib import Path

from examples.piaoquan_demand.data_query_tools import get_rov_by_merge_leve2_and_video_ids
from examples.piaoquan_demand.db_manager import DatabaseManager
from examples.piaoquan_demand.models import TopicPatternElement, TopicPatternExecution
# Module-level database manager; prepare() obtains its sessions from it.
db = DatabaseManager()
- def _safe_float(value):
- if value is None:
- return 0.0
- try:
- return float(value)
- except (TypeError, ValueError):
- return 0.0
- def _build_category_scores(name_scores, name_paths, name_post_ids):
- """
- 计算分类路径节点权重:
- - 一个 name 的 score 贡献给其路径上的每个节点
- - 同一个 name 多条路径时,每条路径都累加
- """
- node_scores = defaultdict(float)
- node_post_ids = defaultdict(set)
- for name, score in name_scores.items():
- paths = name_paths.get(name, set())
- post_ids = name_post_ids.get(name, set())
- for category_path in paths:
- if not category_path:
- continue
- nodes = [segment.strip() for segment in category_path.split(">") if segment.strip()]
- for idx in range(len(nodes)):
- prefix = ">".join(nodes[: idx + 1])
- node_scores[prefix] += score
- if post_ids:
- node_post_ids[prefix].update(post_ids)
- return node_scores, node_post_ids
- def _write_json(path, payload):
- with open(path, "w", encoding="utf-8") as f:
- json.dump(payload, f, ensure_ascii=False, indent=2)
def prepare(execution_id):
    """Build per-element-type score files for one topic-pattern execution.

    Loads the execution and its ``TopicPatternElement`` rows, fetches ROV
    values for the distinct post ids, and for each element type
    ("实质", "形式", "意图") writes two JSON files under
    ``<directory of this file>/data/<execution_id>/``:

    * ``<type>_元素.json``: one entry per element name with its average ROV
      score, the number of posts it appears in and its category paths,
      ordered by descending score, then name.
    * ``<type>_分类.json``: one entry per category-path prefix with the
      summed score of the names below it, ordered by descending score, then
      category path.

    Rows with an unknown element type or a blank name are ignored. A name
    only receives a score (and therefore only appears in the output) when at
    least one of its rows carries a post id.

    Args:
        execution_id: Primary key of the ``TopicPatternExecution`` to process.

    Returns:
        dict: ``{"execution_id", "merge_leve2", "files"}`` where ``files``
        maps ``"<type>_元素"`` / ``"<type>_分类"`` to the written file paths.
        When the execution has no element rows, a dict with ``"message"``
        and ``"execution_id"`` is returned and nothing is written.

    Raises:
        ValueError: If no execution with ``execution_id`` exists.
    """
    element_types = ("实质", "形式", "意图")

    session = db.get_session()
    try:
        execution = session.query(TopicPatternExecution).filter(
            TopicPatternExecution.id == execution_id
        ).first()
        if not execution:
            raise ValueError(f"execution_id 不存在: {execution_id}")
        merge_leve2 = execution.merge_leve2

        rows = session.query(TopicPatternElement).filter(
            TopicPatternElement.execution_id == execution_id
        ).all()
        if not rows:
            return {"message": "没有可处理的数据", "execution_id": execution_id}

        # 1) Fetch ROV once for the de-duplicated post ids. The result is used
        #    below as a mapping keyed by post id.
        all_post_ids = sorted({r.post_id for r in rows if r.post_id})
        rov_by_post_id = (
            get_rov_by_merge_leve2_and_video_ids(merge_leve2, all_post_ids)
            if all_post_ids
            else {}
        )

        # 2) Group rows by element type, collecting per name the posts it
        #    occurs in and its category paths.
        grouped = {
            element_type: {
                "name_post_ids": defaultdict(set),
                "name_paths": defaultdict(set),
            }
            for element_type in element_types
        }
        for r in rows:
            element_type = (r.element_type or "").strip()
            if element_type not in grouped:
                continue
            name = (r.name or "").strip()
            if not name:
                continue
            if r.post_id:
                grouped[element_type]["name_post_ids"][name].add(r.post_id)
            if r.category_path:
                grouped[element_type]["name_paths"][name].add(r.category_path.strip())

        output_dir = Path(__file__).parent / "data" / str(execution_id)
        output_dir.mkdir(parents=True, exist_ok=True)

        summary = {"execution_id": execution_id, "merge_leve2": merge_leve2, "files": {}}
        for element_type, data in grouped.items():
            name_post_ids = data["name_post_ids"]
            name_paths = data["name_paths"]

            # Score of a name = mean ROV over its posts; a post without a ROV
            # entry counts as 0.0.
            name_scores = {}
            for name, post_ids in name_post_ids.items():
                rovs = [_safe_float(rov_by_post_id.get(pid, 0.0)) for pid in post_ids]
                name_scores[name] = sum(rovs) / len(rovs) if rovs else 0.0

            # Post id details are deliberately left out of the file (size /
            # leakage); only the count is emitted. Sorting by (score, name)
            # keeps the order, and any ids derived from it, reproducible.
            element_payload = sorted(
                (
                    {
                        "name": name,
                        "score": round(score, 6),
                        "post_ids_count": len(name_post_ids.get(name, ())),
                        "category_paths": sorted(name_paths.get(name, ())),
                    }
                    for name, score in name_scores.items()
                ),
                key=lambda item: (-item["score"], item["name"]),
            )

            # 3) Category node weight = sum of the scores of the names it covers.
            category_scores, category_post_ids = _build_category_scores(
                name_scores, name_paths, name_post_ids
            )
            # Tie-break on the path: without it, equal scores keep dict
            # insertion order, which depends on set iteration order and can
            # differ between runs.
            category_payload = sorted(
                (
                    {
                        "category_path": path,
                        "category": path.split(">")[-1].strip() if path else "",
                        "score": round(score, 6),
                        "post_ids_count": len(category_post_ids.get(path, ())),
                    }
                    for path, score in category_scores.items()
                ),
                key=lambda item: (-item["score"], item["category_path"]),
            )

            element_file = output_dir / f"{element_type}_元素.json"
            category_file = output_dir / f"{element_type}_分类.json"
            _write_json(element_file, element_payload)
            _write_json(category_file, category_payload)

            summary["files"][f"{element_type}_元素"] = str(element_file)
            summary["files"][f"{element_type}_分类"] = str(category_file)

        return summary
    finally:
        session.close()
if __name__ == '__main__':
    import sys

    # An optional first command-line argument selects the execution id;
    # without it the previous hard-coded id 29 is used.
    prepare(int(sys.argv[1]) if len(sys.argv) > 1 else 29)
|