| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- """实验需求池写入逻辑单元测试。"""
- from app.strategies.staging_store import StagingRow
- from app.sync.experiment_demand_pool_write import (
- ExperimentStrategyContext,
- _staging_row_to_odps_record,
- select_rows_to_write,
- )
- def _row(
- *,
- strategy_id: str,
- strategy_name: str,
- demand_id: str,
- demand_name: str,
- weight: float,
- ) -> StagingRow:
- return StagingRow(
- strategy_config_id=strategy_id,
- strategy=strategy_name,
- demand_id=demand_id,
- demand_name=demand_name,
- weight=weight,
- demand_type="特征点",
- video_count=1,
- video_list=None,
- extend=None,
- batch_date="20260611",
- )
- def test_select_rows_respects_daily_limit() -> None:
- contexts = [
- ExperimentStrategyContext(
- strategy_id="monthly",
- strategy_name="逐月",
- priority=10,
- daily_limit=1,
- current_count=0,
- staging_rows=[
- _row(
- strategy_id="monthly",
- strategy_name="逐月",
- demand_id="a1",
- demand_name="需求A",
- weight=2.0,
- ),
- _row(
- strategy_id="monthly",
- strategy_name="逐月",
- demand_id="a2",
- demand_name="需求B",
- weight=1.0,
- ),
- ],
- )
- ]
- selected, counts = select_rows_to_write(
- strategies=contexts,
- existing_demand_ids=set(),
- claimed_names={},
- )
- assert len(selected) == 1
- assert selected[0].demand_id == "a1"
- assert counts["逐月"] == 1
- def test_select_rows_same_priority_no_name_dedup() -> None:
- contexts = [
- ExperimentStrategyContext(
- strategy_id="s1",
- strategy_name="策略1",
- priority=10,
- daily_limit=0,
- current_count=0,
- staging_rows=[
- _row(
- strategy_id="s1",
- strategy_name="策略1",
- demand_id="id1",
- demand_name="同名需求",
- weight=1.0,
- )
- ],
- ),
- ExperimentStrategyContext(
- strategy_id="s2",
- strategy_name="策略2",
- priority=10,
- daily_limit=0,
- current_count=0,
- staging_rows=[
- _row(
- strategy_id="s2",
- strategy_name="策略2",
- demand_id="id2",
- demand_name="同名需求",
- weight=1.0,
- )
- ],
- ),
- ]
- selected, _ = select_rows_to_write(
- strategies=contexts,
- existing_demand_ids=set(),
- claimed_names={},
- )
- assert len(selected) == 2
- def test_select_rows_different_priority_name_dedup() -> None:
- contexts = [
- ExperimentStrategyContext(
- strategy_id="high",
- strategy_name="高优",
- priority=5,
- daily_limit=0,
- current_count=0,
- staging_rows=[
- _row(
- strategy_id="high",
- strategy_name="高优",
- demand_id="id1",
- demand_name="同名需求",
- weight=1.0,
- )
- ],
- ),
- ExperimentStrategyContext(
- strategy_id="low",
- strategy_name="低优",
- priority=20,
- daily_limit=0,
- current_count=0,
- staging_rows=[
- _row(
- strategy_id="low",
- strategy_name="低优",
- demand_id="id2",
- demand_name="同名需求",
- weight=1.0,
- )
- ],
- ),
- ]
- selected, _ = select_rows_to_write(
- strategies=contexts,
- existing_demand_ids=set(),
- claimed_names={},
- )
- assert len(selected) == 1
- assert selected[0].demand_id == "id1"
- def test_higher_priority_blocked_if_lower_priority_already_in_hive() -> None:
- contexts = [
- ExperimentStrategyContext(
- strategy_id="high",
- strategy_name="高优",
- priority=3,
- daily_limit=0,
- current_count=0,
- staging_rows=[
- _row(
- strategy_id="high",
- strategy_name="高优",
- demand_id="id_high",
- demand_name="同名需求",
- weight=1.0,
- )
- ],
- )
- ]
- selected, _ = select_rows_to_write(
- strategies=contexts,
- existing_demand_ids=set(),
- claimed_names={"同名需求": {5}},
- )
- assert len(selected) == 0
- def test_unknown_hive_strategy_blocks_all_priorities() -> None:
- contexts = [
- ExperimentStrategyContext(
- strategy_id="high",
- strategy_name="高优",
- priority=3,
- daily_limit=0,
- current_count=0,
- staging_rows=[
- _row(
- strategy_id="high",
- strategy_name="高优",
- demand_id="id_high",
- demand_name="同名需求",
- weight=1.0,
- )
- ],
- )
- ]
- selected, _ = select_rows_to_write(
- strategies=contexts,
- existing_demand_ids=set(),
- claimed_names={"同名需求": {"__unknown__"}},
- )
- assert len(selected) == 0
- def test_staging_row_to_odps_record_field_order_and_types() -> None:
- record = _staging_row_to_odps_record(
- _row(
- strategy_id="monthly",
- strategy_name="逐月",
- demand_id="abc",
- demand_name="需求A",
- weight=1.5,
- )
- )
- assert record[0] == "逐月"
- assert record[1] == "abc"
- assert record[2] == "需求A"
- assert record[3] == 1.5
- assert record[4] == "特征点"
- assert record[5] == 1
- assert record[6] is None
- assert record[7] is None
|