routes.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. from datetime import datetime
  2. from io import BytesIO
  3. from zoneinfo import ZoneInfo
  4. from fastapi import APIRouter, HTTPException, Query
  5. from fastapi.responses import StreamingResponse
  6. from app.core.config import settings
  7. from app.scheduler.manager import scheduler
  8. from app.services.demand_pool_service import (
  9. export_demand_pool_records,
  10. query_demand_pool_records,
  11. query_strategy_options,
  12. )
  13. from app.services.hot_content_source_service import fetch_hot_content_source_detail
  14. from app.services.hot_content_demand_export_service import (
  15. export_hot_content_demand_exports,
  16. query_hot_content_demand_exports,
  17. )
  18. from app.services.element_search_service import (
  19. query_monthly_element_demands,
  20. query_same_period_last_year_element_demands,
  21. query_same_period_last_year_lunar_element_demands,
  22. query_video_decode_url2_for_today,
  23. )
  24. from app.utils.excel_export import build_content_disposition, rows_to_excel_bytes
# Router on which every endpoint in this module is registered.
router = APIRouter()
# Time zone applied when generating export filename timestamps.
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
  27. DEMAND_POOL_EXPORT_COLUMNS: list[tuple[str, str]] = [
  28. ("ID", "id"),
  29. ("策略名", "strategy"),
  30. ("需求名称", "demand_name"),
  31. ("需求类型", "type"),
  32. ("权重", "weight"),
  33. ("视频数量", "video_count"),
  34. ("日期", "dt"),
  35. ]
  36. ELEMENT_DEMAND_EXPORT_COLUMNS: list[tuple[str, str]] = [
  37. ("策略", "strategy"),
  38. ("特征点名称", "demand_name"),
  39. ("权重", "weight"),
  40. ("视频数", "video_count"),
  41. ("视频列表", "video_list"),
  42. ]
  43. MONTHLY_ELEMENT_DEMAND_EXPORT_COLUMNS: list[tuple[str, str]] = [
  44. *ELEMENT_DEMAND_EXPORT_COLUMNS,
  45. ("月份列表", "month_list"),
  46. ("频次", "frequency"),
  47. ]
  48. HOT_CONTENT_DEMAND_EXPORT_COLUMNS: list[tuple[str, str]] = [
  49. ("来源", "source"),
  50. ("热点标题", "hot_title"),
  51. ("类型", "point_category"),
  52. ("需求类型", "item_type_label"),
  53. ("匹配需求", "matched_demand"),
  54. ("是否成为需求", "is_as_demand_label"),
  55. ("创建时间", "record_created_at"),
  56. ("贡献分", "contribution_score"),
  57. ("最高微信指数词", "wxindex_keyword"),
  58. ("待选微信指数词", "all_hot_keywords"),
  59. ("微信指数热度", "wxindex_latest_score"),
  60. ("微信指数趋势", "wxindex_trend"),
  61. ]
  62. def _export_timestamp() -> str:
  63. return datetime.now(SHANGHAI_TZ).strftime("%Y%m%d_%H%M%S")
  64. def _excel_streaming_response(
  65. rows: list[dict[str, object]],
  66. columns: list[tuple[str, str]],
  67. *,
  68. sheet_name: str,
  69. filename: str,
  70. ) -> StreamingResponse:
  71. content = rows_to_excel_bytes(rows, columns, sheet_name=sheet_name)
  72. return StreamingResponse(
  73. BytesIO(content),
  74. media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
  75. headers={"Content-Disposition": build_content_disposition(filename)},
  76. )
  77. @router.get("/health")
  78. async def health_check() -> dict[str, str]:
  79. return {"status": "ok", "service": settings.app_name, "env": settings.app_env}
  80. @router.get("/scheduler/status")
  81. async def scheduler_status() -> dict[str, object]:
  82. jobs = scheduler.get_jobs()
  83. return {
  84. "running": scheduler.running,
  85. "job_count": len(jobs),
  86. "jobs": [job.id for job in jobs],
  87. }
  88. @router.get("/demand-pool")
  89. async def query_demand_pool(
  90. strategy: list[str] | None = Query(default=None, description="策略,支持多选"),
  91. start_dt: str | None = Query(default=None, description="开始日期: yyyymmdd 或 yyyy-mm-dd"),
  92. end_dt: str | None = Query(default=None, description="结束日期: yyyymmdd 或 yyyy-mm-dd"),
  93. demand_name: str | None = Query(
  94. default=None,
  95. description="需求名称包含该子串则保留;空或未传则不筛选",
  96. ),
  97. min_weight: float | None = Query(default=None, description="最小权重"),
  98. max_weight: float | None = Query(default=None, description="最大权重"),
  99. sort_by: str | None = Query(default="weight", description="排序字段"),
  100. sort_order: str | None = Query(default="desc", description="排序方向: asc 或 desc"),
  101. page: int = Query(default=1, ge=1, description="页码,从 1 开始"),
  102. page_size: int = Query(default=20, ge=1, le=200, description="每页条数"),
  103. ) -> dict[str, object]:
  104. return query_demand_pool_records(
  105. strategies=strategy,
  106. start_dt=start_dt,
  107. end_dt=end_dt,
  108. demand_name=demand_name,
  109. min_weight=min_weight,
  110. max_weight=max_weight,
  111. sort_by=sort_by,
  112. sort_order=sort_order,
  113. page=page,
  114. page_size=page_size,
  115. )
  116. @router.get("/demand-pool/export")
  117. async def export_demand_pool(
  118. strategy: list[str] | None = Query(default=None, description="策略,支持多选"),
  119. start_dt: str | None = Query(default=None, description="开始日期: yyyymmdd 或 yyyy-mm-dd"),
  120. end_dt: str | None = Query(default=None, description="结束日期: yyyymmdd 或 yyyy-mm-dd"),
  121. demand_name: str | None = Query(
  122. default=None,
  123. description="需求名称包含该子串则保留;空或未传则不筛选",
  124. ),
  125. min_weight: float | None = Query(default=None, description="最小权重"),
  126. max_weight: float | None = Query(default=None, description="最大权重"),
  127. sort_by: str | None = Query(default="weight", description="排序字段"),
  128. sort_order: str | None = Query(default="desc", description="排序方向: asc 或 desc"),
  129. ) -> StreamingResponse:
  130. items = export_demand_pool_records(
  131. strategies=strategy,
  132. start_dt=start_dt,
  133. end_dt=end_dt,
  134. demand_name=demand_name,
  135. min_weight=min_weight,
  136. max_weight=max_weight,
  137. sort_by=sort_by,
  138. sort_order=sort_order,
  139. )
  140. filename = f"需求池_{_export_timestamp()}.xlsx"
  141. return _excel_streaming_response(
  142. items,
  143. DEMAND_POOL_EXPORT_COLUMNS,
  144. sheet_name="需求明细",
  145. filename=filename,
  146. )
  147. @router.get("/element-demands/solar-calendar")
  148. async def get_element_demands_solar_calendar(
  149. period_days: int = Query(
  150. ...,
  151. ge=0,
  152. description="区间天数(含去年阳历今日);0 表示仅当日分区",
  153. ),
  154. view_pv_count: int = Query(
  155. ...,
  156. ge=0,
  157. description="当日分发曝光 pv 下限(video_dimension_detail_add_column)",
  158. ),
  159. min_contribution_score: float = Query(
  160. ...,
  161. description="贡献分下限(dwd_topic_decode_result_detail_di)",
  162. ),
  163. rov_avg: float = Query(
  164. ...,
  165. description="按原始元素分组后的平均 ROV 下限(HAVING)",
  166. ),
  167. ) -> dict[str, object]:
  168. items = query_same_period_last_year_element_demands(
  169. period_days=period_days,
  170. view_pv_count=view_pv_count,
  171. min_contribution_score=min_contribution_score,
  172. rov_avg=rov_avg,
  173. )
  174. return {"items": items}
  175. @router.get("/element-demands/solar-calendar/export")
  176. async def export_element_demands_solar_calendar(
  177. period_days: int = Query(
  178. ...,
  179. ge=0,
  180. description="区间天数(含去年阳历今日);0 表示仅当日分区",
  181. ),
  182. view_pv_count: int = Query(
  183. ...,
  184. ge=0,
  185. description="当日分发曝光 pv 下限(video_dimension_detail_add_column)",
  186. ),
  187. min_contribution_score: float = Query(
  188. ...,
  189. description="贡献分下限(dwd_topic_decode_result_detail_di)",
  190. ),
  191. rov_avg: float = Query(
  192. ...,
  193. description="按原始元素分组后的平均 ROV 下限(HAVING)",
  194. ),
  195. ) -> StreamingResponse:
  196. items = query_same_period_last_year_element_demands(
  197. period_days=period_days,
  198. view_pv_count=view_pv_count,
  199. min_contribution_score=min_contribution_score,
  200. rov_avg=rov_avg,
  201. )
  202. filename = f"去年同期阳历特征点_{_export_timestamp()}.xlsx"
  203. return _excel_streaming_response(
  204. items,
  205. ELEMENT_DEMAND_EXPORT_COLUMNS,
  206. sheet_name="特征点明细",
  207. filename=filename,
  208. )
  209. @router.get("/element-demands/lunar-calendar")
  210. async def get_element_demands_lunar_calendar(
  211. period_days: int = Query(
  212. ...,
  213. ge=0,
  214. description="区间天数(含去年阴历今日);0 表示仅当日分区",
  215. ),
  216. view_pv_count: int = Query(
  217. ...,
  218. ge=0,
  219. description="当日分发曝光 pv 下限(video_dimension_detail_add_column)",
  220. ),
  221. min_contribution_score: float = Query(
  222. ...,
  223. description="贡献分下限(dwd_topic_decode_result_detail_di)",
  224. ),
  225. rov_avg: float = Query(
  226. ...,
  227. description="按原始元素分组后的平均 ROV 下限(HAVING)",
  228. ),
  229. ) -> dict[str, object]:
  230. items = query_same_period_last_year_lunar_element_demands(
  231. period_days=period_days,
  232. view_pv_count=view_pv_count,
  233. min_contribution_score=min_contribution_score,
  234. rov_avg=rov_avg,
  235. )
  236. return {"items": items}
  237. @router.get("/element-demands/lunar-calendar/export")
  238. async def export_element_demands_lunar_calendar(
  239. period_days: int = Query(
  240. ...,
  241. ge=0,
  242. description="区间天数(含去年阴历今日);0 表示仅当日分区",
  243. ),
  244. view_pv_count: int = Query(
  245. ...,
  246. ge=0,
  247. description="当日分发曝光 pv 下限(video_dimension_detail_add_column)",
  248. ),
  249. min_contribution_score: float = Query(
  250. ...,
  251. description="贡献分下限(dwd_topic_decode_result_detail_di)",
  252. ),
  253. rov_avg: float = Query(
  254. ...,
  255. description="按原始元素分组后的平均 ROV 下限(HAVING)",
  256. ),
  257. ) -> StreamingResponse:
  258. items = query_same_period_last_year_lunar_element_demands(
  259. period_days=period_days,
  260. view_pv_count=view_pv_count,
  261. min_contribution_score=min_contribution_score,
  262. rov_avg=rov_avg,
  263. )
  264. filename = f"去年同期阴历特征点_{_export_timestamp()}.xlsx"
  265. return _excel_streaming_response(
  266. items,
  267. ELEMENT_DEMAND_EXPORT_COLUMNS,
  268. sheet_name="特征点明细",
  269. filename=filename,
  270. )
  271. @router.get("/element-demands/monthly")
  272. async def get_element_demands_monthly(
  273. view_pv_count: int = Query(
  274. ...,
  275. ge=0,
  276. description="当日分发曝光 pv 下限(video_dimension_detail_add_column 单日行)",
  277. ),
  278. month_total_pv_threshold: float = Query(
  279. ...,
  280. ge=0,
  281. description="视频单月累计分发曝光 PV 和的下限(严格大于该值才保留)",
  282. ),
  283. min_contribution_score: float = Query(
  284. ...,
  285. description="贡献分下限(dwd_topic_decode_result_detail_di)",
  286. ),
  287. rov_avg: float = Query(
  288. ...,
  289. description="元素单月平均 ROV 下限(按月聚合后再汇总)",
  290. ),
  291. min_frequency: int = Query(
  292. ...,
  293. ge=0,
  294. description="元素在回溯窗口内满足条件的月份数下限",
  295. ),
  296. ) -> dict[str, object]:
  297. items = query_monthly_element_demands(
  298. view_pv_count=view_pv_count,
  299. month_total_pv_threshold=month_total_pv_threshold,
  300. min_contribution_score=min_contribution_score,
  301. rov_avg=rov_avg,
  302. min_frequency=min_frequency,
  303. )
  304. return {"items": items}
  305. @router.get("/element-demands/monthly/export")
  306. async def export_element_demands_monthly(
  307. view_pv_count: int = Query(
  308. ...,
  309. ge=0,
  310. description="当日分发曝光 pv 下限(video_dimension_detail_add_column 单日行)",
  311. ),
  312. month_total_pv_threshold: float = Query(
  313. ...,
  314. ge=0,
  315. description="视频单月累计分发曝光 PV 和的下限(严格大于该值才保留)",
  316. ),
  317. min_contribution_score: float = Query(
  318. ...,
  319. description="贡献分下限(dwd_topic_decode_result_detail_di)",
  320. ),
  321. rov_avg: float = Query(
  322. ...,
  323. description="元素单月平均 ROV 下限(按月聚合后再汇总)",
  324. ),
  325. min_frequency: int = Query(
  326. ...,
  327. ge=0,
  328. description="元素在回溯窗口内满足条件的月份数下限",
  329. ),
  330. ) -> StreamingResponse:
  331. items = query_monthly_element_demands(
  332. view_pv_count=view_pv_count,
  333. month_total_pv_threshold=month_total_pv_threshold,
  334. min_contribution_score=min_contribution_score,
  335. rov_avg=rov_avg,
  336. min_frequency=min_frequency,
  337. )
  338. filename = f"逐月特征点_{_export_timestamp()}.xlsx"
  339. return _excel_streaming_response(
  340. items,
  341. MONTHLY_ELEMENT_DEMAND_EXPORT_COLUMNS,
  342. sheet_name="特征点明细",
  343. filename=filename,
  344. )
  345. @router.get("/videos/decode-url")
  346. async def get_video_decode_page_url(
  347. vid: str = Query(
  348. ...,
  349. min_length=1,
  350. max_length=128,
  351. description="视频 id,对应 dwd_topic_decode_result_di.vid(当天上海分区)",
  352. ),
  353. ) -> dict[str, str | None]:
  354. try:
  355. url2 = query_video_decode_url2_for_today(vid)
  356. except ValueError as exc:
  357. raise HTTPException(status_code=400, detail=str(exc)) from exc
  358. return {"url2": url2}
  359. @router.get("/demand-pool/hot-content-source")
  360. async def get_demand_pool_hot_content_source(
  361. demand_name: str = Query(..., min_length=1, description="需求名称"),
  362. demand_type: str = Query(..., min_length=1, description="需求类型,如 短语 / 特征点"),
  363. dt: str = Query(
  364. ...,
  365. min_length=1,
  366. description="分区日期,yyyymmdd 或 yyyy-mm-dd",
  367. ),
  368. strategy: str | None = Query(default=None, description="策略名,默认新热事件"),
  369. ) -> dict[str, object]:
  370. try:
  371. return fetch_hot_content_source_detail(
  372. demand_name=demand_name,
  373. demand_type=demand_type,
  374. partition_dt=dt,
  375. strategy=strategy,
  376. )
  377. except RuntimeError as exc:
  378. raise HTTPException(status_code=503, detail=str(exc)) from exc
  379. except ValueError as exc:
  380. raise HTTPException(status_code=400, detail=str(exc)) from exc
  381. except LookupError as exc:
  382. raise HTTPException(status_code=404, detail=str(exc)) from exc
  383. @router.get("/hot-content/demand-exports")
  384. async def get_hot_content_demand_exports(
  385. start_dt: str | None = Query(
  386. default=None,
  387. description="开始日期(热点记录创建时间): yyyymmdd 或 yyyy-mm-dd,默认当天",
  388. ),
  389. end_dt: str | None = Query(
  390. default=None,
  391. description="结束日期(热点记录创建时间): yyyymmdd 或 yyyy-mm-dd,默认当天",
  392. ),
  393. is_as_demand: int | None = Query(
  394. default=None,
  395. description="是否成为需求:0 否,1 是;不传表示全部",
  396. ),
  397. item_type: str | None = Query(
  398. default=None,
  399. description="需求类型:词(特征点)、点(短语);不传表示全部",
  400. ),
  401. min_wxindex_latest_score: float | None = Query(
  402. default=None,
  403. ge=0,
  404. description="微信指数热度下限(wxindex_latest_score >= 该值);不传表示不限制",
  405. ),
  406. page: int = Query(default=1, ge=1, description="页码,从 1 开始"),
  407. page_size: int = Query(default=20, ge=1, le=200, description="每页条数"),
  408. ) -> dict[str, object]:
  409. try:
  410. return query_hot_content_demand_exports(
  411. start_dt=start_dt,
  412. end_dt=end_dt,
  413. is_as_demand=is_as_demand,
  414. item_type=item_type,
  415. min_wxindex_latest_score=min_wxindex_latest_score,
  416. page=page,
  417. page_size=page_size,
  418. )
  419. except RuntimeError as exc:
  420. raise HTTPException(status_code=503, detail=str(exc)) from exc
  421. except ValueError as exc:
  422. raise HTTPException(status_code=400, detail=str(exc)) from exc
  423. @router.get("/hot-content/demand-exports/export")
  424. async def export_hot_content_demand_exports_api(
  425. start_dt: str | None = Query(default=None, description="开始日期"),
  426. end_dt: str | None = Query(default=None, description="结束日期"),
  427. is_as_demand: int | None = Query(default=None, description="是否成为需求:0/1"),
  428. item_type: str | None = Query(default=None, description="需求类型:词/点"),
  429. min_wxindex_latest_score: float | None = Query(
  430. default=None,
  431. ge=0,
  432. description="微信指数热度下限",
  433. ),
  434. ) -> StreamingResponse:
  435. try:
  436. items = export_hot_content_demand_exports(
  437. start_dt=start_dt,
  438. end_dt=end_dt,
  439. is_as_demand=is_as_demand,
  440. item_type=item_type,
  441. min_wxindex_latest_score=min_wxindex_latest_score,
  442. )
  443. except RuntimeError as exc:
  444. raise HTTPException(status_code=503, detail=str(exc)) from exc
  445. except ValueError as exc:
  446. raise HTTPException(status_code=400, detail=str(exc)) from exc
  447. filename = f"新热事件查询_{_export_timestamp()}.xlsx"
  448. return _excel_streaming_response(
  449. items,
  450. HOT_CONTENT_DEMAND_EXPORT_COLUMNS,
  451. sheet_name="新热事件",
  452. filename=filename,
  453. )
  454. @router.get("/demand-pool/strategies")
  455. async def get_demand_pool_strategies(
  456. start_dt: str | None = Query(default=None, description="开始日期: yyyymmdd 或 yyyy-mm-dd"),
  457. end_dt: str | None = Query(default=None, description="结束日期: yyyymmdd 或 yyyy-mm-dd"),
  458. min_weight: float | None = Query(default=None, description="最小权重"),
  459. max_weight: float | None = Query(default=None, description="最大权重"),
  460. ) -> dict[str, object]:
  461. return query_strategy_options(
  462. start_dt=start_dt,
  463. end_dt=end_dt,
  464. min_weight=min_weight,
  465. max_weight=max_weight,
  466. )