test_platform_access.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. from content_agent.business_modules import platform_access
  2. from content_agent.errors import ContentAgentError, ErrorCode
  3. class DuplicateContentClient:
  4. def search(self, search_query):
  5. return [
  6. {
  7. "content_discovery_id": f"{search_query['search_query_id']}_content_001",
  8. "search_query_id": search_query["search_query_id"],
  9. "platform_content_id": "7601814454925298994",
  10. "description": "重复内容",
  11. }
  12. ]
  13. def test_platform_access_deduplicates_same_content_across_search_queries():
  14. search_queries = [
  15. {
  16. "search_query_id": "q_001",
  17. "search_query": "对比分析",
  18. "search_query_generation_method": "item_single",
  19. },
  20. {
  21. "search_query_id": "q_002",
  22. "search_query": "对比分析 人物故事",
  23. "search_query_generation_method": "item_single",
  24. },
  25. ]
  26. result = platform_access.run(search_queries, DuplicateContentClient())
  27. results = result["platform_results"]
  28. assert len(results) == 1
  29. assert results[0]["search_query_id"] == "q_001"
  30. assert results[0]["search_query"] == "对比分析"
  31. assert results[0]["matched_search_query_ids"] == ["q_001", "q_002"]
  32. assert results[0]["matched_search_queries"] == ["对比分析", "对比分析 人物故事"]
  33. class UniqueContentClient:
  34. def search(self, search_query):
  35. return [
  36. {
  37. "content_discovery_id": f"{search_query['search_query_id']}_content_001",
  38. "search_query_id": search_query["search_query_id"],
  39. "platform_content_id": f"platform_{search_query['search_query_id']}",
  40. "description": search_query["search_query"],
  41. }
  42. ]
  43. def test_platform_access_preserves_llm_variant_generation_method():
  44. search_queries = [
  45. {
  46. "search_query_id": "q_001",
  47. "search_query": "对比分析",
  48. "search_query_generation_method": "item_single",
  49. },
  50. {
  51. "search_query_id": "q_002",
  52. "search_query": "人物叙事素材",
  53. "search_query_generation_method": "llm_variant",
  54. "llm_variant_of": "q_001",
  55. },
  56. ]
  57. results = platform_access.run(search_queries, UniqueContentClient())["platform_results"]
  58. assert [result["search_query_generation_method"] for result in results] == [
  59. "item_single",
  60. "llm_variant",
  61. ]
  62. assert results[1]["search_query_id"] == "q_002"
  63. assert results[1]["search_query"] == "人物叙事素材"
  64. class FailingThenSuccessfulClient:
  65. def search(self, search_query):
  66. if search_query["search_query_id"] == "q_001":
  67. raise RuntimeError("temporary platform failure")
  68. return [
  69. {
  70. "content_discovery_id": f"{search_query['search_query_id']}_content_001",
  71. "search_query_id": search_query["search_query_id"],
  72. "platform_content_id": "7601814454925298995",
  73. "description": "成功内容",
  74. }
  75. ]
  76. def test_platform_access_keeps_running_after_single_query_failure():
  77. search_queries = [
  78. {
  79. "search_query_id": "q_001",
  80. "search_query": "接口失败",
  81. "search_query_generation_method": "item_single",
  82. },
  83. {
  84. "search_query_id": "q_002",
  85. "search_query": "正常召回",
  86. "search_query_generation_method": "llm_variant",
  87. },
  88. ]
  89. result = platform_access.run(search_queries, FailingThenSuccessfulClient())
  90. assert [item["platform_content_id"] for item in result["platform_results"]] == [
  91. "7601814454925298995"
  92. ]
  93. assert result["query_failures"][0]["search_query_id"] == "q_001"
  94. assert result["query_failures"][0]["error_code"] == ErrorCode.PLATFORM_REQUEST_FAILED.value
  95. class AlwaysFailingClient:
  96. def search(self, search_query):
  97. raise RuntimeError("platform unavailable")
  98. def test_platform_access_fails_run_when_all_queries_fail():
  99. search_queries = [
  100. {
  101. "search_query_id": "q_001",
  102. "search_query": "接口失败",
  103. "search_query_generation_method": "item_single",
  104. },
  105. {
  106. "search_query_id": "q_002",
  107. "search_query": "仍然失败",
  108. "search_query_generation_method": "llm_variant",
  109. }
  110. ]
  111. try:
  112. platform_access.run(search_queries, AlwaysFailingClient())
  113. except ContentAgentError as exc:
  114. assert exc.error_code == ErrorCode.PLATFORM_REQUEST_FAILED
  115. assert [failure["search_query_id"] for failure in exc.detail["query_failures"]] == [
  116. "q_001",
  117. "q_002",
  118. ]
  119. for failure in exc.detail["query_failures"]:
  120. assert failure["status"] == "failed"
  121. assert failure["error_code"] == ErrorCode.PLATFORM_REQUEST_FAILED.value
  122. assert failure["message"] == "platform query failed"
  123. assert failure["error_detail"]["exception_type"] == "RuntimeError"
  124. else:
  125. raise AssertionError("expected platform request failure")
  126. class RateLimitedClient:
  127. def search(self, search_query):
  128. raise ContentAgentError(
  129. ErrorCode.PLATFORM_RATE_LIMITED,
  130. "crawapi keyword_search failed: rate_limited",
  131. {"operation": "keyword_search", "status_code": 429},
  132. )
  133. class HealthyClient:
  134. def search(self, search_query):
  135. return [
  136. {
  137. "content_discovery_id": f"{search_query['search_query_id']}_content_001",
  138. "search_query_id": search_query["search_query_id"],
  139. "platform_content_id": "7601814454925298001",
  140. "description": "正常内容",
  141. }
  142. ]
  143. class RuntimeErrorClient:
  144. def search(self, search_query):
  145. raise RuntimeError("crawapi keyword_search failed: HTTP 500")
  146. class SplitClient:
  147. """First query rate limited, second query succeeds."""
  148. def search(self, search_query):
  149. if search_query["search_query_id"] == "q_001":
  150. return RateLimitedClient().search(search_query)
  151. return HealthyClient().search(search_query)
  152. def test_platform_access_preserves_rate_limited_error_code():
  153. search_queries = [
  154. {"search_query_id": "q_001", "search_query": "被限流", "search_query_generation_method": "item_single"},
  155. {"search_query_id": "q_002", "search_query": "正常", "search_query_generation_method": "item_single"},
  156. ]
  157. result = platform_access.run(search_queries, SplitClient())
  158. failure = result["query_failures"][0]
  159. assert failure["search_query_id"] == "q_001"
  160. assert failure["error_code"] == ErrorCode.PLATFORM_RATE_LIMITED.value
  161. assert failure["message"] == "crawapi keyword_search failed: rate_limited"
  162. assert failure["error_detail"]["operation"] == "keyword_search"
  163. assert len(result["platform_results"]) == 1
  164. def test_platform_access_counts_runtime_error_as_platform_request_failed():
  165. search_queries = [
  166. {"search_query_id": "q_001", "search_query": "普通失败", "search_query_generation_method": "item_single"},
  167. {"search_query_id": "q_002", "search_query": "正常", "search_query_generation_method": "item_single"},
  168. ]
  169. class MixedClient:
  170. def search(self, search_query):
  171. if search_query["search_query_id"] == "q_001":
  172. raise RuntimeError("crawapi keyword_search failed: HTTP 500")
  173. return HealthyClient().search(search_query)
  174. result = platform_access.run(search_queries, MixedClient())
  175. failure = result["query_failures"][0]
  176. assert failure["error_code"] == ErrorCode.PLATFORM_REQUEST_FAILED.value
  177. assert failure["error_detail"]["exception_type"] == "RuntimeError"