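"""Score how well each candidate pipeline workflow covers its user requirement.

For every requirement in the PostgreSQL stores, this script sends the
requirement description and all linked candidate workflows to Claude Sonnet 4.5
via OpenRouter, collects a 0.00-1.00 coverage_score per workflow, writes the
scores back into each strategy's body in the database, and checkpoints results
to JSON after every batch so that interrupted runs can resume.
"""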
import json
import sys
import asyncio
import re
from pathlib import Path

# Make the repo root importable before loading project modules
repo_root = str(Path(__file__).parent.parent.parent.parent)
if repo_root not in sys.path:
    sys.path.insert(0, repo_root)

from dotenv import load_dotenv

load_dotenv()

from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore
from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
from agent.llm.openrouter import openrouter_llm_call

OUTPUT_JSON = Path("examples/process_pipeline/script/coverage_scores.json")
FAILED_JSON = Path("examples/process_pipeline/script/failed_requirements.json")
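
# Prompt template for the evaluator. {req_desc} and {workflows_json} are filled
# in per requirement via str.format(); the doubled braces in the JSON example
# below are literal braces escaped for format().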
EVAL_PROMPT = """
You are an expert system architecture and pipeline evaluator.
You will be provided with a User Requirement and multiple alternative Proposed Pipeline Workflows created to resolve that requirement.
Your task is to evaluate how well each workflow semantically covers and resolves the user's needs.
User Requirement:
{req_desc}
Proposed Workflows:
{workflows_json}
For each workflow, assign a `coverage_score` between 0.00 and 1.00 (1.00 = completely and deeply resolves the core requirement).
Return your result STRICTLY as a JSON array of objects, one for each workflow evaluated, containing:
[
  {{
    "strategy_id": "<exact strategy_id from the input>",
    "coverage_score": 0.85,
    "explanation": "<1-2 sentence justification of what it covers well and what it might be missing>"
  }}
]
DO NOT output any thinking, markdown wrapping (```json), or conversational text. Output ONLY the raw JSON array.
"""


async def process_requirement(req_desc: str, group_strats: list, max_retries: int = 3) -> list:
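    """Ask the LLM to score every workflow in group_strats against req_desc.

    Returns the parsed list of {strategy_id, coverage_score, explanation}
    dicts, or an empty list once all max_retries attempts have failed.
    """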
    # Prepare payload to send to LLM
    workflows_payload = []
    for s in group_strats:
        body_data = s.get("body") or {}
        if isinstance(body_data, str):
            try:
                body_data = json.loads(body_data)
            except json.JSONDecodeError:
                body_data = {}

        workflows_payload.append({
            "strategy_id": s["id"],
            "workflow": body_data.get("workflow", [])
        })

    prompt = EVAL_PROMPT.format(
        req_desc=req_desc,
        workflows_json=json.dumps(workflows_payload, ensure_ascii=False, indent=2)
    )

    for attempt in range(max_retries):
        try:
            resp = await openrouter_llm_call(
                messages=[{"role": "user", "content": prompt}],
                model="anthropic/claude-sonnet-4.5",  # OpenRouter model slug for Claude Sonnet 4.5
                max_tokens=4096,
                temperature=0.1
            )
            content = resp["content"].strip()

            # Strip any markdown code fences first so the array extraction sees clean text
            if content.startswith("```json"):
                content = content.replace("```json", "", 1).replace("```", "").strip()
            elif content.startswith("```"):
                content = content.replace("```", "", 1).replace("```", "").strip()

            # Extract the JSON array in case the model added surrounding text
            json_match = re.search(r'\[.*\]', content, re.DOTALL)
            if json_match:
                content = json_match.group(0)

            parsed_json = json.loads(content)

            # Validation
            if not isinstance(parsed_json, list):
                raise ValueError("LLM response is not a JSON array.")
            for item in parsed_json:
                if "strategy_id" not in item or "coverage_score" not in item:
                    raise ValueError("JSON array items missing required keys (strategy_id, coverage_score).")

            return parsed_json

        except Exception as e:
            print(f" [Error] LLM call failed for a requirement (Attempt {attempt+1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            else:
                print(f" [Fatal] Failed to evaluate requirement after {max_retries} attempts.")
                return []


async def main(dry_run: bool = False, force: bool = False, retry_failed: bool = False):
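    """Evaluate coverage for all pending requirements and persist the results.

    dry_run: compute scores and save JSON checkpoints without writing to the DB.
    force: discard existing JSON checkpoints and re-evaluate everything.
    retry_failed: only process requirements recorded in FAILED_JSON.
    """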
- print("Connecting to DB...")
- strat_store = PostgreSQLStrategyStore()
- req_store = PostgreSQLRequirementStore()
-
- requirements = req_store.list_all(limit=10000)
- strategies = strat_store.list_all(limit=10000)
-
- strat_map = {s["id"]: s for s in strategies}
-
- output_data = {}
- if OUTPUT_JSON.exists() and not force:
- try:
- with open(OUTPUT_JSON, "r", encoding="utf-8") as f:
- output_data = json.load(f)
- print(f"Loaded existing coverage scores for {len(output_data)} requirements. Resuming...")
        except (json.JSONDecodeError, OSError):
            print("Failed to load existing JSON, starting fresh.")
    elif force:
        print("Force run enabled. Discarding existing records and starting completely fresh.")

    processed_req_ids = set(output_data.keys())

    failed_req_ids = set()
    if FAILED_JSON.exists() and not force:
        try:
            with open(FAILED_JSON, "r", encoding="utf-8") as f:
                failed_req_ids = set(json.load(f))
            print(f"Loaded {len(failed_req_ids)} previously failed requirements.")
        except (json.JSONDecodeError, OSError):
            print("Failed to load failed_requirements.json.")

    total_reqs = len(output_data)
    # Filter out already-processed requirements
    if retry_failed:
        pending_requirements = [r for r in requirements if r["id"] in failed_req_ids and r["id"] not in processed_req_ids]
        print("Retry-failed mode enabled. Only processing previously failed requirements.")
    else:
        pending_requirements = [r for r in requirements if r["id"] not in processed_req_ids]

    print("Starting LLM coverage semantic evaluation using Sonnet 4.5 via OpenRouter...")
    print(f"Total requirements remaining to evaluate: {len(pending_requirements)} (out of {len(requirements)})")

    # Process in batches of 10 concurrent requests
    batch_size = 10

    for i in range(0, len(pending_requirements), batch_size):
        batch_reqs = pending_requirements[i:i+batch_size]
        tasks = []

        print(f"Evaluating Batch {i//batch_size + 1} (Reqs {i+1} to {min(i + batch_size, len(pending_requirements))})")

        for req in batch_reqs:
            req_id = req["id"]
            req_desc = req.get("description", "Unknown Description")
            req_strat_ids = req.get("strategy_ids") or []

            group_strats = [strat_map[sid] for sid in req_strat_ids if sid in strat_map]
            if not group_strats:
                continue

            tasks.append((req_id, req_desc, process_requirement(req_desc, group_strats)))

        if not tasks:
            continue

        # Execute batch concurrently
        results = await asyncio.gather(*(t[2] for t in tasks))

        # Map results back
        for idx, (req_id, req_desc, _) in enumerate(tasks):
            evaluations = results[idx]
            if not evaluations:
                failed_req_ids.add(req_id)
                continue

            if req_id in failed_req_ids:
                failed_req_ids.remove(req_id)

            strat_results = []
            for ev in evaluations:
                sid = ev.get("strategy_id")
                if sid in strat_map:
                    strat_info = strat_map[sid]
                    is_selected = (strat_info.get("status") == "published")
                    strat_results.append({
                        "strategy_id": sid,
                        "strategy_name": strat_info.get("name", ""),
                        "is_selected": is_selected,
                        "coverage_score": ev.get("coverage_score", 0),
                        "explanation": ev.get("explanation", "")
                    })

            if strat_results:
                strat_results.sort(key=lambda x: x["coverage_score"], reverse=True)
                output_data[req_id] = {
                    "requirement_desc": req_desc,
                    "strategies": strat_results
                }
                total_reqs += 1

                # Write the calculated score and explanation directly back to the database body
                updated_count = 0
                for ev in strat_results:
                    sid = ev["strategy_id"]
                    if sid in strat_map:
                        strat_info = strat_map[sid]
                        body_data = strat_info.get("body") or {}
                        if isinstance(body_data, str):
                            try:
                                body_data = json.loads(body_data)
                            except json.JSONDecodeError:
                                body_data = {}

                        body_data.setdefault("coverage_evaluations", {})
                        body_data["coverage_evaluations"][req_id] = {
                            "score": ev["coverage_score"],
                            "explanation": ev["explanation"]
                        }

                        if not dry_run:
                            try:
                                strat_store.update(sid, {"body": json.dumps(body_data, ensure_ascii=False)})
                                strat_map[sid]["body"] = body_data  # Update local map cache
                                updated_count += 1
                            except Exception as e:
                                print(f" [Error] Failed to update body for strategy {sid}: {e}")
                        else:
                            updated_count += 1

                tag_word = "[DRY-RUN] Simulated updating" if dry_run else "Updated"
                print(f" -> Processed requirement {req_id}: {tag_word} DB body for {updated_count} strategies.")

        # Save incrementally after every batch to prevent data loss
        with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
            json.dump(output_data, f, ensure_ascii=False, indent=2)
        with open(FAILED_JSON, "w", encoding="utf-8") as f:
            json.dump(list(failed_req_ids), f, ensure_ascii=False, indent=2)
- print(f"Evaluated {total_reqs} requirements overall.")
- print(f"Results {"simulated (DB untouched)" if dry_run else "and DB updates"} successfully saved to: {OUTPUT_JSON}")
- if failed_req_ids:
- print(f"WARNING: {len(failed_req_ids)} requirements failed during evaluation. They have been saved to {FAILED_JSON}")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true", help="Calculate scores and save to JSON only, do not write to DB")
    parser.add_argument("--force", action="store_true", help="Discard existing JSON and rerun all requirements from scratch")
    parser.add_argument("--retry-failed", action="store_true", help="Only retry requirements that are listed in the failed JSON file")
    args = parser.parse_args()

    asyncio.run(main(args.dry_run, args.force, args.retry_failed))
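
# Example invocations, assuming this file is saved as
# examples/process_pipeline/script/evaluate_coverage.py (hypothetical name) and
# is run from the repo root so the relative OUTPUT_JSON/FAILED_JSON paths resolve:
#   python examples/process_pipeline/script/evaluate_coverage.py --dry-run
#   python examples/process_pipeline/script/evaluate_coverage.py
#   python examples/process_pipeline/script/evaluate_coverage.py --retry-failed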