vertical_category_tree_service.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. """垂直领域分类树:查询分类/元素基础信息与效果得分。"""
  2. import re
  3. from sqlalchemy import text
  4. from app.core.config import settings
  5. from app.db.mysql import SessionLocal
  6. IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
  7. DATE_RE = re.compile(r"^\d{8}$")
  8. def _safe_identifier(name: str) -> str:
  9. if not IDENTIFIER_RE.match(name):
  10. raise ValueError(f"invalid sql identifier: {name}")
  11. return name
  12. def _normalize_date(date_value: str | None) -> str | None:
  13. if not date_value:
  14. return None
  15. normalized = date_value.replace("-", "").strip()
  16. if not normalized:
  17. return None
  18. if not DATE_RE.match(normalized):
  19. raise ValueError("date must be yyyymmdd or yyyy-mm-dd")
  20. return normalized
  21. def _resolve_partition_dt(dt: str | None) -> str:
  22. normalized = _normalize_date(dt)
  23. if normalized:
  24. return normalized
  25. category_effect_table = _safe_identifier(settings.vertical_category_effect_table)
  26. with SessionLocal() as session:
  27. row = session.execute(
  28. text(f"SELECT MAX(dt) FROM {category_effect_table}")
  29. ).first()
  30. latest = str(row[0] or "").strip() if row else ""
  31. if not latest:
  32. raise ValueError("暂无效果数据分区,请先同步垂直领域分类数据")
  33. return latest
  34. def query_available_dates() -> list[str]:
  35. category_effect_table = _safe_identifier(settings.vertical_category_effect_table)
  36. with SessionLocal() as session:
  37. rows = session.execute(
  38. text(
  39. f"""
  40. SELECT DISTINCT dt
  41. FROM {category_effect_table}
  42. ORDER BY dt DESC
  43. LIMIT 366
  44. """
  45. )
  46. ).all()
  47. return [str(row[0]) for row in rows if row[0]]
  48. def query_vertical_category_tree(dt: str | None = None) -> dict[str, object]:
  49. partition_dt = _resolve_partition_dt(dt)
  50. category_base_table = _safe_identifier(settings.vertical_category_base_table)
  51. category_effect_table = _safe_identifier(settings.vertical_category_effect_table)
  52. element_base_table = _safe_identifier(settings.substance_element_base_table)
  53. element_effect_table = _safe_identifier(settings.substance_element_effect_table)
  54. category_sql = text(
  55. f"""
  56. SELECT
  57. b.category_id,
  58. b.parent_stable_id,
  59. b.category_name,
  60. b.category_level,
  61. b.dimension,
  62. b.classified_as,
  63. e.vid_count,
  64. e.rov_score,
  65. e.str_score,
  66. e.ros_score
  67. FROM {category_base_table} b
  68. LEFT JOIN {category_effect_table} e
  69. ON b.category_id = e.category_id
  70. AND e.dt = :dt
  71. ORDER BY b.category_level ASC, b.category_id ASC
  72. """
  73. )
  74. element_sql = text(
  75. f"""
  76. SELECT
  77. b.element_id,
  78. b.stable_id,
  79. b.element_name,
  80. b.dimension,
  81. b.classified_as,
  82. e.vid_count,
  83. e.rov_score,
  84. e.str_score,
  85. e.ros_score
  86. FROM {element_base_table} b
  87. LEFT JOIN {element_effect_table} e
  88. ON b.element_id = e.element_id
  89. AND e.dt = :dt
  90. ORDER BY b.stable_id ASC, b.element_id ASC
  91. """
  92. )
  93. with SessionLocal() as session:
  94. category_rows = session.execute(category_sql, {"dt": partition_dt}).mappings().all()
  95. element_rows = session.execute(element_sql, {"dt": partition_dt}).mappings().all()
  96. categories: list[dict[str, object]] = []
  97. child_category_ids: set[str] = set()
  98. category_rov_values: list[float] = []
  99. for row in category_rows:
  100. category_id = str(row["category_id"] or "").strip()
  101. parent_id = str(row["parent_stable_id"] or "").strip() or None
  102. if parent_id:
  103. child_category_ids.add(parent_id)
  104. rov_score = row["rov_score"]
  105. if rov_score is not None and float(rov_score) != 0.0:
  106. category_rov_values.append(float(rov_score))
  107. categories.append(
  108. {
  109. "category_id": category_id,
  110. "parent_stable_id": parent_id,
  111. "category_name": row["category_name"],
  112. "category_level": row["category_level"],
  113. "dimension": row["dimension"],
  114. "classified_as": row["classified_as"],
  115. "vid_count": row["vid_count"],
  116. "rov_score": float(rov_score) if rov_score is not None else None,
  117. "str_score": float(row["str_score"]) if row["str_score"] is not None else None,
  118. "ros_score": float(row["ros_score"]) if row["ros_score"] is not None else None,
  119. }
  120. )
  121. for item in categories:
  122. item["is_leaf"] = item["category_id"] not in child_category_ids
  123. elements: list[dict[str, object]] = []
  124. element_rov_values: list[float] = []
  125. for row in element_rows:
  126. rov_score = row["rov_score"]
  127. if rov_score is not None and float(rov_score) != 0.0:
  128. element_rov_values.append(float(rov_score))
  129. elements.append(
  130. {
  131. "element_id": str(row["element_id"] or "").strip(),
  132. "stable_id": str(row["stable_id"] or "").strip() or None,
  133. "element_name": row["element_name"],
  134. "dimension": row["dimension"],
  135. "classified_as": row["classified_as"],
  136. "vid_count": row["vid_count"],
  137. "rov_score": float(rov_score) if rov_score is not None else None,
  138. "str_score": float(row["str_score"]) if row["str_score"] is not None else None,
  139. "ros_score": float(row["ros_score"]) if row["ros_score"] is not None else None,
  140. }
  141. )
  142. category_min_rov = min(category_rov_values) if category_rov_values else 0.0
  143. category_max_rov = max(category_rov_values) if category_rov_values else 0.0
  144. element_min_rov = min(element_rov_values) if element_rov_values else 0.0
  145. element_max_rov = max(element_rov_values) if element_rov_values else 0.0
  146. return {
  147. "dt": partition_dt,
  148. "available_dates": query_available_dates(),
  149. "category_min_rov_score": category_min_rov,
  150. "category_max_rov_score": category_max_rov,
  151. "element_min_rov_score": element_min_rov,
  152. "element_max_rov_score": element_max_rov,
  153. "categories": categories,
  154. "elements": elements,
  155. }