# compute_coverage_scores.py
  1. import json
  2. import sys
  3. import asyncio
  4. import re
  5. from pathlib import Path
  6. repo_root = str(Path(__file__).parent.parent.parent.parent)
  7. if repo_root not in sys.path:
  8. sys.path.insert(0, repo_root)
  9. from dotenv import load_dotenv
  10. load_dotenv()
  11. from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore
  12. from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
  13. from agent.llm.openrouter import openrouter_llm_call
  14. OUTPUT_JSON = Path("examples/process_pipeline/script/coverage_scores.json")
  15. FAILED_JSON = Path("examples/process_pipeline/script/failed_requirements.json")
  16. EVAL_PROMPT = """
  17. You are an expert system architecture and pipeline evaluator.
  18. You will be provided with a User Requirement and multiple alternative Proposed Pipeline Workflows created to resolve that requirement.
  19. Your task is to evaluate how well each workflow semantically covers and resolves the user's needs.
  20. User Requirement:
  21. {req_desc}
  22. Proposed Workflows:
  23. {workflows_json}
  24. For each workflow, assign a `coverage_score` between 0.00 and 1.00 (1.00 = completely and deeply resolves the core requirement).
  25. Return your result STRICTLY as a JSON array of objects, one for each workflow evaluated, containing:
  26. [
  27. {{
  28. "strategy_id": "<exact strategy_id from the input>",
  29. "coverage_score": 0.85,
  30. "explanation": "<1-2 sentence justification on what it covers well and what it might be missing>"
  31. }}
  32. ]
  33. DO NOT output any thinking, markdown wrapping (```json), or conversational text. Output ONLY the raw JSON array.
  34. """
  35. async def process_requirement(req_desc: str, group_strats: list, max_retries: int = 3) -> list:
  36. # Prepare payload to send to LLM
  37. workflows_payload = []
  38. for s in group_strats:
  39. body_data = s.get("body") or {}
  40. if isinstance(body_data, str):
  41. try:
  42. body_data = json.loads(body_data)
  43. except:
  44. body_data = {}
  45. workflows_payload.append({
  46. "strategy_id": s["id"],
  47. "workflow": body_data.get("workflow", [])
  48. })
  49. prompt = EVAL_PROMPT.format(
  50. req_desc=req_desc,
  51. workflows_json=json.dumps(workflows_payload, ensure_ascii=False, indent=2)
  52. )
  53. for attempt in range(max_retries):
  54. try:
  55. resp = await openrouter_llm_call(
  56. messages=[{"role": "user", "content": prompt}],
  57. model="anthropic/claude-sonnet-4.5", # OpenRouter uses this to route to latest 3.5 Sonnet
  58. max_tokens=4096,
  59. temperature=0.1
  60. )
  61. content = resp["content"].strip()
  62. # Extract JSON array using regex if there's surrounding text
  63. json_match = re.search(r'\[.*\]', content, re.DOTALL)
  64. if json_match:
  65. content = json_match.group(0)
  66. if content.startswith("```json"):
  67. content = content.replace("```json", "", 1).replace("```", "").strip()
  68. elif content.startswith("```"):
  69. content = content.replace("```", "", 1).replace("```", "").strip()
  70. parsed_json = json.loads(content)
  71. # Validation
  72. if not isinstance(parsed_json, list):
  73. raise ValueError("LLM response is not a JSON array.")
  74. for item in parsed_json:
  75. if "strategy_id" not in item or "coverage_score" not in item:
  76. raise ValueError("JSON array items missing required keys (strategy_id, coverage_score).")
  77. return parsed_json
  78. except Exception as e:
  79. print(f" [Error] LLM Call failed for a requirement (Attempt {attempt+1}/{max_retries}): {e}")
  80. if attempt < max_retries - 1:
  81. await asyncio.sleep(2 ** attempt) # Exponential backoff
  82. else:
  83. print(f" [Fatal] Failed to evaluate requirement after {max_retries} attempts.")
  84. return []
  85. async def main(dry_run: bool = False, force: bool = False, retry_failed: bool = False):
  86. print("Connecting to DB...")
  87. strat_store = PostgreSQLStrategyStore()
  88. req_store = PostgreSQLRequirementStore()
  89. requirements = req_store.list_all(limit=10000)
  90. strategies = strat_store.list_all(limit=10000)
  91. strat_map = {s["id"]: s for s in strategies}
  92. output_data = {}
  93. if OUTPUT_JSON.exists() and not force:
  94. try:
  95. with open(OUTPUT_JSON, "r", encoding="utf-8") as f:
  96. output_data = json.load(f)
  97. print(f"Loaded existing coverage scores for {len(output_data)} requirements. Resuming...")
  98. except:
  99. print("Failed to load existing JSON, starting fresh.")
  100. elif force:
  101. print("Force run enabled. Discarding existing records and starting completely fresh.")
  102. processed_req_ids = set(output_data.keys())
  103. failed_req_ids = set()
  104. if FAILED_JSON.exists() and not force:
  105. try:
  106. with open(FAILED_JSON, "r", encoding="utf-8") as f:
  107. failed_req_ids = set(json.load(f))
  108. print(f"Loaded {len(failed_req_ids)} previously failed requirements.")
  109. except:
  110. print("Failed to load failed_requirements.json.")
  111. total_reqs = len(output_data)
  112. # Filter out already processed requirements
  113. if retry_failed:
  114. pending_requirements = [r for r in requirements if r["id"] in failed_req_ids and r["id"] not in processed_req_ids]
  115. print("Retry-failed mode enabled. Only processing previously failed requirements.")
  116. else:
  117. pending_requirements = [r for r in requirements if r["id"] not in processed_req_ids]
  118. print(f"Starting LLM coverage semantic evaluation using Sonnet 4.5 via OpenRouter...")
  119. print(f"Total Requirements remaining to evaluate: {len(pending_requirements)} (out of {len(requirements)})")
  120. # Process in batches of 10 concurrent requests
  121. batch_size = 10
  122. for i in range(0, len(pending_requirements), batch_size):
  123. batch_reqs = pending_requirements[i:i+batch_size]
  124. tasks = []
  125. print(f"Evaluating Batch {i//batch_size + 1} (Reqs {i+1} to min({i+batch_size}, {len(pending_requirements)}))")
  126. for req in batch_reqs:
  127. req_id = req["id"]
  128. req_desc = req.get("description", "Unknown Description")
  129. req_strat_ids = req.get("strategy_ids") or []
  130. group_strats = [strat_map[sid] for sid in req_strat_ids if sid in strat_map]
  131. if not group_strats:
  132. continue
  133. tasks.append((req_id, req_desc, process_requirement(req_desc, group_strats)))
  134. if not tasks:
  135. continue
  136. # Execute batch concurrently
  137. results = await asyncio.gather(*(t[2] for t in tasks))
  138. # Map results back
  139. for idx, (req_id, req_desc, _) in enumerate(tasks):
  140. evaluations = results[idx]
  141. if not evaluations:
  142. failed_req_ids.add(req_id)
  143. continue
  144. if req_id in failed_req_ids:
  145. failed_req_ids.remove(req_id)
  146. strat_results = []
  147. for ev in evaluations:
  148. sid = ev.get("strategy_id")
  149. if sid in strat_map:
  150. strat_info = strat_map[sid]
  151. is_selected = (strat_info.get("status") == "published")
  152. strat_results.append({
  153. "strategy_id": sid,
  154. "strategy_name": strat_info.get("name", ""),
  155. "is_selected": is_selected,
  156. "coverage_score": ev.get("coverage_score", 0),
  157. "explanation": ev.get("explanation", "")
  158. })
  159. if strat_results:
  160. strat_results.sort(key=lambda x: x["coverage_score"], reverse=True)
  161. output_data[req_id] = {
  162. "requirement_desc": req_desc,
  163. "strategies": strat_results
  164. }
  165. total_reqs += 1
  166. # Write the calculated score and explanation directly back to the database body
  167. updated_count = 0
  168. for ev in strat_results:
  169. sid = ev["strategy_id"]
  170. if sid in strat_map:
  171. strat_info = strat_map[sid]
  172. body_data = strat_info.get("body") or {}
  173. if isinstance(body_data, str):
  174. try:
  175. body_data = json.loads(body_data)
  176. except:
  177. body_data = {}
  178. body_data.setdefault("coverage_evaluations", {})
  179. body_data["coverage_evaluations"][req_id] = {
  180. "score": ev["coverage_score"],
  181. "explanation": ev["explanation"]
  182. }
  183. if not dry_run:
  184. try:
  185. strat_store.update(sid, {"body": json.dumps(body_data, ensure_ascii=False)})
  186. strat_map[sid]["body"] = body_data # Update local map cache
  187. updated_count += 1
  188. except Exception as e:
  189. print(f" [Error] Failed to update body for strategy {sid}: {e}")
  190. else:
  191. updated_count += 1
  192. tag_word = "[DRY-RUN] Simulated updating" if dry_run else "Updated"
  193. print(f" -> Processed requirement {req_id}: {tag_word} DB body for {updated_count} strategies.")
  194. # Save incrementally after every batch to prevent data loss
  195. with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
  196. json.dump(output_data, f, ensure_ascii=False, indent=2)
  197. with open(FAILED_JSON, "w", encoding="utf-8") as f:
  198. json.dump(list(failed_req_ids), f, ensure_ascii=False, indent=2)
  199. print(f"Evaluated {total_reqs} requirements overall.")
  200. print(f"Results {"simulated (DB untouched)" if dry_run else "and DB updates"} successfully saved to: {OUTPUT_JSON}")
  201. if failed_req_ids:
  202. print(f"WARNING: {len(failed_req_ids)} requirements failed during evaluation. They have been saved to {FAILED_JSON}")
  203. if __name__ == "__main__":
  204. import argparse
  205. parser = argparse.ArgumentParser()
  206. parser.add_argument("--dry-run", action="store_true", help="Calculate scores and save to JSON only, do not write to DB")
  207. parser.add_argument("--force", action="store_true", help="Discard existing JSON and rerun all requirements from scratch")
  208. parser.add_argument("--retry-failed", action="store_true", help="Only retry requirements that are listed in the failed JSON file")
  209. args = parser.parse_args()
  210. asyncio.run(main(args.dry_run, args.force, args.retry_failed))