"""垂直领域分类树:查询分类/元素基础信息与效果得分。""" import re from sqlalchemy import text from app.core.config import settings from app.db.mysql import SessionLocal IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") DATE_RE = re.compile(r"^\d{8}$") def _safe_identifier(name: str) -> str: if not IDENTIFIER_RE.match(name): raise ValueError(f"invalid sql identifier: {name}") return name def _normalize_date(date_value: str | None) -> str | None: if not date_value: return None normalized = date_value.replace("-", "").strip() if not normalized: return None if not DATE_RE.match(normalized): raise ValueError("date must be yyyymmdd or yyyy-mm-dd") return normalized def _resolve_partition_dt(dt: str | None) -> str: normalized = _normalize_date(dt) if normalized: return normalized category_effect_table = _safe_identifier(settings.vertical_category_effect_table) with SessionLocal() as session: row = session.execute( text(f"SELECT MAX(dt) FROM {category_effect_table}") ).first() latest = str(row[0] or "").strip() if row else "" if not latest: raise ValueError("暂无效果数据分区,请先同步垂直领域分类数据") return latest def query_available_dates() -> list[str]: category_effect_table = _safe_identifier(settings.vertical_category_effect_table) with SessionLocal() as session: rows = session.execute( text( f""" SELECT DISTINCT dt FROM {category_effect_table} ORDER BY dt DESC LIMIT 366 """ ) ).all() return [str(row[0]) for row in rows if row[0]] def query_vertical_category_tree(dt: str | None = None) -> dict[str, object]: partition_dt = _resolve_partition_dt(dt) category_base_table = _safe_identifier(settings.vertical_category_base_table) category_effect_table = _safe_identifier(settings.vertical_category_effect_table) element_base_table = _safe_identifier(settings.substance_element_base_table) element_effect_table = _safe_identifier(settings.substance_element_effect_table) category_sql = text( f""" SELECT b.category_id, b.parent_stable_id, b.category_name, b.category_level, b.dimension, b.classified_as, e.vid_count, e.rov_score, e.str_score, e.ros_score FROM {category_base_table} b LEFT JOIN {category_effect_table} e ON b.category_id = e.category_id AND e.dt = :dt ORDER BY b.category_level ASC, b.category_id ASC """ ) element_sql = text( f""" SELECT b.element_id, b.stable_id, b.element_name, b.dimension, b.classified_as, e.vid_count, e.rov_score, e.str_score, e.ros_score FROM {element_base_table} b LEFT JOIN {element_effect_table} e ON b.element_id = e.element_id AND e.dt = :dt ORDER BY b.stable_id ASC, b.element_id ASC """ ) with SessionLocal() as session: category_rows = session.execute(category_sql, {"dt": partition_dt}).mappings().all() element_rows = session.execute(element_sql, {"dt": partition_dt}).mappings().all() categories: list[dict[str, object]] = [] child_category_ids: set[str] = set() category_rov_values: list[float] = [] for row in category_rows: category_id = str(row["category_id"] or "").strip() parent_id = str(row["parent_stable_id"] or "").strip() or None if parent_id: child_category_ids.add(parent_id) rov_score = row["rov_score"] if rov_score is not None and float(rov_score) != 0.0: category_rov_values.append(float(rov_score)) categories.append( { "category_id": category_id, "parent_stable_id": parent_id, "category_name": row["category_name"], "category_level": row["category_level"], "dimension": row["dimension"], "classified_as": row["classified_as"], "vid_count": row["vid_count"], "rov_score": float(rov_score) if rov_score is not None else None, "str_score": float(row["str_score"]) if row["str_score"] is not None else None, "ros_score": float(row["ros_score"]) if row["ros_score"] is not None else None, } ) for item in categories: item["is_leaf"] = item["category_id"] not in child_category_ids elements: list[dict[str, object]] = [] element_rov_values: list[float] = [] for row in element_rows: rov_score = row["rov_score"] if rov_score is not None and float(rov_score) != 0.0: element_rov_values.append(float(rov_score)) elements.append( { "element_id": str(row["element_id"] or "").strip(), "stable_id": str(row["stable_id"] or "").strip() or None, "element_name": row["element_name"], "dimension": row["dimension"], "classified_as": row["classified_as"], "vid_count": row["vid_count"], "rov_score": float(rov_score) if rov_score is not None else None, "str_score": float(row["str_score"]) if row["str_score"] is not None else None, "ros_score": float(row["ros_score"]) if row["ros_score"] is not None else None, } ) category_min_rov = min(category_rov_values) if category_rov_values else 0.0 category_max_rov = max(category_rov_values) if category_rov_values else 0.0 element_min_rov = min(element_rov_values) if element_rov_values else 0.0 element_max_rov = max(element_rov_values) if element_rov_values else 0.0 return { "dt": partition_dt, "available_dates": query_available_dates(), "category_min_rov_score": category_min_rov, "category_max_rov_score": category_max_rov, "element_min_rov_score": element_min_rov, "element_max_rov_score": element_max_rov, "categories": categories, "elements": elements, }