source_evidence.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. from __future__ import annotations
  2. import copy
  3. from typing import Any
  4. def build_source_evidence(
  5. run_id: str,
  6. policy_run_id: str,
  7. discovered_content_item: dict[str, Any],
  8. result: dict[str, Any],
  9. source_context: dict[str, Any],
  10. ) -> dict[str, Any]:
  11. evidence_pack = copy.deepcopy(source_context["ext_data"]["evidence_pack"])
  12. source_evidence = {
  13. **evidence_pack,
  14. "run_id": run_id,
  15. "policy_run_id": policy_run_id,
  16. "search_query_id": discovered_content_item["search_query_id"],
  17. "search_query": result.get("search_query"),
  18. "search_query_generation_method": result.get("search_query_generation_method"),
  19. "query_sources": copy.deepcopy(result.get("query_sources", [])),
  20. "matched_search_query_ids": list(result.get("matched_search_query_ids", [])),
  21. "matched_search_queries": list(result.get("matched_search_queries", [])),
  22. "matched_search_query_generation_methods": list(
  23. result.get("matched_search_query_generation_methods", [])
  24. ),
  25. "discovery_start_source": discovered_content_item["discovery_start_source"],
  26. "previous_discovery_step": discovered_content_item["previous_discovery_step"],
  27. "origin_path_id": (
  28. "search_query_to_content:"
  29. f"{discovered_content_item['search_query_id']}:"
  30. f"{discovered_content_item['platform_content_id']}"
  31. ),
  32. "source_path_record_ids": [],
  33. "discovered_platform_content_id": discovered_content_item["platform_content_id"],
  34. "discovery_relation": result.get(
  35. "discovery_relation", "derived_from_pattern_demand"
  36. ),
  37. }
  38. return source_evidence