| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- """垂直领域分类树:查询分类/元素基础信息与效果得分。"""
- import re
- from sqlalchemy import text
- from app.core.config import settings
- from app.db.mysql import SessionLocal
# Whitelist for names that get spliced into SQL text: an ASCII letter or
# underscore followed by ASCII letters, digits or underscores.  The pattern is
# anchored with ``\Z`` rather than ``$`` because ``$`` also matches just before
# a trailing newline, which would let "name\n" through a ``.match()`` check.
IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*\Z")
# Exactly eight ASCII digits (yyyymmdd).  ``[0-9]`` is used instead of ``\d``
# because ``\d`` also matches non-ASCII digits such as full-width numerals.
DATE_RE = re.compile(r"^[0-9]{8}\Z")
def _safe_identifier(name: str) -> str:
    """Return ``name`` unchanged if it is safe to splice into SQL text.

    The query functions in this module interpolate table names into their
    statements with f-strings, so every such name is passed through this
    whitelist check first.

    Args:
        name: Candidate table or column name.

    Returns:
        The same ``name``, once validated.

    Raises:
        ValueError: If ``name`` is not a string or does not consist solely of
            characters allowed by ``IDENTIFIER_RE``.
    """
    # A non-string (e.g. an unset setting) is reported as ValueError like any
    # other bad identifier instead of surfacing as a TypeError from ``re``.
    # ``fullmatch`` is used because ``match`` with a ``$`` anchor would still
    # accept a name that ends in a newline.
    if not isinstance(name, str) or not IDENTIFIER_RE.fullmatch(name):
        raise ValueError(f"invalid sql identifier: {name}")
    return name
def _normalize_date(date_value: str | None) -> str | None:
    """Normalize a caller-supplied date to the ``yyyymmdd`` form.

    Hyphens are removed and surrounding whitespace is stripped, so both
    ``"20240131"`` and ``"2024-01-31"`` yield ``"20240131"``.

    Args:
        date_value: Date text, or ``None``/empty when no date was supplied.

    Returns:
        The eight-digit date string, or ``None`` when ``date_value`` is
        missing or contains nothing but hyphens and whitespace.

    Raises:
        ValueError: If the remaining text is not eight ASCII digits, or the
            digits do not name a real calendar day (e.g. ``20241399``).
    """
    # Function-scope import keeps this block self-contained.
    from datetime import datetime

    if not date_value:
        return None
    normalized = date_value.replace("-", "").strip()
    if not normalized:
        return None
    # ``isascii`` rejects non-ASCII digits that ``\d`` would accept;
    # ``fullmatch`` guarantees the whole string is consumed by the pattern.
    if not (normalized.isascii() and DATE_RE.fullmatch(normalized)):
        raise ValueError("date must be yyyymmdd or yyyy-mm-dd")
    try:
        # Eight digits are not necessarily a date: reject month 13, day 32,
        # Feb 30 and the like instead of querying a partition that cannot exist.
        datetime.strptime(normalized, "%Y%m%d")
    except ValueError as exc:
        raise ValueError("date must be yyyymmdd or yyyy-mm-dd") from exc
    return normalized
def _resolve_partition_dt(dt: str | None) -> str:
    """Pick the ``dt`` partition to query.

    An explicitly supplied date wins (after normalization).  Otherwise the
    largest ``dt`` present in the category effect table is used.

    Args:
        dt: Requested date as ``yyyymmdd`` / ``yyyy-mm-dd``, or ``None``/empty
            to fall back to the latest partition.

    Returns:
        The partition value to filter on.

    Raises:
        ValueError: If ``dt`` is malformed, the configured table name is not a
            safe identifier, or the effect table holds no partition at all.
    """
    explicit_dt = _normalize_date(dt)
    if explicit_dt:
        return explicit_dt

    effect_table = _safe_identifier(settings.vertical_category_effect_table)
    with SessionLocal() as db:
        first_row = db.execute(
            text(f"SELECT MAX(dt) FROM {effect_table}")
        ).first()

    # MAX() over an empty table yields NULL; treat that like "no row".
    if first_row:
        newest = str(first_row[0] or "").strip()
    else:
        newest = ""
    if newest:
        return newest
    raise ValueError("暂无效果数据分区,请先同步垂直领域分类数据")
def query_available_dates(limit: int = 366) -> list[str]:
    """Return the distinct ``dt`` partitions of the category effect table.

    Args:
        limit: Maximum number of partitions to return.  Defaults to 366, the
            value that was previously hard-coded in the query.

    Returns:
        Partition values as strings, newest first (``ORDER BY dt DESC``).
        NULL or empty values are skipped.

    Raises:
        ValueError: If ``limit`` is not a positive integer, or the configured
            table name is not a safe SQL identifier.
    """
    # ``bool`` is an ``int`` subclass; exclude it so ``True`` is not read as 1.
    if isinstance(limit, bool) or not isinstance(limit, int) or limit < 1:
        raise ValueError("limit must be a positive integer")

    category_effect_table = _safe_identifier(settings.vertical_category_effect_table)
    # Only the validated table name is interpolated; the row cap travels as a
    # bound parameter.
    dates_sql = text(
        f"""
        SELECT DISTINCT dt
        FROM {category_effect_table}
        ORDER BY dt DESC
        LIMIT :limit
        """
    )
    with SessionLocal() as session:
        rows = session.execute(dates_sql, {"limit": limit}).all()
    return [str(row[0]) for row in rows if row[0]]
def _stripped(value) -> str:
    """Return ``value`` as a stripped string; falsy values (NULL, ``""``, 0) become ``""``."""
    return str(value or "").strip()


def _optional_float(value) -> float | None:
    """Convert a nullable score column to ``float``, passing ``None`` through."""
    return None if value is None else float(value)


def _nonzero_bounds(scores: list[float | None]) -> tuple[float, float]:
    """Return ``(min, max)`` over the non-null, non-zero scores, or ``(0.0, 0.0)`` if none."""
    usable = [score for score in scores if score is not None and score != 0.0]
    if not usable:
        return 0.0, 0.0
    return min(usable), max(usable)


def query_vertical_category_tree(dt: str | None = None) -> dict[str, object]:
    """Load categories and elements with their effect scores for one partition.

    Base rows are LEFT JOINed to the effect tables on the chosen ``dt``, so a
    category or element without effect data for that partition is still
    returned, with ``None`` in its count/score fields.

    Args:
        dt: Partition date as ``yyyymmdd`` / ``yyyy-mm-dd``.  ``None`` or empty
            selects the latest partition of the category effect table.

    Returns:
        A dict with:
        ``dt`` - the partition actually used;
        ``available_dates`` - result of ``query_available_dates()``;
        ``category_min_rov_score`` / ``category_max_rov_score`` and
        ``element_min_rov_score`` / ``element_max_rov_score`` - bounds of the
        non-null, non-zero ``rov_score`` values (``0.0`` when there are none);
        ``categories`` - one dict per category, including ``is_leaf``;
        ``elements`` - one dict per element.

    Raises:
        ValueError: If ``dt`` is malformed, no partition exists, or a
            configured table name is not a safe SQL identifier.
    """
    partition_dt = _resolve_partition_dt(dt)
    category_base_table = _safe_identifier(settings.vertical_category_base_table)
    category_effect_table = _safe_identifier(settings.vertical_category_effect_table)
    element_base_table = _safe_identifier(settings.substance_element_base_table)
    element_effect_table = _safe_identifier(settings.substance_element_effect_table)

    category_sql = text(
        f"""
        SELECT
            b.category_id,
            b.parent_stable_id,
            b.category_name,
            b.category_level,
            b.dimension,
            b.classified_as,
            e.vid_count,
            e.rov_score,
            e.str_score,
            e.ros_score
        FROM {category_base_table} b
        LEFT JOIN {category_effect_table} e
            ON b.category_id = e.category_id
            AND e.dt = :dt
        ORDER BY b.category_level ASC, b.category_id ASC
        """
    )
    element_sql = text(
        f"""
        SELECT
            b.element_id,
            b.stable_id,
            b.element_name,
            b.dimension,
            b.classified_as,
            e.vid_count,
            e.rov_score,
            e.str_score,
            e.ros_score
        FROM {element_base_table} b
        LEFT JOIN {element_effect_table} e
            ON b.element_id = e.element_id
            AND e.dt = :dt
        ORDER BY b.stable_id ASC, b.element_id ASC
        """
    )

    params = {"dt": partition_dt}
    with SessionLocal() as session:
        category_rows = session.execute(category_sql, params).mappings().all()
        element_rows = session.execute(element_sql, params).mappings().all()

    categories: list[dict[str, object]] = []
    category_rovs: list[float | None] = []
    # Every id that some row names as its parent; a category is a leaf when
    # its own id never shows up here.
    parent_ids: set[str] = set()
    for row in category_rows:
        parent_id = _stripped(row["parent_stable_id"]) or None
        if parent_id:
            parent_ids.add(parent_id)
        rov_score = _optional_float(row["rov_score"])
        category_rovs.append(rov_score)
        categories.append(
            {
                "category_id": _stripped(row["category_id"]),
                "parent_stable_id": parent_id,
                "category_name": row["category_name"],
                "category_level": row["category_level"],
                "dimension": row["dimension"],
                "classified_as": row["classified_as"],
                "vid_count": row["vid_count"],
                "rov_score": rov_score,
                "str_score": _optional_float(row["str_score"]),
                "ros_score": _optional_float(row["ros_score"]),
            }
        )
    # NOTE(review): this compares ``parent_stable_id`` values against
    # ``category_id`` values - confirm both columns share one id space.
    for item in categories:
        item["is_leaf"] = item["category_id"] not in parent_ids

    elements: list[dict[str, object]] = []
    element_rovs: list[float | None] = []
    for row in element_rows:
        rov_score = _optional_float(row["rov_score"])
        element_rovs.append(rov_score)
        elements.append(
            {
                "element_id": _stripped(row["element_id"]),
                "stable_id": _stripped(row["stable_id"]) or None,
                "element_name": row["element_name"],
                "dimension": row["dimension"],
                "classified_as": row["classified_as"],
                "vid_count": row["vid_count"],
                "rov_score": rov_score,
                "str_score": _optional_float(row["str_score"]),
                "ros_score": _optional_float(row["ros_score"]),
            }
        )

    category_min_rov, category_max_rov = _nonzero_bounds(category_rovs)
    element_min_rov, element_max_rov = _nonzero_bounds(element_rovs)
    return {
        "dt": partition_dt,
        "available_dates": query_available_dates(),
        "category_min_rov_score": category_min_rov,
        "category_max_rov_score": category_max_rov,
        "element_min_rov_score": element_min_rov,
        "element_max_rov_score": element_max_rov,
        "categories": categories,
        "elements": elements,
    }
|