| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- """新热事件需求导出明细:按热点记录创建时间筛选 hot_content_demand_exports。"""
- from __future__ import annotations
- import re
- from datetime import date, datetime, timedelta
- from typing import Any
- from zoneinfo import ZoneInfo
- from sqlalchemy import text
- from app.db.hot_content_mysql import HotContentSessionLocal
# Time zone used to decide what "today" is and where day boundaries fall
# when building the created_at filter range.
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
# A date is accepted only as exactly eight digits (checked after dashes are
# removed by _normalize_date).
DATE_RE = re.compile(r"^\d{8}$")
# Canonical item_type values bound into the ``e.item_type = :item_type`` filter.
ITEM_TYPE_WORD = "词"
ITEM_TYPE_POINT = "点"
# Upper bound on rows returned by one export call; the export function raises
# ValueError when more rows than this match.
MAX_EXPORT_ROWS = 50_000
# Shared SELECT / FROM / JOIN prefix. Callers append their own WHERE,
# ORDER BY and LIMIT clauses. record_created_at comes from the joined
# hot_content_records row, not from the export row itself.
_EXPORT_SELECT = """
SELECT
    e.id,
    e.source,
    e.hot_title,
    e.item_text,
    e.point_category,
    e.matched_demand,
    e.is_as_demand,
    e.contribution_score,
    e.wxindex_keyword,
    e.all_hot_keywords,
    e.wxindex_latest_score,
    e.wxindex_trend,
    e.event_sense_score,
    e.senior_fit_score,
    e.item_type,
    r.created_at AS record_created_at
FROM hot_content_demand_exports e
INNER JOIN hot_content_records r ON r.id = e.record_id
"""
def _normalize_date(date_value: str | None) -> str | None:
    """Normalize a date string to the compact ``yyyymmdd`` form.

    Dashes are removed and surrounding whitespace is stripped, so both
    ``yyyymmdd`` and ``yyyy-mm-dd`` inputs are accepted.

    Args:
        date_value: Raw date text, or ``None``.

    Returns:
        The eight-digit string, or ``None`` when the input is falsy or
        empty after cleanup.

    Raises:
        ValueError: If the cleaned value does not match ``DATE_RE``.
    """
    compact = str(date_value).replace("-", "").strip() if date_value else ""
    if not compact:
        return None
    if DATE_RE.match(compact) is None:
        raise ValueError("date must be yyyymmdd or yyyy-mm-dd")
    return compact
- def _parse_yyyymmdd(value: str) -> date:
- return datetime.strptime(value, "%Y%m%d").date()
def _resolve_date_range(
    start_dt: str | None,
    end_dt: str | None,
) -> tuple[datetime, datetime]:
    """Return the half-open ``[start_at, end_at_exclusive)`` range in Shanghai time.

    Each bound that is missing or empty defaults to the current date in
    ``SHANGHAI_TZ``. The end bound is pushed to midnight of the following
    day so the whole end day is included by a ``<`` comparison.

    Args:
        start_dt: First day, as ``yyyymmdd`` or ``yyyy-mm-dd``.
        end_dt: Last day (inclusive), in the same formats.

    Returns:
        Two timezone-aware datetimes at midnight Shanghai time.

    Raises:
        ValueError: If a date is malformed or the start day is after the
            end day.
    """
    today = datetime.now(SHANGHAI_TZ).date()

    # Both inputs are normalized before either is parsed, so a malformed
    # string is reported ahead of a calendar-invalid one.
    start_text, end_text = _normalize_date(start_dt), _normalize_date(end_dt)
    first_day = today if start_text is None else _parse_yyyymmdd(start_text)
    last_day = today if end_text is None else _parse_yyyymmdd(end_text)

    if first_day > last_day:
        raise ValueError("开始日期不能晚于结束日期")

    midnight = datetime.min.time()
    start_at = datetime.combine(first_day, midnight, tzinfo=SHANGHAI_TZ)
    end_at_exclusive = datetime.combine(
        last_day + timedelta(days=1),
        midnight,
        tzinfo=SHANGHAI_TZ,
    )
    return start_at, end_at_exclusive
def _item_type_label(item_type: str | None) -> str:
    """Return the display label for a stored ``item_type`` value.

    ``ITEM_TYPE_WORD`` and ``ITEM_TYPE_POINT`` map to their fixed labels.
    Any other non-empty value is returned stripped, and an empty or
    missing value becomes ``"-"``.
    """
    cleaned = str(item_type or "").strip()
    labels = {
        ITEM_TYPE_WORD: "特征点",
        ITEM_TYPE_POINT: "短语",
    }
    return labels.get(cleaned, cleaned or "-")
def _normalize_item_type(item_type: str | None) -> str | None:
    """Map a user-supplied item type to its canonical stored value.

    Args:
        item_type: A raw stored value, its display label, ``"全部"`` /
            ``"all"``, an empty string, or ``None``.

    Returns:
        ``ITEM_TYPE_WORD`` or ``ITEM_TYPE_POINT``, or ``None`` when the
        input means "no filter".

    Raises:
        ValueError: If the value is not one of the recognized spellings.
    """
    cleaned = "" if item_type is None else str(item_type).strip()
    if cleaned in {"", "全部", "all"}:
        return None

    # Both the raw stored value and its display label are accepted.
    aliases = {
        ITEM_TYPE_WORD: ITEM_TYPE_WORD,
        "特征点": ITEM_TYPE_WORD,
        ITEM_TYPE_POINT: ITEM_TYPE_POINT,
        "短语": ITEM_TYPE_POINT,
    }
    canonical = aliases.get(cleaned)
    if canonical is None:
        raise ValueError("item_type 须为 词、点、特征点、短语,或留空表示全部")
    return canonical
def _build_filters(
    *,
    start_dt: str | None,
    end_dt: str | None,
    is_as_demand: int | None,
    has_matched_demand: int | None,
    item_type: str | None,
    min_wxindex_latest_score: float | None,
    min_event_sense_score: float | None,
    min_senior_fit_score: float | None,
) -> tuple[str, dict[str, object], datetime, datetime]:
    """Build the WHERE clause and bind parameters shared by list and export.

    The date range on ``r.created_at`` is always applied; every other
    filter is added only when its argument is not ``None`` (or, for
    ``item_type``, normalizes to a concrete value).

    Returns:
        ``(where_sql, params, start_at, end_at_exclusive)``. ``where_sql``
        starts with ``WHERE``. The two datetimes are the timezone-aware
        range bounds, while ``params`` carries them with ``tzinfo``
        removed.

    Raises:
        ValueError: If a date is invalid, a 0/1 flag has another value,
            ``item_type`` is unrecognized, or a minimum score is negative.
    """
    start_at, end_at_exclusive = _resolve_date_range(start_dt, end_dt)

    conditions = [
        "r.created_at >= :start_at",
        "r.created_at < :end_at_exclusive",
    ]
    # The bounds are bound as naive datetimes (tzinfo dropped).
    bind: dict[str, object] = {
        "start_at": start_at.replace(tzinfo=None),
        "end_at_exclusive": end_at_exclusive.replace(tzinfo=None),
    }

    if is_as_demand is not None:
        if is_as_demand not in (0, 1):
            raise ValueError("is_as_demand 须为 0 或 1")
        conditions.append("e.is_as_demand = :is_as_demand")
        bind["is_as_demand"] = is_as_demand

    if has_matched_demand is not None:
        if has_matched_demand not in (0, 1):
            raise ValueError("has_matched_demand 须为 0 或 1")
        # "Matched" means a non-NULL value that is not blank after TRIM.
        conditions.append(
            "e.matched_demand IS NOT NULL AND TRIM(e.matched_demand) <> ''"
            if has_matched_demand == 1
            else "(e.matched_demand IS NULL OR TRIM(e.matched_demand) = '')"
        )

    canonical_type = _normalize_item_type(item_type)
    if canonical_type:
        conditions.append("e.item_type = :item_type")
        bind["item_type"] = canonical_type

    # The three minimum-score filters differ only in the column name.
    thresholds = (
        ("wxindex_latest_score", min_wxindex_latest_score),
        ("event_sense_score", min_event_sense_score),
        ("senior_fit_score", min_senior_fit_score),
    )
    for column, floor in thresholds:
        if floor is None:
            continue
        bind_name = f"min_{column}"
        if floor < 0:
            raise ValueError(f"{bind_name} 不能为负数")
        conditions.append(f"e.{column} >= :{bind_name}")
        bind[bind_name] = floor

    where_sql = "WHERE " + " AND ".join(conditions)
    return where_sql, bind, start_at, end_at_exclusive
def _row_to_dict(row: dict[str, Any]) -> dict[str, object]:
    """Convert one result row into the plain dict returned to callers.

    Args:
        row: Mapping of the columns selected by ``_EXPORT_SELECT``.

    Returns:
        A dict in which text columns are ``str`` (falsy values become
        ``""``), ``id`` and ``is_as_demand`` are ``int``, and
        ``contribution_score`` / ``event_sense_score`` /
        ``senior_fit_score`` are ``float`` or ``None``.
        ``wxindex_latest_score`` is always a ``float`` (missing becomes
        ``0.0``), ``record_created_at`` is formatted as
        ``YYYY-MM-DD HH:MM:SS``, and two display labels are added.

    Raises:
        KeyError: If ``row`` has no ``"id"`` entry.
    """

    def _text(key: str) -> str:
        # NULL and other falsy values are rendered as an empty string.
        return str(row.get(key) or "")

    def _optional_float(key: str) -> float | None:
        # NULL stays None so "no score" is distinguishable from 0.0.
        value = row.get(key)
        return float(value) if value is not None else None

    created_at = row.get("record_created_at")
    if isinstance(created_at, datetime):
        record_created_at = created_at.strftime("%Y-%m-%d %H:%M:%S")
    else:
        record_created_at = str(created_at) if created_at is not None else ""

    is_as_demand_raw = row.get("is_as_demand")
    is_as_demand_int = int(is_as_demand_raw) if is_as_demand_raw is not None else 0

    # "item_type" used to be listed twice in this literal; the redundant
    # second entry is gone. Key order and values are unchanged.
    return {
        "id": int(row["id"]),
        "source": _text("source"),
        "hot_title": _text("hot_title"),
        "item_text": _text("item_text"),
        "point_category": _text("point_category"),
        "item_type": _text("item_type"),
        "item_type_label": _item_type_label(row.get("item_type")),
        "matched_demand": _text("matched_demand"),
        "is_as_demand": is_as_demand_int,
        "is_as_demand_label": "是" if is_as_demand_int == 1 else "否",
        "contribution_score": _optional_float("contribution_score"),
        "wxindex_keyword": _text("wxindex_keyword"),
        "all_hot_keywords": _text("all_hot_keywords"),
        # Unlike the other scores, a missing value here is reported as 0.0.
        "wxindex_latest_score": float(row.get("wxindex_latest_score") or 0),
        "wxindex_trend": _text("wxindex_trend"),
        "event_sense_score": _optional_float("event_sense_score"),
        "senior_fit_score": _optional_float("senior_fit_score"),
        "record_created_at": record_created_at,
    }
def query_hot_content_demand_exports(
    *,
    start_dt: str | None = None,
    end_dt: str | None = None,
    is_as_demand: int | None = None,
    has_matched_demand: int | None = None,
    item_type: str | None = None,
    min_wxindex_latest_score: float | None = None,
    min_event_sense_score: float | None = None,
    min_senior_fit_score: float | None = None,
    page: int = 1,
    page_size: int = 20,
) -> dict[str, object]:
    """Return one page of demand-export rows plus the total match count.

    Rows are filtered by the creation time of the joined hot-content
    record and by the optional filters, then ordered by
    ``r.created_at ASC, e.id ASC``.

    Args:
        start_dt: First day (``yyyymmdd`` or ``yyyy-mm-dd``); defaults to today.
        end_dt: Last day, inclusive; defaults to today.
        is_as_demand: ``0`` or ``1`` to filter on the flag, ``None`` for any.
        has_matched_demand: ``1`` for rows with a non-blank matched
            demand, ``0`` for rows without one, ``None`` for any.
        item_type: Raw value or display label; empty means all.
        min_wxindex_latest_score: Lower bound, inclusive.
        min_event_sense_score: Lower bound, inclusive.
        min_senior_fit_score: Lower bound, inclusive.
        page: 1-based page number.
        page_size: Rows per page; ``0`` yields only the total.

    Returns:
        ``{"total", "page", "page_size", "items"}``, where ``items`` is a
        list of dicts produced by ``_row_to_dict``.

    Raises:
        ValueError: If the paging arguments or any filter are invalid.
    """
    # Reject bad paging up front: a negative OFFSET or LIMIT would
    # otherwise reach the database and surface as an SQL error instead of
    # the ValueError used for every other invalid argument.
    if page < 1:
        raise ValueError("page 须为不小于 1 的整数")
    if page_size < 0:
        raise ValueError("page_size 不能为负数")

    where_sql, params, _, _ = _build_filters(
        start_dt=start_dt,
        end_dt=end_dt,
        is_as_demand=is_as_demand,
        has_matched_demand=has_matched_demand,
        item_type=item_type,
        min_wxindex_latest_score=min_wxindex_latest_score,
        min_event_sense_score=min_event_sense_score,
        min_senior_fit_score=min_senior_fit_score,
    )
    offset = (page - 1) * page_size

    # where_sql contains only fixed fragments with named placeholders;
    # every user-supplied value travels through the bind parameters.
    count_sql = text(
        f"""
        SELECT COUNT(*) AS cnt
        FROM hot_content_demand_exports e
        INNER JOIN hot_content_records r ON r.id = e.record_id
        {where_sql}
        """
    )
    list_sql = text(
        f"""
        {_EXPORT_SELECT}
        {where_sql}
        ORDER BY r.created_at ASC, e.id ASC
        LIMIT :limit OFFSET :offset
        """
    )
    list_params = {**params, "limit": page_size, "offset": offset}

    with HotContentSessionLocal() as session:
        total = int(session.execute(count_sql, params).scalar_one())
        rows = session.execute(list_sql, list_params).mappings().all()

    return {
        "total": total,
        "page": page,
        "page_size": page_size,
        "items": [_row_to_dict(dict(row)) for row in rows],
    }
def export_hot_content_demand_exports(
    *,
    start_dt: str | None = None,
    end_dt: str | None = None,
    is_as_demand: int | None = None,
    has_matched_demand: int | None = None,
    item_type: str | None = None,
    min_wxindex_latest_score: float | None = None,
    min_event_sense_score: float | None = None,
    min_senior_fit_score: float | None = None,
) -> list[dict[str, object]]:
    """Return every matching demand-export row, up to ``MAX_EXPORT_ROWS``.

    Accepts the same filters as the paged query and uses the same
    ordering (``r.created_at ASC, e.id ASC``), but applies no paging.

    Returns:
        A list of dicts produced by ``_row_to_dict``.

    Raises:
        ValueError: If a filter is invalid, or if more than
            ``MAX_EXPORT_ROWS`` rows match.
    """
    where_sql, params, _start, _end = _build_filters(
        start_dt=start_dt,
        end_dt=end_dt,
        is_as_demand=is_as_demand,
        has_matched_demand=has_matched_demand,
        item_type=item_type,
        min_wxindex_latest_score=min_wxindex_latest_score,
        min_event_sense_score=min_event_sense_score,
        min_senior_fit_score=min_senior_fit_score,
    )
    statement = text(
        f"""
        {_EXPORT_SELECT}
        {where_sql}
        ORDER BY r.created_at ASC, e.id ASC
        LIMIT :limit
        """
    )
    # Ask for one row beyond the cap so an overflow is detectable without
    # running a separate COUNT query.
    bound = dict(params, limit=MAX_EXPORT_ROWS + 1)

    with HotContentSessionLocal() as session:
        fetched = session.execute(statement, bound).mappings().all()

    if len(fetched) > MAX_EXPORT_ROWS:
        raise ValueError(f"导出条数超过上限 {MAX_EXPORT_ROWS},请缩小日期或筛选范围")
    return [_row_to_dict(dict(record)) for record in fetched]
|