"""Score how well each stored strategy covers the requirement it is linked to.

For every requirement, the workflows of its linked strategies are sent to an
LLM (via OpenRouter), which returns a coverage score and explanation per
strategy. Results are saved incrementally to ``OUTPUT_JSON`` and, unless
``--dry-run`` is given, merged into each strategy's ``body`` under
``coverage_evaluations``.
"""
import json
import sys
import asyncio
from pathlib import Path

# Make the repository root importable when this file is run as a script.
repo_root = str(Path(__file__).parent.parent.parent.parent)
if repo_root not in sys.path:
    sys.path.insert(0, repo_root)

from dotenv import load_dotenv

# Load environment variables before importing the project modules below.
load_dotenv()

from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore
from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
from agent.llm.openrouter import openrouter_llm_call

OUTPUT_JSON = Path("examples/process_pipeline/script/coverage_scores.json")

# OpenRouter model identifier used for the evaluation calls.
EVAL_MODEL = "anthropic/claude-sonnet-4.5"

EVAL_PROMPT = """
You are an expert system architecture and pipeline evaluator.
You will be provided with a User Requirement and multiple alternative Proposed Pipeline Workflows created to resolve that requirement.
Your task is to evaluate how well each workflow semantically covers and resolves the user's needs.

User Requirement:
{req_desc}

Proposed Workflows:
{workflows_json}

For each workflow, assign a `coverage_score` between 0.00 and 1.00 (1.00 = completely and deeply resolves the core requirement).
Return your result STRICTLY as a JSON array of objects, one for each workflow evaluated, containing:
[
  {{
    "strategy_id": "",
    "coverage_score": 0.85,
    "explanation": "<1-2 sentence justification on what it covers well and what it might be missing>"
  }}
]

DO NOT output any thinking, markdown wrapping (```json), or conversational text. Output ONLY the raw JSON array.
"""


def _coerce_body(raw) -> dict:
    """Return a strategy body as a dict; JSON strings are decoded, anything else becomes {}."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except ValueError:  # json.JSONDecodeError is a ValueError
            return {}
    return raw if isinstance(raw, dict) else {}


def _strip_code_fence(text: str) -> str:
    """Remove one wrapping Markdown code fence (optionally tagged ``json``) from *text*.

    Only the leading and trailing fence are removed, so backticks that appear
    inside the payload itself are left untouched.
    """
    text = text.strip()
    if not text.startswith("```"):
        return text
    text = text[3:]
    if text[:4].lower() == "json":
        text = text[4:]
    text = text.strip()
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


def _as_score(value):
    """Return *value* as a number usable for sorting; non-numeric values become 0."""
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return value
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0


async def process_requirement(req_desc: str, group_strats: list) -> list:
    """Ask the LLM to score every strategy in *group_strats* against *req_desc*.

    Args:
        req_desc: Requirement description inserted into the prompt.
        group_strats: Strategy records (dicts with ``id`` and ``body``); only
            the ``workflow`` entry of each body is sent to the model.

    Returns:
        A list of evaluation dicts as returned by the model, restricted to
        entries whose ``strategy_id`` was part of the request. An empty list
        is returned when the call or the JSON decoding fails.
    """
    workflows_payload = [
        {
            "strategy_id": s["id"],
            "workflow": _coerce_body(s.get("body")).get("workflow", []),
        }
        for s in group_strats
    ]
    sent_ids = {s["id"] for s in group_strats}

    prompt = EVAL_PROMPT.format(
        req_desc=req_desc,
        workflows_json=json.dumps(workflows_payload, ensure_ascii=False, indent=2),
    )

    # Best-effort: one failed requirement must not abort the whole batch.
    try:
        resp = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model=EVAL_MODEL,
            max_tokens=4096,
            temperature=0.1,
        )
        parsed = json.loads(_strip_code_fence(resp["content"]))
    except Exception as e:
        print(f" [Error] LLM Call failed for a requirement: {e}")
        return []

    if not isinstance(parsed, list):
        print(f" [Error] LLM returned {type(parsed).__name__} instead of a JSON array; skipping.")
        return []

    # Drop malformed items and ids the model was never asked about, so an
    # evaluation can never be attached to an unrelated strategy.
    evaluations = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        sid = item.get("strategy_id")
        if isinstance(sid, (str, int)) and sid in sent_ids:
            evaluations.append(item)
    return evaluations


def _load_existing_scores() -> dict:
    """Load previously saved scores from OUTPUT_JSON, or {} if unreadable/invalid."""
    try:
        with open(OUTPUT_JSON, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        print("Failed to load existing JSON, starting fresh.")
        return {}
    if not isinstance(loaded, dict):
        print("Failed to load existing JSON, starting fresh.")
        return {}
    print(f"Loaded existing coverage scores for {len(loaded)} requirements. Resuming...")
    return loaded


def _save_scores(output_data: dict) -> None:
    """Write *output_data* to OUTPUT_JSON via a temp file so a crash cannot truncate it."""
    OUTPUT_JSON.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = OUTPUT_JSON.with_name(OUTPUT_JSON.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(output_data, f, ensure_ascii=False, indent=2)
    tmp_path.replace(OUTPUT_JSON)


def _build_strategy_results(evaluations: list, strat_map: dict) -> list:
    """Turn raw LLM evaluations into result rows sorted by descending coverage score."""
    rows = []
    for ev in evaluations:
        sid = ev.get("strategy_id")
        if sid not in strat_map:
            continue
        strat_info = strat_map[sid]
        rows.append({
            "strategy_id": sid,
            "strategy_name": strat_info.get("name", ""),
            # A strategy counts as selected when its status is "published".
            "is_selected": strat_info.get("status") == "published",
            "coverage_score": _as_score(ev.get("coverage_score", 0)),
            "explanation": ev.get("explanation", ""),
        })
    rows.sort(key=lambda row: row["coverage_score"], reverse=True)
    return rows


def _write_back_evaluations(strat_store, strat_map: dict, req_id, strat_results: list, dry_run: bool) -> int:
    """Merge the scores for *req_id* into each strategy body; return how many were (or would be) updated."""
    updated_count = 0
    for row in strat_results:
        sid = row["strategy_id"]
        # Work on copies so the local cache only changes after a successful DB write.
        body_data = dict(_coerce_body(strat_map[sid].get("body")))
        existing = body_data.get("coverage_evaluations")
        coverage = dict(existing) if isinstance(existing, dict) else {}
        coverage[req_id] = {
            "score": row["coverage_score"],
            "explanation": row["explanation"],
        }
        body_data["coverage_evaluations"] = coverage

        if dry_run:
            updated_count += 1
            continue
        try:
            strat_store.update(sid, {"body": json.dumps(body_data, ensure_ascii=False)})
        except Exception as e:
            print(f" [Error] Failed to update body for strategy {sid}: {e}")
        else:
            strat_map[sid]["body"] = body_data  # Update local map cache
            updated_count += 1
    return updated_count


async def main(dry_run: bool = False, force: bool = False):
    """Evaluate all pending requirements and persist the coverage scores.

    Args:
        dry_run: When true, scores are computed and saved to OUTPUT_JSON but
            no strategy is updated in the database.
        force: When true, any existing OUTPUT_JSON content is ignored and
            every requirement is evaluated again.
    """
    print("Connecting to DB...")
    strat_store = PostgreSQLStrategyStore()
    req_store = PostgreSQLRequirementStore()

    requirements = req_store.list_all(limit=10000)
    strategies = strat_store.list_all(limit=10000)
    strat_map = {s["id"]: s for s in strategies}

    output_data = {}
    if force:
        print("Force run enabled. Discarding existing records and starting completely fresh.")
    elif OUTPUT_JSON.exists():
        output_data = _load_existing_scores()

    # Keys loaded from JSON are always strings, so compare ids in string form.
    processed_req_ids = {str(key) for key in output_data}
    pending_requirements = [r for r in requirements if str(r["id"]) not in processed_req_ids]

    print("Starting LLM coverage semantic evaluation using Sonnet 4.5 via OpenRouter...")
    print(f"Total Requirements remaining to evaluate: {len(pending_requirements)} (out of {len(requirements)})")

    # Process in batches of 10 concurrent requests
    batch_size = 10
    total_pending = len(pending_requirements)
    for start in range(0, total_pending, batch_size):
        batch_reqs = pending_requirements[start:start + batch_size]
        last = min(start + batch_size, total_pending)
        print(f"Evaluating Batch {start // batch_size + 1} (Reqs {start + 1} to {last})")

        jobs = []
        for req in batch_reqs:
            req_strat_ids = req.get("strategy_ids") or []
            group_strats = [strat_map[sid] for sid in req_strat_ids if sid in strat_map]
            if group_strats:
                jobs.append((req["id"], req.get("description", "Unknown Description"), group_strats))

        if not jobs:
            continue

        # Execute batch concurrently; results come back in the same order as jobs.
        results = await asyncio.gather(
            *(process_requirement(req_desc, group) for _, req_desc, group in jobs)
        )

        for (req_id, req_desc, _), evaluations in zip(jobs, results):
            if not evaluations:
                continue
            strat_results = _build_strategy_results(evaluations, strat_map)
            if not strat_results:
                continue

            output_data[req_id] = {
                "requirement_desc": req_desc,
                "strategies": strat_results,
            }

            # Write the calculated score and explanation back to the database body
            updated_count = _write_back_evaluations(strat_store, strat_map, req_id, strat_results, dry_run)
            tag_word = "[DRY-RUN] Simulated updating" if dry_run else "Updated"
            print(f" -> Processed requirement {req_id}: {tag_word} DB body for {updated_count} strategies.")

        # Save incrementally after every batch to prevent data loss
        _save_scores(output_data)

    print(f"Evaluated {len(output_data)} requirements overall.")
    outcome = "simulated (DB untouched)" if dry_run else "and DB updates"
    print(f"Results {outcome} successfully saved to: {OUTPUT_JSON}")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true", help="Calculate scores and save to JSON only, do not write to DB")
    parser.add_argument("--force", action="store_true", help="Discard existing JSON and rerun all requirements from scratch")
    args = parser.parse_args()

    asyncio.run(main(args.dry_run, args.force))