import json
import subprocess
import time
from pathlib import Path

# Selector for the result containers looked up under #content_left. Shared by
# the readiness probe and the extraction script so the two cannot drift apart.
RESULT_SELECTOR = (
    "#content_left .result, #content_left .c-container, #content_left .result-op"
)

# Readiness probe: reports how many result containers exist and how much text
# the body currently holds.
WAIT_JS = (
    "(function(){"
    "const items=document.querySelectorAll('" + RESULT_SELECTOR + "');"
    "const bodyReady=!!document.body;"
    "const bodyLen=bodyReady?(document.body.innerText||'').length:0;"
    "return {count:items.length, bodyReady:bodyReady, bodyLen:bodyLen};"
    "})()"
)


def run_cli(session: str, args: list[str]) -> dict:
    """Run one ``browser-use`` command in JSON mode and return its ``data`` field.

    Args:
        session: Session name passed via ``--session``.
        args: Sub-command and its arguments, appended after ``--json``.

    Returns:
        The ``data`` member of the decoded JSON response (``{}`` when the key
        is absent).

    Raises:
        RuntimeError: The process exited non-zero, printed nothing on stdout,
            or the response's ``success`` flag is missing or falsy.
        json.JSONDecodeError: stdout was not valid JSON.
    """
    command = ["browser-use", "--session", session, "--json", *args]
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(result.stderr.strip() or "browser-use command failed")
    payload = result.stdout.strip()
    if not payload:
        raise RuntimeError("browser-use returned empty output")
    data = json.loads(payload)
    if not data.get("success", False):
        raise RuntimeError(data.get("error", "browser-use command error"))
    return data.get("data", {})


def stop_session_server(session: str) -> None:
    """Run ``browser-use --session <session> server stop``.

    The exit status and captured output are deliberately ignored, so a server
    that is not running does not count as an error.
    """
    subprocess.run(
        ["browser-use", "--session", session, "server", "stop"],
        capture_output=True,
        text=True,
    )


def unwrap_result(data) -> dict:
    """Return the ``result`` mapping of an eval response, or ``{}``.

    Anything that is not a dict carrying a dict-valued ``result`` (``None``,
    a missing key, a scalar) collapses to an empty dict so callers can use
    ``.get`` without further checks.
    """
    result = data.get("result") if isinstance(data, dict) else None
    return result if isinstance(result, dict) else {}


def build_search_js(keyword: str) -> str:
    """Build the script that fills the ``#kw`` input and triggers the search.

    The keyword is embedded as a JSON string literal, which is also a valid
    JavaScript string literal, so quotes, backslashes and newlines in the
    keyword cannot break out of the string.
    """
    literal = json.dumps(keyword)
    return (
        "(function(){"
        "const input=document.querySelector('#kw');"
        "const btn=document.querySelector('#su');"
        "if(input){input.value=" + literal + ";}"
        "if(btn){btn.click();}"
        "else if(input&&input.form){input.form.submit();}"
        "return {hasInput:!!input,hasButton:!!btn};"
        "})()"
    )


def build_extract_js(keyword: str) -> str:
    """Build the script that collects up to 10 results from the page.

    Each result carries ``index``, ``title``, ``link`` and ``summary``; the
    returned object also echoes ``keyword`` (embedded as an escaped JSON
    literal) together with ``count`` and an ISO ``timestamp``.
    """
    literal = json.dumps(keyword)
    return (
        "(function(){"
        "const items=Array.from(document.querySelectorAll('" + RESULT_SELECTOR + "'));"
        "const results=[];"
        "for(const item of items){"
        "const a=item.querySelector('h3 a')||item.querySelector('a[data-click]')||item.querySelector('a');"
        "if(!a) continue;"
        "const title=(a.textContent||'').trim();"
        "const link=a.href||'';"
        "const summaryEl=item.querySelector('.c-abstract, .content-right_8Zs40, .content-right_8Zs40_2gVt2');"
        "const summary=(summaryEl?summaryEl.textContent:'').trim();"
        "results.push({index:results.length+1,title,link,summary});"
        "if(results.length>=10) break;"
        "}"
        "return {success:true,keyword:" + literal + ",count:results.length,"
        "timestamp:new Date().toISOString(),results:results};"
        "})()"
    )


def wait_for_results(session: str, attempts: int = 12, delay: float = 1.0) -> int:
    """Poll the page until it looks loaded or ``attempts`` probes have run.

    The page counts as ready once at least 3 result containers exist or the
    body text is longer than 1000 characters.

    Returns:
        The result-container count seen by the last probe.
    """
    count = 0
    for _ in range(attempts):
        probe = unwrap_result(run_cli(session, ["eval", WAIT_JS]))
        count = int(probe.get("count") or 0)
        body_len = int(probe.get("bodyLen") or 0)
        if count >= 3 or body_len > 1000:
            break
        time.sleep(delay)
    return count


def main():
    """Search Baidu for a fixed keyword and save the results and page HTML.

    Writes ``skill_baidu.json`` and ``skill_baidu_page.html`` into the
    ``output`` directory one level above this file's directory, then closes
    the browser session regardless of success.
    """
    project_root = Path(__file__).resolve().parents[1]
    output_dir = project_root / "output"
    output_dir.mkdir(parents=True, exist_ok=True)
    json_file = output_dir / "skill_baidu.json"
    html_file = output_dir / "skill_baidu_page.html"
    session = "skill_baidu"
    keyword = "瑜伽美女"
    try:
        # Stop any server left over from an earlier run before opening.
        stop_session_server(session)
        try:
            run_cli(session, ["open", "https://www.baidu.com"])
        except RuntimeError:
            # One retry after another stop.
            stop_session_server(session)
            run_cli(session, ["open", "https://www.baidu.com"])

        run_cli(session, ["eval", build_search_js(keyword)])
        wait_for_results(session)

        data = run_cli(session, ["eval", build_extract_js(keyword)])
        extracted = data.get("result") if isinstance(data, dict) else data
        if not extracted:
            # Nothing came back: still write a well-formed, empty result file.
            extracted = {
                "success": False,
                "keyword": keyword,
                "count": 0,
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
                "results": [],
            }
        with open(json_file, "w", encoding="utf-8") as f:
            json.dump(extracted, f, ensure_ascii=False, indent=2)

        html_data = run_cli(session, ["eval", "document.documentElement.outerHTML"])
        html_content = (
            html_data.get("result") if isinstance(html_data, dict) else html_data
        )
        with open(html_file, "w", encoding="utf-8") as f:
            f.write(html_content or "")

        print(f"✅ 数据已保存到: {json_file}")
        print(f"✅ HTML 已保存到: {html_file}")
    finally:
        # Best-effort cleanup: a failure to close must not mask the outcome
        # (or the original exception) of the work above.
        try:
            run_cli(session, ["close"])
        except Exception:
            pass


if __name__ == "__main__":
    main()