routes.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
import asyncio
from datetime import datetime
from io import BytesIO
from zoneinfo import ZoneInfo

from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import StreamingResponse

from app.core.config import settings
from app.scheduler.manager import scheduler
from app.services.demand_pool_service import (
    export_demand_pool_records,
    query_demand_pool_records,
    query_strategy_options,
)
from app.services.element_search_service import (
    query_monthly_element_demands,
    query_same_period_last_year_element_demands,
    query_same_period_last_year_lunar_element_demands,
    query_video_decode_url2_for_today,
)
from app.utils.excel_export import build_content_disposition, rows_to_excel_bytes
  20. router = APIRouter()
  21. SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
  22. DEMAND_POOL_EXPORT_COLUMNS: list[tuple[str, str]] = [
  23. ("ID", "id"),
  24. ("策略名", "strategy"),
  25. ("需求名称", "demand_name"),
  26. ("需求类型", "type"),
  27. ("权重", "weight"),
  28. ("视频数量", "video_count"),
  29. ("日期", "dt"),
  30. ]
  31. ELEMENT_DEMAND_EXPORT_COLUMNS: list[tuple[str, str]] = [
  32. ("策略", "strategy"),
  33. ("特征点名称", "demand_name"),
  34. ("权重", "weight"),
  35. ("视频数", "video_count"),
  36. ("视频列表", "video_list"),
  37. ]
  38. MONTHLY_ELEMENT_DEMAND_EXPORT_COLUMNS: list[tuple[str, str]] = [
  39. *ELEMENT_DEMAND_EXPORT_COLUMNS,
  40. ("月份列表", "month_list"),
  41. ("频次", "frequency"),
  42. ]
  43. def _export_timestamp() -> str:
  44. return datetime.now(SHANGHAI_TZ).strftime("%Y%m%d_%H%M%S")
  45. def _excel_streaming_response(
  46. rows: list[dict[str, object]],
  47. columns: list[tuple[str, str]],
  48. *,
  49. sheet_name: str,
  50. filename: str,
  51. ) -> StreamingResponse:
  52. content = rows_to_excel_bytes(rows, columns, sheet_name=sheet_name)
  53. return StreamingResponse(
  54. BytesIO(content),
  55. media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
  56. headers={"Content-Disposition": build_content_disposition(filename)},
  57. )
  58. @router.get("/health")
  59. async def health_check() -> dict[str, str]:
  60. return {"status": "ok", "service": settings.app_name, "env": settings.app_env}
  61. @router.get("/scheduler/status")
  62. async def scheduler_status() -> dict[str, object]:
  63. jobs = scheduler.get_jobs()
  64. return {
  65. "running": scheduler.running,
  66. "job_count": len(jobs),
  67. "jobs": [job.id for job in jobs],
  68. }
  69. @router.get("/demand-pool")
  70. async def query_demand_pool(
  71. strategy: list[str] | None = Query(default=None, description="策略,支持多选"),
  72. start_dt: str | None = Query(default=None, description="开始日期: yyyymmdd 或 yyyy-mm-dd"),
  73. end_dt: str | None = Query(default=None, description="结束日期: yyyymmdd 或 yyyy-mm-dd"),
  74. demand_name: str | None = Query(
  75. default=None,
  76. description="需求名称包含该子串则保留;空或未传则不筛选",
  77. ),
  78. min_weight: float | None = Query(default=None, description="最小权重"),
  79. max_weight: float | None = Query(default=None, description="最大权重"),
  80. sort_by: str | None = Query(default="weight", description="排序字段"),
  81. sort_order: str | None = Query(default="desc", description="排序方向: asc 或 desc"),
  82. page: int = Query(default=1, ge=1, description="页码,从 1 开始"),
  83. page_size: int = Query(default=20, ge=1, le=200, description="每页条数"),
  84. ) -> dict[str, object]:
  85. return query_demand_pool_records(
  86. strategies=strategy,
  87. start_dt=start_dt,
  88. end_dt=end_dt,
  89. demand_name=demand_name,
  90. min_weight=min_weight,
  91. max_weight=max_weight,
  92. sort_by=sort_by,
  93. sort_order=sort_order,
  94. page=page,
  95. page_size=page_size,
  96. )
  97. @router.get("/demand-pool/export")
  98. async def export_demand_pool(
  99. strategy: list[str] | None = Query(default=None, description="策略,支持多选"),
  100. start_dt: str | None = Query(default=None, description="开始日期: yyyymmdd 或 yyyy-mm-dd"),
  101. end_dt: str | None = Query(default=None, description="结束日期: yyyymmdd 或 yyyy-mm-dd"),
  102. demand_name: str | None = Query(
  103. default=None,
  104. description="需求名称包含该子串则保留;空或未传则不筛选",
  105. ),
  106. min_weight: float | None = Query(default=None, description="最小权重"),
  107. max_weight: float | None = Query(default=None, description="最大权重"),
  108. sort_by: str | None = Query(default="weight", description="排序字段"),
  109. sort_order: str | None = Query(default="desc", description="排序方向: asc 或 desc"),
  110. ) -> StreamingResponse:
  111. items = export_demand_pool_records(
  112. strategies=strategy,
  113. start_dt=start_dt,
  114. end_dt=end_dt,
  115. demand_name=demand_name,
  116. min_weight=min_weight,
  117. max_weight=max_weight,
  118. sort_by=sort_by,
  119. sort_order=sort_order,
  120. )
  121. filename = f"需求池_{_export_timestamp()}.xlsx"
  122. return _excel_streaming_response(
  123. items,
  124. DEMAND_POOL_EXPORT_COLUMNS,
  125. sheet_name="需求明细",
  126. filename=filename,
  127. )
  128. @router.get("/element-demands/solar-calendar")
  129. async def get_element_demands_solar_calendar(
  130. period_days: int = Query(
  131. ...,
  132. ge=0,
  133. description="区间天数(含去年阳历今日);0 表示仅当日分区",
  134. ),
  135. view_pv_count: int = Query(
  136. ...,
  137. ge=0,
  138. description="当日分发曝光 pv 下限(video_dimension_detail_add_column)",
  139. ),
  140. min_contribution_score: float = Query(
  141. ...,
  142. description="贡献分下限(dwd_topic_decode_result_detail_di)",
  143. ),
  144. rov_avg: float = Query(
  145. ...,
  146. description="按原始元素分组后的平均 ROV 下限(HAVING)",
  147. ),
  148. ) -> dict[str, object]:
  149. items = query_same_period_last_year_element_demands(
  150. period_days=period_days,
  151. view_pv_count=view_pv_count,
  152. min_contribution_score=min_contribution_score,
  153. rov_avg=rov_avg,
  154. )
  155. return {"items": items}
  156. @router.get("/element-demands/solar-calendar/export")
  157. async def export_element_demands_solar_calendar(
  158. period_days: int = Query(
  159. ...,
  160. ge=0,
  161. description="区间天数(含去年阳历今日);0 表示仅当日分区",
  162. ),
  163. view_pv_count: int = Query(
  164. ...,
  165. ge=0,
  166. description="当日分发曝光 pv 下限(video_dimension_detail_add_column)",
  167. ),
  168. min_contribution_score: float = Query(
  169. ...,
  170. description="贡献分下限(dwd_topic_decode_result_detail_di)",
  171. ),
  172. rov_avg: float = Query(
  173. ...,
  174. description="按原始元素分组后的平均 ROV 下限(HAVING)",
  175. ),
  176. ) -> StreamingResponse:
  177. items = query_same_period_last_year_element_demands(
  178. period_days=period_days,
  179. view_pv_count=view_pv_count,
  180. min_contribution_score=min_contribution_score,
  181. rov_avg=rov_avg,
  182. )
  183. filename = f"去年同期阳历特征点_{_export_timestamp()}.xlsx"
  184. return _excel_streaming_response(
  185. items,
  186. ELEMENT_DEMAND_EXPORT_COLUMNS,
  187. sheet_name="特征点明细",
  188. filename=filename,
  189. )
  190. @router.get("/element-demands/lunar-calendar")
  191. async def get_element_demands_lunar_calendar(
  192. period_days: int = Query(
  193. ...,
  194. ge=0,
  195. description="区间天数(含去年阴历今日);0 表示仅当日分区",
  196. ),
  197. view_pv_count: int = Query(
  198. ...,
  199. ge=0,
  200. description="当日分发曝光 pv 下限(video_dimension_detail_add_column)",
  201. ),
  202. min_contribution_score: float = Query(
  203. ...,
  204. description="贡献分下限(dwd_topic_decode_result_detail_di)",
  205. ),
  206. rov_avg: float = Query(
  207. ...,
  208. description="按原始元素分组后的平均 ROV 下限(HAVING)",
  209. ),
  210. ) -> dict[str, object]:
  211. items = query_same_period_last_year_lunar_element_demands(
  212. period_days=period_days,
  213. view_pv_count=view_pv_count,
  214. min_contribution_score=min_contribution_score,
  215. rov_avg=rov_avg,
  216. )
  217. return {"items": items}
  218. @router.get("/element-demands/lunar-calendar/export")
  219. async def export_element_demands_lunar_calendar(
  220. period_days: int = Query(
  221. ...,
  222. ge=0,
  223. description="区间天数(含去年阴历今日);0 表示仅当日分区",
  224. ),
  225. view_pv_count: int = Query(
  226. ...,
  227. ge=0,
  228. description="当日分发曝光 pv 下限(video_dimension_detail_add_column)",
  229. ),
  230. min_contribution_score: float = Query(
  231. ...,
  232. description="贡献分下限(dwd_topic_decode_result_detail_di)",
  233. ),
  234. rov_avg: float = Query(
  235. ...,
  236. description="按原始元素分组后的平均 ROV 下限(HAVING)",
  237. ),
  238. ) -> StreamingResponse:
  239. items = query_same_period_last_year_lunar_element_demands(
  240. period_days=period_days,
  241. view_pv_count=view_pv_count,
  242. min_contribution_score=min_contribution_score,
  243. rov_avg=rov_avg,
  244. )
  245. filename = f"去年同期阴历特征点_{_export_timestamp()}.xlsx"
  246. return _excel_streaming_response(
  247. items,
  248. ELEMENT_DEMAND_EXPORT_COLUMNS,
  249. sheet_name="特征点明细",
  250. filename=filename,
  251. )
  252. @router.get("/element-demands/monthly")
  253. async def get_element_demands_monthly(
  254. view_pv_count: int = Query(
  255. ...,
  256. ge=0,
  257. description="当日分发曝光 pv 下限(video_dimension_detail_add_column 单日行)",
  258. ),
  259. month_total_pv_threshold: float = Query(
  260. ...,
  261. ge=0,
  262. description="视频单月累计分发曝光 PV 和的下限(严格大于该值才保留)",
  263. ),
  264. min_contribution_score: float = Query(
  265. ...,
  266. description="贡献分下限(dwd_topic_decode_result_detail_di)",
  267. ),
  268. rov_avg: float = Query(
  269. ...,
  270. description="元素单月平均 ROV 下限(按月聚合后再汇总)",
  271. ),
  272. min_frequency: int = Query(
  273. ...,
  274. ge=0,
  275. description="元素在回溯窗口内满足条件的月份数下限",
  276. ),
  277. ) -> dict[str, object]:
  278. items = query_monthly_element_demands(
  279. view_pv_count=view_pv_count,
  280. month_total_pv_threshold=month_total_pv_threshold,
  281. min_contribution_score=min_contribution_score,
  282. rov_avg=rov_avg,
  283. min_frequency=min_frequency,
  284. )
  285. return {"items": items}
  286. @router.get("/element-demands/monthly/export")
  287. async def export_element_demands_monthly(
  288. view_pv_count: int = Query(
  289. ...,
  290. ge=0,
  291. description="当日分发曝光 pv 下限(video_dimension_detail_add_column 单日行)",
  292. ),
  293. month_total_pv_threshold: float = Query(
  294. ...,
  295. ge=0,
  296. description="视频单月累计分发曝光 PV 和的下限(严格大于该值才保留)",
  297. ),
  298. min_contribution_score: float = Query(
  299. ...,
  300. description="贡献分下限(dwd_topic_decode_result_detail_di)",
  301. ),
  302. rov_avg: float = Query(
  303. ...,
  304. description="元素单月平均 ROV 下限(按月聚合后再汇总)",
  305. ),
  306. min_frequency: int = Query(
  307. ...,
  308. ge=0,
  309. description="元素在回溯窗口内满足条件的月份数下限",
  310. ),
  311. ) -> StreamingResponse:
  312. items = query_monthly_element_demands(
  313. view_pv_count=view_pv_count,
  314. month_total_pv_threshold=month_total_pv_threshold,
  315. min_contribution_score=min_contribution_score,
  316. rov_avg=rov_avg,
  317. min_frequency=min_frequency,
  318. )
  319. filename = f"逐月特征点_{_export_timestamp()}.xlsx"
  320. return _excel_streaming_response(
  321. items,
  322. MONTHLY_ELEMENT_DEMAND_EXPORT_COLUMNS,
  323. sheet_name="特征点明细",
  324. filename=filename,
  325. )
  326. @router.get("/videos/decode-url")
  327. async def get_video_decode_page_url(
  328. vid: str = Query(
  329. ...,
  330. min_length=1,
  331. max_length=128,
  332. description="视频 id,对应 dwd_topic_decode_result_di.vid(当天上海分区)",
  333. ),
  334. ) -> dict[str, str | None]:
  335. try:
  336. url2 = query_video_decode_url2_for_today(vid)
  337. except ValueError as exc:
  338. raise HTTPException(status_code=400, detail=str(exc)) from exc
  339. return {"url2": url2}
  340. @router.get("/demand-pool/strategies")
  341. async def get_demand_pool_strategies(
  342. start_dt: str | None = Query(default=None, description="开始日期: yyyymmdd 或 yyyy-mm-dd"),
  343. end_dt: str | None = Query(default=None, description="结束日期: yyyymmdd 或 yyyy-mm-dd"),
  344. min_weight: float | None = Query(default=None, description="最小权重"),
  345. max_weight: float | None = Query(default=None, description="最大权重"),
  346. ) -> dict[str, object]:
  347. return query_strategy_options(
  348. start_dt=start_dt,
  349. end_dt=end_dt,
  350. min_weight=min_weight,
  351. max_weight=max_weight,
  352. )