"""Evaluate how well each strategy workflow covers its requirement via an LLM.

For every requirement returned by the requirement store, the workflows of its
linked strategies are sent to an LLM through OpenRouter, which returns a
``coverage_score`` and an explanation per strategy.  Results are saved to
``OUTPUT_JSON`` after every batch, requirements whose evaluation failed are
listed in ``FAILED_JSON``, and (unless ``--dry-run`` is given) each score is
written back into the strategy's ``body`` under ``coverage_evaluations``.
"""
import argparse
import asyncio
import json
import re
import sys
from pathlib import Path

# Put the directory four levels above this file (presumably the repository
# root) on sys.path so the project packages below can be imported when the
# file is run as a script.
repo_root = str(Path(__file__).parent.parent.parent.parent)
if repo_root not in sys.path:
    sys.path.insert(0, repo_root)

from dotenv import load_dotenv

# NOTE(review): kept ahead of the project imports, presumably so they can read
# environment variables at import time - confirm.
load_dotenv()

from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore
from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
from agent.llm.openrouter import openrouter_llm_call

OUTPUT_JSON = Path("examples/process_pipeline/script/coverage_scores.json")
FAILED_JSON = Path("examples/process_pipeline/script/failed_requirements.json")

# OpenRouter model slug used for the evaluation calls.
EVAL_MODEL = "anthropic/claude-sonnet-4.5"

# Number of requirements evaluated concurrently per batch.
BATCH_SIZE = 10

# Greedy match from the first "[" to the last "]" so that any text the model
# puts around the JSON array (prose, markdown fences) is ignored.
_JSON_ARRAY_RE = re.compile(r"\[.*\]", re.DOTALL)

EVAL_PROMPT = """
You are an expert system architecture and pipeline evaluator.
You will be provided with a User Requirement and multiple alternative Proposed Pipeline Workflows created to resolve that requirement.
Your task is to evaluate how well each workflow semantically covers and resolves the user's needs.

User Requirement:
{req_desc}

Proposed Workflows:
{workflows_json}

For each workflow, assign a `coverage_score` between 0.00 and 1.00 (1.00 = completely and deeply resolves the core requirement).

Return your result STRICTLY as a JSON array of objects, one for each workflow evaluated, containing:
[
  {{
    "strategy_id": "",
    "coverage_score": 0.85,
    "explanation": "<1-2 sentence justification on what it covers well and what it might be missing>"
  }}
]

DO NOT output any thinking, markdown wrapping (```json), or conversational text. Output ONLY the raw JSON array.
"""


def _load_body(raw) -> dict:
    """Return a strategy ``body`` as a dict.

    Accepts a dict (returned as the same object), a JSON string (decoded), or
    anything else (treated as empty).  Undecodable strings and non-dict JSON
    values yield ``{}``.
    """
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return {}
    return raw if isinstance(raw, dict) else {}


def _parse_evaluations(content: str) -> list:
    """Extract and validate the JSON array of evaluations from an LLM reply.

    Returns:
        The decoded list of evaluation dicts.

    Raises:
        ValueError: if no JSON array is found, it cannot be decoded, or an
            item is malformed (not a dict, missing keys, non-numeric score,
            or a ``strategy_id`` that is not a string/int).
    """
    match = _JSON_ARRAY_RE.search(content)
    if match is None:
        raise ValueError("LLM response is not a JSON array.")
    # The matched text starts with "[", so a successful decode is always a list.
    parsed = json.loads(match.group(0))

    for item in parsed:
        if (
            not isinstance(item, dict)
            or "strategy_id" not in item
            or "coverage_score" not in item
        ):
            raise ValueError(
                "JSON array items missing required keys (strategy_id, coverage_score)."
            )
        if not isinstance(item["strategy_id"], (str, int)):
            raise ValueError(f"strategy_id must be a string or int, got {item['strategy_id']!r}.")
        score = item["coverage_score"]
        # bool is an int subclass; reject it explicitly.  A non-numeric score
        # would otherwise break the sort performed by the caller.
        if isinstance(score, bool) or not isinstance(score, (int, float)):
            raise ValueError(f"coverage_score must be a number, got {score!r}.")
    return parsed


def _dump_json(path: Path, data) -> None:
    """Write *data* as JSON to *path*, creating the directory if needed.

    The data is written to a sibling ``.tmp`` file first and then moved over
    *path*, so an interruption mid-write does not leave a truncated file that
    the next run would fail to resume from.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    tmp_path.replace(path)


async def process_requirement(req_desc: str, group_strats: list, max_retries: int = 3) -> list:
    """Ask the LLM to score how well each strategy covers one requirement.

    Args:
        req_desc: Requirement description inserted into the prompt.
        group_strats: Strategy records (dicts with ``id`` and optional
            ``body``) whose ``workflow`` entries are evaluated.
        max_retries: Maximum number of LLM attempts.

    Returns:
        The validated list of evaluation dicts (``strategy_id``,
        ``coverage_score``, optionally ``explanation``), or ``[]`` when every
        attempt failed.
    """
    workflows_payload = [
        {
            "strategy_id": s["id"],
            "workflow": _load_body(s.get("body")).get("workflow", []),
        }
        for s in group_strats
    ]
    prompt = EVAL_PROMPT.format(
        req_desc=req_desc,
        workflows_json=json.dumps(workflows_payload, ensure_ascii=False, indent=2),
    )

    for attempt in range(max_retries):
        try:
            resp = await openrouter_llm_call(
                messages=[{"role": "user", "content": prompt}],
                model=EVAL_MODEL,
                max_tokens=4096,
                temperature=0.1,
            )
            return _parse_evaluations(resp["content"])
        except Exception as e:
            # Retry boundary: any call, decoding or validation failure is
            # logged and retried rather than aborting the whole batch.
            print(f" [Error] LLM Call failed for a requirement (Attempt {attempt+1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            else:
                print(f" [Fatal] Failed to evaluate requirement after {max_retries} attempts.")
    return []


def _write_scores_to_strategies(req_id, strat_results: list, strat_map: dict, strat_store, dry_run: bool) -> int:
    """Record the scores for *req_id* in each strategy body; return the count.

    In dry-run mode nothing is sent to the store and every strategy is counted
    as (simulated) updated.
    """
    updated_count = 0
    for ev in strat_results:
        sid = ev["strategy_id"]
        body_data = _load_body(strat_map[sid].get("body"))
        body_data.setdefault("coverage_evaluations", {})
        body_data["coverage_evaluations"][req_id] = {
            "score": ev["coverage_score"],
            "explanation": ev["explanation"],
        }
        if dry_run:
            updated_count += 1
            continue
        try:
            strat_store.update(sid, {"body": json.dumps(body_data, ensure_ascii=False)})
        except Exception as e:
            print(f" [Error] Failed to update body for strategy {sid}: {e}")
        else:
            strat_map[sid]["body"] = body_data  # Update local map cache
            updated_count += 1
    return updated_count


async def main(dry_run: bool = False, force: bool = False, retry_failed: bool = False):
    """Evaluate all pending requirements and persist the results.

    Args:
        dry_run: Compute scores and write the JSON files, but do not update
            strategies in the database.
        force: Ignore existing ``OUTPUT_JSON`` / ``FAILED_JSON`` contents and
            start from scratch.
        retry_failed: Only process requirements listed in ``FAILED_JSON``
            that are not already present in ``OUTPUT_JSON``.
    """
    print("Connecting to DB...")
    strat_store = PostgreSQLStrategyStore()
    req_store = PostgreSQLRequirementStore()

    requirements = req_store.list_all(limit=10000)
    strategies = strat_store.list_all(limit=10000)
    strat_map = {s["id"]: s for s in strategies}

    output_data = {}
    if OUTPUT_JSON.exists() and not force:
        try:
            with open(OUTPUT_JSON, "r", encoding="utf-8") as f:
                loaded = json.load(f)
            if not isinstance(loaded, dict):
                raise ValueError("coverage scores file does not contain a JSON object")
            output_data = loaded
            print(f"Loaded existing coverage scores for {len(output_data)} requirements. Resuming...")
        except (OSError, ValueError):
            print("Failed to load existing JSON, starting fresh.")
    elif force:
        print("Force run enabled. Discarding existing records and starting completely fresh.")

    processed_req_ids = set(output_data.keys())

    failed_req_ids = set()
    if FAILED_JSON.exists() and not force:
        try:
            with open(FAILED_JSON, "r", encoding="utf-8") as f:
                failed_req_ids = set(json.load(f))
            print(f"Loaded {len(failed_req_ids)} previously failed requirements.")
        except (OSError, ValueError, TypeError):
            print("Failed to load failed_requirements.json.")

    # Filter out already processed requirements
    if retry_failed:
        pending_requirements = [
            r for r in requirements
            if r["id"] in failed_req_ids and r["id"] not in processed_req_ids
        ]
        print("Retry-failed mode enabled. Only processing previously failed requirements.")
    else:
        pending_requirements = [r for r in requirements if r["id"] not in processed_req_ids]

    print("Starting LLM coverage semantic evaluation using Sonnet 4.5 via OpenRouter...")
    print(f"Total Requirements remaining to evaluate: {len(pending_requirements)} (out of {len(requirements)})")

    total_pending = len(pending_requirements)
    for start in range(0, total_pending, BATCH_SIZE):
        batch_reqs = pending_requirements[start:start + BATCH_SIZE]
        last = min(start + BATCH_SIZE, total_pending)
        print(f"Evaluating Batch {start // BATCH_SIZE + 1} (Reqs {start + 1} to {last})")

        tasks = []
        for req in batch_reqs:
            req_strat_ids = req.get("strategy_ids") or []
            group_strats = [strat_map[sid] for sid in req_strat_ids if sid in strat_map]
            if not group_strats:
                # Nothing to evaluate for a requirement without known strategies.
                continue
            req_desc = req.get("description", "Unknown Description")
            tasks.append((req["id"], req_desc, process_requirement(req_desc, group_strats)))

        if not tasks:
            continue

        # Execute batch concurrently; results come back in task order.
        results = await asyncio.gather(*(t[2] for t in tasks))

        for (req_id, req_desc, _), evaluations in zip(tasks, results):
            if not evaluations:
                failed_req_ids.add(req_id)
                continue

            strat_results = []
            for ev in evaluations:
                sid = ev.get("strategy_id")
                strat_info = strat_map.get(sid)
                if strat_info is None:
                    continue  # the model returned an id we do not know
                strat_results.append({
                    "strategy_id": sid,
                    "strategy_name": strat_info.get("name", ""),
                    "is_selected": strat_info.get("status") == "published",
                    "coverage_score": ev.get("coverage_score", 0),
                    "explanation": ev.get("explanation", ""),
                })

            if not strat_results:
                # Only unknown strategy ids came back: keep the requirement in
                # the failed set so --retry-failed picks it up again instead
                # of silently dropping it.
                failed_req_ids.add(req_id)
                print(f" [Error] No known strategy ids in LLM response for requirement {req_id}.")
                continue

            failed_req_ids.discard(req_id)
            strat_results.sort(key=lambda x: x["coverage_score"], reverse=True)
            output_data[req_id] = {
                "requirement_desc": req_desc,
                "strategies": strat_results,
            }

            # Write the calculated score and explanation directly back to the database body
            updated_count = _write_scores_to_strategies(
                req_id, strat_results, strat_map, strat_store, dry_run
            )
            tag_word = "[DRY-RUN] Simulated updating" if dry_run else "Updated"
            print(f" -> Processed requirement {req_id}: {tag_word} DB body for {updated_count} strategies.")

        # Save incrementally after every batch to prevent data loss
        _dump_json(OUTPUT_JSON, output_data)
        _dump_json(FAILED_JSON, sorted(failed_req_ids, key=str))

    print(f"Evaluated {len(output_data)} requirements overall.")
    outcome = "simulated (DB untouched)" if dry_run else "and DB updates"
    print(f"Results {outcome} successfully saved to: {OUTPUT_JSON}")
    if failed_req_ids:
        print(f"WARNING: {len(failed_req_ids)} requirements failed during evaluation. They have been saved to {FAILED_JSON}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true", help="Calculate scores and save to JSON only, do not write to DB")
    parser.add_argument("--force", action="store_true", help="Discard existing JSON and rerun all requirements from scratch")
    parser.add_argument("--retry-failed", action="store_true", help="Only retry requirements that are listed in the failed JSON file")
    args = parser.parse_args()

    asyncio.run(main(args.dry_run, args.force, args.retry_failed))