| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- import json
- from typing import Any, Dict, List
- from app.core.database import DatabaseManager
- from ._const import DemandRecommendConst
- from ._utils import DemandRecord, MatchResult
- class DemandRecommendMapper:
- """需求-视频匹配数据层"""
- def __init__(self, pool: DatabaseManager):
- self.pool = pool
- self.table = DemandRecommendConst.MATCH_RESULT_TABLE
- self.db = DemandRecommendConst.MATCH_RESULT_DB
- # ── 需求表读取(暂为占位,待实际表结构确认后补全 SQL) ──
- async def fetch_demands_by_dt(self, dt: str) -> List[DemandRecord]:
- """读取指定日期的需求记录列表
- TODO: 替换为实际表名和字段映射
- """
- # query = """
- # SELECT * FROM demand_info
- # WHERE dt = %s
- # """
- # rows = await self.pool.async_fetch(query=query, params=(dt,), db_name=self.db)
- # return [DemandRecord.from_dict(r) for r in (rows or [])]
- return []
- # ── 匹配结果写入 ──
- async def save_match_results(self, results: List[MatchResult]) -> int:
- """批量写入匹配结果(IGNORE 已存在的 uk 冲突行)"""
- if not results:
- return 0
- query = f"""
- INSERT IGNORE INTO {self.table}
- (dt, demand_id, match_experiment_id, match_method, config_code,
- video_id, score, rank_position, video_title, video_detail)
- VALUES
- (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
- """
- params = [
- (
- r.dt,
- r.demand_id,
- r.match_experiment_id,
- r.match_method,
- r.config_code,
- r.video_id,
- r.score,
- r.rank_position,
- r.video_title,
- json.dumps(r.video_detail, ensure_ascii=False) if r.video_detail else None,
- )
- for r in results
- ]
- try:
- affected = await self.pool.async_save(
- query=query, params=params, db_name=self.db, batch=True
- )
- return affected
- except Exception:
- # async_save 内部已 rollback + log
- return 0
- # ── 匹配结果读取(供排序阶段使用) ──
- async def fetch_matches_by_demand(self, demand_id: str) -> List[Dict[str, Any]]:
- """按需求ID读取有效匹配结果"""
- query = f"""
- SELECT id, dt, demand_id, match_experiment_id, match_method, config_code,
- video_id, score, rank_position, video_title, video_detail
- FROM {self.table}
- WHERE demand_id = %s AND status = 1
- ORDER BY config_code, rank_position
- """
- rows = await self.pool.async_fetch(query=query, params=(demand_id,), db_name=self.db)
- return rows or []
- async def fetch_matches_by_experiment(
- self, experiment_id: str, limit: int = 1000
- ) -> List[Dict[str, Any]]:
- """按实验ID读取有效匹配结果"""
- query = f"""
- SELECT id, dt, demand_id, match_experiment_id, match_method, config_code,
- video_id, score, rank_position, video_title, video_detail
- FROM {self.table}
- WHERE match_experiment_id = %s AND status = 1
- ORDER BY demand_id, config_code, rank_position
- LIMIT %s
- """
- rows = await self.pool.async_fetch(
- query=query, params=(experiment_id, limit), db_name=self.db
- )
- return rows or []
- # ── 失效管理 ──
- async def invalidate_by_dt(self, dt: str) -> int:
- """将指定日期的匹配结果标记为失效"""
- query = f"""
- UPDATE {self.table}
- SET status = 0
- WHERE dt = %s AND status = 1
- """
- return await self.pool.async_save(query=query, params=(dt,), db_name=self.db) or 0
- async def invalidate_by_demand(self, demand_id: str) -> int:
- """将指定需求的匹配结果标记为失效"""
- query = f"""
- UPDATE {self.table}
- SET status = 0
- WHERE demand_id = %s AND status = 1
- """
- return await self.pool.async_save(query=query, params=(demand_id,), db_name=self.db) or 0
|