_mapper.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. import json
  2. from typing import Any, Dict, List
  3. from app.core.database import DatabaseManager
  4. from ._const import DemandRecommendConst
  5. from ._utils import DemandRecord, MatchResult
  class DemandRecommendMapper:
      """Data-access layer for demand-to-video match results.

      Every statement is executed through the injected ``pool``; the target
      table and database names are read from ``DemandRecommendConst``.
      """

      # Column list shared by the two SELECT statements below.
      _SELECT_COLUMNS = (
          "id, dt, demand_id, match_experiment_id, match_method, config_code, "
          "video_id, score, rank_position, video_title, video_detail"
      )

      def __init__(self, pool: "DatabaseManager"):
          """Store the pool and resolve the table / database names.

          Args:
              pool: Object whose ``async_fetch`` and ``async_save`` coroutines
                  are awaited to run the SQL built by this mapper.
          """
          self.pool = pool
          self.table = DemandRecommendConst.MATCH_RESULT_TABLE
          self.db = DemandRecommendConst.MATCH_RESULT_DB

      # -- Demand table reads (placeholder until the real schema is confirmed) --

      async def fetch_demands_by_dt(self, dt: str) -> List["DemandRecord"]:
          """Return the demand records for the given date.

          Currently a stub that always returns an empty list.

          TODO: replace with the real table name and field mapping.
          """
          # Intended implementation once the demand table schema is known:
          # query = """
          #     SELECT * FROM demand_info
          #     WHERE dt = %s
          # """
          # rows = await self.pool.async_fetch(query=query, params=(dt,), db_name=self.db)
          # return [DemandRecord.from_dict(r) for r in (rows or [])]
          return []

      # -- Match result writes --

      @staticmethod
      def _insert_params(result: "MatchResult") -> tuple:
          """Flatten one match result into the INSERT statement's column order."""
          detail = result.video_detail
          return (
              result.dt,
              result.demand_id,
              result.match_experiment_id,
              result.match_method,
              result.config_code,
              result.video_id,
              result.score,
              result.rank_position,
              result.video_title,
              # Falsy details (None, empty dict/list) are written as NULL;
              # anything else is serialised to JSON with non-ASCII text kept.
              json.dumps(detail, ensure_ascii=False) if detail else None,
          )

      async def save_match_results(self, results: List["MatchResult"]) -> int:
          """Bulk-insert match results, ignoring rows that hit a unique-key conflict.

          Args:
              results: Match results to persist; an empty list is a no-op and
                  the pool is not called.

          Returns:
              The value reported by ``pool.async_save``, or 0 when there is
              nothing to write, the pool reports nothing (``None``), or the
              write raised.
          """
          if not results:
              return 0

          query = f"""
              INSERT IGNORE INTO {self.table}
                  (dt, demand_id, match_experiment_id, match_method, config_code,
                   video_id, score, rank_position, video_title, video_detail)
              VALUES
                  (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
          """
          params = [self._insert_params(r) for r in results]

          try:
              affected = await self.pool.async_save(
                  query=query, params=params, db_name=self.db, batch=True
              )
          except Exception:
              # Deliberate best-effort: per the original author's note,
              # async_save already rolls back and logs on failure, so the
              # error is reported to the caller as "0 rows written".
              return 0
          # Normalise a None result to 0, matching the invalidate_* methods.
          return affected or 0

      # -- Match result reads (used by the ranking stage) --

      async def fetch_matches_by_demand(self, demand_id: str) -> List[Dict[str, Any]]:
          """Return the active (``status = 1``) match rows for one demand ID.

          Rows are ordered by ``config_code`` then ``rank_position``. An empty
          list is returned when the pool yields nothing.
          """
          query = f"""
              SELECT {self._SELECT_COLUMNS}
              FROM {self.table}
              WHERE demand_id = %s AND status = 1
              ORDER BY config_code, rank_position
          """
          rows = await self.pool.async_fetch(
              query=query, params=(demand_id,), db_name=self.db
          )
          return rows or []

      async def fetch_matches_by_experiment(
          self, experiment_id: str, limit: int = 1000
      ) -> List[Dict[str, Any]]:
          """Return the active (``status = 1``) match rows for one experiment ID.

          Args:
              experiment_id: Value matched against ``match_experiment_id``.
              limit: Maximum number of rows, bound to the ``LIMIT`` clause.

          Returns:
              Rows ordered by ``demand_id``, ``config_code``, ``rank_position``;
              an empty list when the pool yields nothing.
          """
          query = f"""
              SELECT {self._SELECT_COLUMNS}
              FROM {self.table}
              WHERE match_experiment_id = %s AND status = 1
              ORDER BY demand_id, config_code, rank_position
              LIMIT %s
          """
          rows = await self.pool.async_fetch(
              query=query, params=(experiment_id, limit), db_name=self.db
          )
          return rows or []

      # -- Invalidation --

      async def _invalidate(self, column: str, value: str) -> int:
          """Set ``status = 0`` on active rows where ``column`` equals ``value``.

          ``column`` is interpolated into the SQL text, so it must only ever be
          one of the literal column names passed by the public methods below.
          """
          query = f"""
              UPDATE {self.table}
              SET status = 0
              WHERE {column} = %s AND status = 1
          """
          affected = await self.pool.async_save(
              query=query, params=(value,), db_name=self.db
          )
          return affected or 0

      async def invalidate_by_dt(self, dt: str) -> int:
          """Mark the active match rows of the given date as invalid.

          Returns:
              The value reported by ``pool.async_save``, or 0 if it is falsy.
          """
          return await self._invalidate("dt", dt)

      async def invalidate_by_demand(self, demand_id: str) -> int:
          """Mark the active match rows of the given demand as invalid.

          Returns:
              The value reported by ``pool.async_save``, or 0 if it is falsy.
          """
          return await self._invalidate("demand_id", demand_id)