| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- import json
- from collections import defaultdict
- from pathlib import Path
- from examples.demand.data_query_tools import get_changwen_weight
- from examples.demand.db_manager import DatabaseManager
- from examples.demand.models import TopicPatternElement, TopicPatternExecution
- from examples.demand.pattern_builds.pattern_service import run_mining
# Shared database manager instance used by all DB access in this module.
db = DatabaseManager()
# Directory holding the per-cluster JSON exports ("<cluster_name>.json").
CHANGWEN_DATA_DIR = Path(__file__).parent / "data" / "changwen_data"
- def _safe_float(value):
- if value is None:
- return 0.0
- try:
- return float(value)
- except (TypeError, ValueError):
- return 0.0
- def _build_category_scores(name_scores, name_paths, name_post_ids):
- node_scores = defaultdict(float)
- node_post_ids = defaultdict(set)
- for name, score in name_scores.items():
- paths = name_paths.get(name, set())
- post_ids = name_post_ids.get(name, set())
- for category_path in paths:
- if not category_path:
- continue
- nodes = [segment.strip() for segment in category_path.split(">") if segment.strip()]
- for idx in range(len(nodes)):
- prefix = ">".join(nodes[: idx + 1])
- node_scores[prefix] += score
- if post_ids:
- node_post_ids[prefix].update(post_ids)
- return node_scores, node_post_ids
- def _write_json(path, payload):
- with open(path, "w", encoding="utf-8") as f:
- json.dump(payload, f, ensure_ascii=False, indent=2)
- def _normalize_post_id(value):
- """
- 统一 post_id/videoid 格式,降低源数据格式差异导致的匹配失败。
- """
- if value is None:
- return ""
- s = str(value).strip()
- if not s:
- return ""
- if s.endswith(".0"):
- s = s[:-2]
- return s
- def _extract_digits(value: str) -> str:
- if not value:
- return ""
- return "".join(ch for ch in value if ch.isdigit())
def _build_score_by_videoid(cluster_name: str):
    """Load <cluster_name>.json and map normalized videoid -> play/exposure score.

    The score is 推荐realplay / 推荐曝光数, or 0.0 when the exposure count is
    missing or non-positive.

    Raises:
        ValueError: when the file does not contain a JSON array.
    """
    json_path = CHANGWEN_DATA_DIR / f"{cluster_name}.json"
    with open(json_path, "r", encoding="utf-8") as f:
        records = json.load(f)
    if not isinstance(records, list):
        raise ValueError(f"数据格式错误,期望数组: {json_path}")
    score_map = {}
    for record in records:
        if not isinstance(record, dict):
            continue
        raw_videoid = record.get("videoid")
        if raw_videoid is None:
            continue
        ext_data = record.get("ext_data") or {}
        if not isinstance(ext_data, dict):
            continue
        videoid = _normalize_post_id(raw_videoid)
        if not videoid:
            continue
        plays = _safe_float(ext_data.get("推荐realplay"))
        exposure = _safe_float(ext_data.get("推荐曝光数"))
        score_map[videoid] = plays / exposure if exposure > 0 else 0.0
    return score_map
def filter_low_exposure_records(
    cluster_name: str = None,
    min_exposure: float = 1000,
):
    """Drop records whose 推荐曝光数 is below *min_exposure* and rewrite the file.

    Args:
        cluster_name: basename (without ".json") of the file under
            CHANGWEN_DATA_DIR to filter in place.
        min_exposure: minimum exposure a record must have to be kept
            (default 1000).

    Returns:
        A summary dict with the file path, before/after/removed counts and
        the threshold used.

    Raises:
        ValueError: when the file does not contain a JSON array.
    """
    json_path = CHANGWEN_DATA_DIR / f"{cluster_name}.json"
    with open(json_path, "r", encoding="utf-8") as f:
        payload = json.load(f)
    if not isinstance(payload, list):
        raise ValueError(f"数据格式错误,期望数组: {json_path}")
    threshold = float(min_exposure)
    filtered = []
    for item in payload:
        if not isinstance(item, dict):
            continue
        ext_data = item.get("ext_data") or {}
        # Records with malformed ext_data count as zero exposure and are dropped.
        exposure = _safe_float(ext_data.get("推荐曝光数")) if isinstance(ext_data, dict) else 0.0
        if exposure >= threshold:
            filtered.append(item)
    # Reuse the module's JSON writer so output formatting stays consistent file-wide.
    _write_json(json_path, filtered)
    return {
        "file": str(json_path),
        "before_count": len(payload),
        "after_count": len(filtered),
        "removed_count": len(payload) - len(filtered),
        "min_exposure": threshold,
    }
def changwen_data_prepare(cluster_name) -> int:
    """Read videoids from <cluster_name>.json and start a mining execution.

    Returns:
        The execution id produced by run_mining.

    Raises:
        ValueError: when the file is not a JSON array or contains no valid
        videoid.
    """
    json_path = CHANGWEN_DATA_DIR / f"{cluster_name}.json"
    with open(json_path, "r", encoding="utf-8") as f:
        records = json.load(f)
    if not isinstance(records, list):
        raise ValueError(f"数据格式错误,期望数组: {json_path}")
    # dict keys preserve first-seen order, so this dedupes without reordering
    # and avoids mining the same video twice.
    ordered_ids = {}
    for record in records:
        if not isinstance(record, dict):
            continue
        raw_id = record.get("videoid")
        if raw_id is None:
            continue
        text_id = str(raw_id).strip()
        if text_id:
            ordered_ids.setdefault(text_id, None)
    video_ids = list(ordered_ids)
    if not video_ids:
        raise ValueError(f"未在文件中解析到有效 videoid: {json_path}")
    return run_mining(post_ids=video_ids, cluster_name=cluster_name)
def _group_elements_by_type(rows):
    """Group element rows by element_type, collecting post ids and category paths per name."""
    grouped = {
        "实质": {"name_post_ids": defaultdict(set), "name_paths": defaultdict(set)},
        "形式": {"name_post_ids": defaultdict(set), "name_paths": defaultdict(set)},
        "意图": {"name_post_ids": defaultdict(set), "name_paths": defaultdict(set)},
    }
    for r in rows:
        element_type = (r.element_type or "").strip()
        if element_type not in grouped:
            continue
        name = (r.name or "").strip()
        if not name:
            continue
        if r.post_id:
            grouped[element_type]["name_post_ids"][name].add(str(r.post_id))
        if r.category_path:
            grouped[element_type]["name_paths"][name].add(r.category_path.strip())
    return grouped


def _average_name_scores(name_post_ids, score_by_post_id, match_stats):
    """Average each name's per-post scores; unmatched posts score 0.0.

    Updates *match_stats* counters in place so stats accumulate across calls.
    """
    name_scores = {}
    for name, post_ids in name_post_ids.items():
        scores = []
        for raw_pid in post_ids:
            match_stats["post_ids_total"] += 1
            pid = _normalize_post_id(raw_pid)
            score = score_by_post_id.get(pid)
            if score is not None:
                match_stats["post_ids_scored_direct"] += 1
                scores.append(_safe_float(score))
                continue
            # Fallback: post_ids carrying prefixes/suffixes — try matching the
            # videoid on the digit characters only.
            digits_pid = _extract_digits(pid)
            if digits_pid and digits_pid in score_by_post_id:
                match_stats["post_ids_scored_by_digits"] += 1
                scores.append(_safe_float(score_by_post_id[digits_pid]))
            else:
                match_stats["post_ids_missing_score"] += 1
                scores.append(0.0)
        name_scores[name] = (sum(scores) / len(scores)) if scores else 0.0
    return name_scores


def _build_element_payload(name_scores, name_paths, name_post_ids):
    """Build the per-element payload, sorted by descending score then name."""
    raw_elements = []
    for name, score in name_scores.items():
        post_ids_set = name_post_ids.get(name, set())
        raw_elements.append(
            {
                "name": name,
                "score": round(score, 6),
                "post_ids_count": len(post_ids_set),
                "category_paths": sorted(list(name_paths.get(name, set()))),
            }
        )
    return sorted(raw_elements, key=lambda x: (-x["score"], x["name"]))


def _build_category_payload(name_scores, name_paths, name_post_ids):
    """Build the per-category payload (scores rolled up onto path prefixes), sorted by descending score."""
    category_scores, category_post_ids = _build_category_scores(
        name_scores, name_paths, name_post_ids
    )
    return sorted(
        [
            {
                "category_path": path,
                "category": path.split(">")[-1].strip() if path else "",
                "score": round(score, 6),
                "post_ids_count": len(category_post_ids.get(path, set())),
            }
            for path, score in category_scores.items()
        ],
        key=lambda x: x["score"],
        reverse=True,
    )


def prepare_by_json_score(execution_id: int, cluster_name: str = "奇观妙技有乾坤"):
    """Score mined elements from JSON play/exposure ratios and write result files.

    Output structure is kept identical to prepare.py, but the score source is:
        score = 推荐realplay / 推荐曝光数

    Args:
        execution_id: id of an existing TopicPatternExecution.
        cluster_name: basename of the JSON export providing the scores.

    Returns:
        A summary dict (execution info, output file paths, score-match stats),
        or a short message dict when the execution has no element rows.

    Raises:
        ValueError: when execution_id does not exist.
    """
    session = db.get_session()
    try:
        execution = session.query(TopicPatternExecution).filter(
            TopicPatternExecution.id == execution_id
        ).first()
        if not execution:
            raise ValueError(f"execution_id 不存在: {execution_id}")
        score_by_post_id = _build_score_by_videoid(cluster_name)
        rows = session.query(TopicPatternElement).filter(
            TopicPatternElement.execution_id == execution_id
        ).all()
        if not rows:
            return {"message": "没有可处理的数据", "execution_id": execution_id}
        grouped = _group_elements_by_type(rows)
        output_dir = Path(__file__).parent / "data" / str(execution_id)
        output_dir.mkdir(parents=True, exist_ok=True)
        # Shared across all element types so the summary reflects the whole run.
        match_stats = {
            "post_ids_total": 0,
            "post_ids_scored_direct": 0,
            "post_ids_scored_by_digits": 0,
            "post_ids_missing_score": 0,
        }
        summary = {
            "execution_id": execution_id,
            "merge_leve2": execution.merge_leve2,
            "files": {},
            "score_match_stats": match_stats,
        }
        for element_type, data in grouped.items():
            name_post_ids = data["name_post_ids"]
            name_paths = data["name_paths"]
            name_scores = _average_name_scores(name_post_ids, score_by_post_id, match_stats)
            element_payload = _build_element_payload(name_scores, name_paths, name_post_ids)
            category_payload = _build_category_payload(name_scores, name_paths, name_post_ids)
            element_file = output_dir / f"{element_type}_元素.json"
            category_file = output_dir / f"{element_type}_分类.json"
            _write_json(element_file, element_payload)
            _write_json(category_file, category_payload)
            summary["files"][f"{element_type}_元素"] = str(element_file)
            summary["files"][f"{element_type}_分类"] = str(category_file)
        return summary
    finally:
        session.close()
def changwen_prepare(cluster_name):
    """Run the full pipeline for one cluster: fetch weights, filter, mine, score.

    Returns:
        The execution id of the mining run.
    """
    get_changwen_weight(cluster_name)
    filter_low_exposure_records(cluster_name=cluster_name)
    execution_id = changwen_data_prepare(cluster_name)
    print(f"execution_id={execution_id}")
    summary = prepare_by_json_score(execution_id, cluster_name)
    print(summary)
    return execution_id
if __name__ == "__main__":
    # Manual entry point: run the full pipeline for one hard-coded cluster.
    cluster_name = '小阳看天下'
    execution_id = changwen_prepare(cluster_name=cluster_name)
    print(execution_id)
|