"""实验需求池写入逻辑单元测试。""" from app.strategies.staging_store import StagingRow from app.sync.experiment_demand_pool_write import ( ExperimentStrategyContext, _staging_row_to_odps_record, select_rows_to_write, ) def _row( *, strategy_id: str, strategy_name: str, demand_id: str, demand_name: str, weight: float, ) -> StagingRow: return StagingRow( strategy_config_id=strategy_id, strategy=strategy_name, demand_id=demand_id, demand_name=demand_name, weight=weight, demand_type="特征点", video_count=1, video_list=None, extend=None, batch_date="20260611", ) def test_select_rows_respects_daily_limit() -> None: contexts = [ ExperimentStrategyContext( strategy_id="monthly", strategy_name="逐月", priority=10, daily_limit=1, current_count=0, staging_rows=[ _row( strategy_id="monthly", strategy_name="逐月", demand_id="a1", demand_name="需求A", weight=2.0, ), _row( strategy_id="monthly", strategy_name="逐月", demand_id="a2", demand_name="需求B", weight=1.0, ), ], ) ] selected, counts = select_rows_to_write( strategies=contexts, existing_demand_ids=set(), claimed_names={}, ) assert len(selected) == 1 assert selected[0].demand_id == "a1" assert counts["逐月"] == 1 def test_select_rows_same_priority_no_name_dedup() -> None: contexts = [ ExperimentStrategyContext( strategy_id="s1", strategy_name="策略1", priority=10, daily_limit=0, current_count=0, staging_rows=[ _row( strategy_id="s1", strategy_name="策略1", demand_id="id1", demand_name="同名需求", weight=1.0, ) ], ), ExperimentStrategyContext( strategy_id="s2", strategy_name="策略2", priority=10, daily_limit=0, current_count=0, staging_rows=[ _row( strategy_id="s2", strategy_name="策略2", demand_id="id2", demand_name="同名需求", weight=1.0, ) ], ), ] selected, _ = select_rows_to_write( strategies=contexts, existing_demand_ids=set(), claimed_names={}, ) assert len(selected) == 2 def test_select_rows_different_priority_name_dedup() -> None: contexts = [ ExperimentStrategyContext( strategy_id="high", strategy_name="高优", priority=5, daily_limit=0, current_count=0, staging_rows=[ _row( strategy_id="high", strategy_name="高优", demand_id="id1", demand_name="同名需求", weight=1.0, ) ], ), ExperimentStrategyContext( strategy_id="low", strategy_name="低优", priority=20, daily_limit=0, current_count=0, staging_rows=[ _row( strategy_id="low", strategy_name="低优", demand_id="id2", demand_name="同名需求", weight=1.0, ) ], ), ] selected, _ = select_rows_to_write( strategies=contexts, existing_demand_ids=set(), claimed_names={}, ) assert len(selected) == 1 assert selected[0].demand_id == "id1" def test_higher_priority_blocked_if_lower_priority_already_in_hive() -> None: contexts = [ ExperimentStrategyContext( strategy_id="high", strategy_name="高优", priority=3, daily_limit=0, current_count=0, staging_rows=[ _row( strategy_id="high", strategy_name="高优", demand_id="id_high", demand_name="同名需求", weight=1.0, ) ], ) ] selected, _ = select_rows_to_write( strategies=contexts, existing_demand_ids=set(), claimed_names={"同名需求": {5}}, ) assert len(selected) == 0 def test_unknown_hive_strategy_blocks_all_priorities() -> None: contexts = [ ExperimentStrategyContext( strategy_id="high", strategy_name="高优", priority=3, daily_limit=0, current_count=0, staging_rows=[ _row( strategy_id="high", strategy_name="高优", demand_id="id_high", demand_name="同名需求", weight=1.0, ) ], ) ] selected, _ = select_rows_to_write( strategies=contexts, existing_demand_ids=set(), claimed_names={"同名需求": {"__unknown__"}}, ) assert len(selected) == 0 def test_staging_row_to_odps_record_field_order_and_types() -> None: record = _staging_row_to_odps_record( _row( strategy_id="monthly", strategy_name="逐月", demand_id="abc", demand_name="需求A", weight=1.5, ) ) assert record[0] == "逐月" assert record[1] == "abc" assert record[2] == "需求A" assert record[3] == 1.5 assert record[4] == "特征点" assert record[5] == 1 assert record[6] is None assert record[7] is None