# test_experiment_demand_pool_write.py
"""Unit tests for the experiment demand-pool write logic."""
  2. from app.strategies.staging_store import StagingRow
  3. from app.sync.experiment_demand_pool_write import (
  4. ExperimentStrategyContext,
  5. _staging_row_to_odps_record,
  6. select_rows_to_write,
  7. )
  8. def _row(
  9. *,
  10. strategy_id: str,
  11. strategy_name: str,
  12. demand_id: str,
  13. demand_name: str,
  14. weight: float,
  15. ) -> StagingRow:
  16. return StagingRow(
  17. strategy_config_id=strategy_id,
  18. strategy=strategy_name,
  19. demand_id=demand_id,
  20. demand_name=demand_name,
  21. weight=weight,
  22. demand_type="特征点",
  23. video_count=1,
  24. video_list=None,
  25. extend=None,
  26. batch_date="20260611",
  27. )
  28. def test_select_rows_respects_daily_limit() -> None:
  29. contexts = [
  30. ExperimentStrategyContext(
  31. strategy_id="monthly",
  32. strategy_name="逐月",
  33. priority=10,
  34. daily_limit=1,
  35. current_count=0,
  36. staging_rows=[
  37. _row(
  38. strategy_id="monthly",
  39. strategy_name="逐月",
  40. demand_id="a1",
  41. demand_name="需求A",
  42. weight=2.0,
  43. ),
  44. _row(
  45. strategy_id="monthly",
  46. strategy_name="逐月",
  47. demand_id="a2",
  48. demand_name="需求B",
  49. weight=1.0,
  50. ),
  51. ],
  52. )
  53. ]
  54. selected, counts = select_rows_to_write(
  55. strategies=contexts,
  56. existing_demand_ids=set(),
  57. claimed_names={},
  58. )
  59. assert len(selected) == 1
  60. assert selected[0].demand_id == "a1"
  61. assert counts["逐月"] == 1
  62. def test_select_rows_same_priority_no_name_dedup() -> None:
  63. contexts = [
  64. ExperimentStrategyContext(
  65. strategy_id="s1",
  66. strategy_name="策略1",
  67. priority=10,
  68. daily_limit=0,
  69. current_count=0,
  70. staging_rows=[
  71. _row(
  72. strategy_id="s1",
  73. strategy_name="策略1",
  74. demand_id="id1",
  75. demand_name="同名需求",
  76. weight=1.0,
  77. )
  78. ],
  79. ),
  80. ExperimentStrategyContext(
  81. strategy_id="s2",
  82. strategy_name="策略2",
  83. priority=10,
  84. daily_limit=0,
  85. current_count=0,
  86. staging_rows=[
  87. _row(
  88. strategy_id="s2",
  89. strategy_name="策略2",
  90. demand_id="id2",
  91. demand_name="同名需求",
  92. weight=1.0,
  93. )
  94. ],
  95. ),
  96. ]
  97. selected, _ = select_rows_to_write(
  98. strategies=contexts,
  99. existing_demand_ids=set(),
  100. claimed_names={},
  101. )
  102. assert len(selected) == 2
  103. def test_select_rows_different_priority_name_dedup() -> None:
  104. contexts = [
  105. ExperimentStrategyContext(
  106. strategy_id="high",
  107. strategy_name="高优",
  108. priority=5,
  109. daily_limit=0,
  110. current_count=0,
  111. staging_rows=[
  112. _row(
  113. strategy_id="high",
  114. strategy_name="高优",
  115. demand_id="id1",
  116. demand_name="同名需求",
  117. weight=1.0,
  118. )
  119. ],
  120. ),
  121. ExperimentStrategyContext(
  122. strategy_id="low",
  123. strategy_name="低优",
  124. priority=20,
  125. daily_limit=0,
  126. current_count=0,
  127. staging_rows=[
  128. _row(
  129. strategy_id="low",
  130. strategy_name="低优",
  131. demand_id="id2",
  132. demand_name="同名需求",
  133. weight=1.0,
  134. )
  135. ],
  136. ),
  137. ]
  138. selected, _ = select_rows_to_write(
  139. strategies=contexts,
  140. existing_demand_ids=set(),
  141. claimed_names={},
  142. )
  143. assert len(selected) == 1
  144. assert selected[0].demand_id == "id1"
  145. def test_higher_priority_blocked_if_lower_priority_already_in_hive() -> None:
  146. contexts = [
  147. ExperimentStrategyContext(
  148. strategy_id="high",
  149. strategy_name="高优",
  150. priority=3,
  151. daily_limit=0,
  152. current_count=0,
  153. staging_rows=[
  154. _row(
  155. strategy_id="high",
  156. strategy_name="高优",
  157. demand_id="id_high",
  158. demand_name="同名需求",
  159. weight=1.0,
  160. )
  161. ],
  162. )
  163. ]
  164. selected, _ = select_rows_to_write(
  165. strategies=contexts,
  166. existing_demand_ids=set(),
  167. claimed_names={"同名需求": {5}},
  168. )
  169. assert len(selected) == 0
  170. def test_unknown_hive_strategy_blocks_all_priorities() -> None:
  171. contexts = [
  172. ExperimentStrategyContext(
  173. strategy_id="high",
  174. strategy_name="高优",
  175. priority=3,
  176. daily_limit=0,
  177. current_count=0,
  178. staging_rows=[
  179. _row(
  180. strategy_id="high",
  181. strategy_name="高优",
  182. demand_id="id_high",
  183. demand_name="同名需求",
  184. weight=1.0,
  185. )
  186. ],
  187. )
  188. ]
  189. selected, _ = select_rows_to_write(
  190. strategies=contexts,
  191. existing_demand_ids=set(),
  192. claimed_names={"同名需求": {"__unknown__"}},
  193. )
  194. assert len(selected) == 0
  195. def test_staging_row_to_odps_record_field_order_and_types() -> None:
  196. record = _staging_row_to_odps_record(
  197. _row(
  198. strategy_id="monthly",
  199. strategy_name="逐月",
  200. demand_id="abc",
  201. demand_name="需求A",
  202. weight=1.5,
  203. )
  204. )
  205. assert record[0] == "逐月"
  206. assert record[1] == "abc"
  207. assert record[2] == "需求A"
  208. assert record[3] == 1.5
  209. assert record[4] == "特征点"
  210. assert record[5] == 1
  211. assert record[6] is None
  212. assert record[7] is None