| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- import json
- import subprocess
- import time
- from pathlib import Path
def run_cli(session: str, args: list[str]) -> dict:
    """Run one ``browser-use`` CLI command in JSON mode and return its data.

    Args:
        session: Name passed to the CLI's ``--session`` option.
        args: Sub-command and arguments appended after ``--json``.

    Returns:
        The value stored under ``"data"`` in the CLI's JSON reply, or an
        empty dict when that key is absent.

    Raises:
        RuntimeError: If the process exits with a non-zero status, prints
            nothing on stdout, prints something that is not a JSON object,
            or replies with a missing/false ``"success"`` flag.
    """
    command = ["browser-use", "--session", session, "--json", *args]
    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.returncode != 0:
        raise RuntimeError(
            completed.stderr.strip() or "browser-use command failed"
        )

    payload = completed.stdout.strip()
    if not payload:
        raise RuntimeError("browser-use returned empty output")

    # Report malformed output with the same exception type as every other
    # CLI failure, so callers that retry on RuntimeError also cover it.
    try:
        reply = json.loads(payload)
    except json.JSONDecodeError as err:
        raise RuntimeError(f"browser-use returned invalid JSON: {err}") from err
    if not isinstance(reply, dict):
        raise RuntimeError(
            "browser-use returned unexpected JSON of type "
            f"{type(reply).__name__}"
        )

    if not reply.get("success", False):
        # `or` also covers an explicit null/empty "error" field.
        raise RuntimeError(reply.get("error") or "browser-use command error")
    return reply.get("data", {})
def stop_session_server(session: str) -> None:
    """Ask the ``browser-use`` CLI to stop the server behind *session*.

    The command's output is captured and discarded and its exit status is
    not inspected, so a failure reported by the CLI is silently ignored.
    """
    stop_command = ["browser-use", "--session", session, "server", "stop"]
    subprocess.run(stop_command, capture_output=True, text=True)
# JavaScript string literal holding the CSS selector for result containers
# inside Baidu's ``#content_left`` column; shared by the wait and extract
# scripts so the two cannot drift apart.
_RESULT_SELECTOR_JS = (
    "'#content_left .result, #content_left .c-container, "
    "#content_left .result-op'"
)


def _eval_result(session: str, script: str) -> object:
    """Evaluate *script* in the session and return the reply's ``"result"``."""
    data = run_cli(session, ["eval", script])
    return data.get("result") if isinstance(data, dict) else data


def _build_search_js(keyword: str) -> str:
    """Return the script that fills the search box and submits the query."""
    # json.dumps emits a quoted, fully escaped literal that is also valid
    # JavaScript, so quotes or backslashes in the keyword cannot break out.
    keyword_literal = json.dumps(keyword)
    return (
        "(function(){"
        "const input=document.querySelector('#kw');"
        "const btn=document.querySelector('#su');"
        "if(input){input.value=" + keyword_literal + ";}"
        "if(btn){btn.click();}"
        "else if(input&&input.form){input.form.submit();}"
        "return {hasInput:!!input,hasButton:!!btn};"
        "})()"
    )


def _build_extract_js(keyword: str) -> str:
    """Return the script that collects up to ten results from the page."""
    keyword_literal = json.dumps(keyword)
    return (
        "(function(){"
        "const items=Array.from(document.querySelectorAll("
        + _RESULT_SELECTOR_JS
        + "));"
        "const results=[];"
        "for(const item of items){"
        "const a=item.querySelector('h3 a')||item.querySelector('a[data-click]')||item.querySelector('a');"
        "if(!a) continue;"
        "const title=(a.textContent||'').trim();"
        "const link=a.href||'';"
        "const summaryEl=item.querySelector('.c-abstract, .content-right_8Zs40, .content-right_8Zs40_2gVt2');"
        "const summary=(summaryEl?summaryEl.textContent:'').trim();"
        "results.push({index:results.length+1,title,link,summary});"
        "if(results.length>=10) break;"
        "}"
        "return {success:true,keyword:"
        + keyword_literal
        + ",count:results.length,timestamp:new Date().toISOString(),results:results};"
        "})()"
    )


def _wait_for_results(session: str, attempts: int = 12, delay: float = 1.0) -> None:
    """Poll the page until results appear or *attempts* polls have been made."""
    probe_js = (
        "(function(){"
        "const items=document.querySelectorAll(" + _RESULT_SELECTOR_JS + ");"
        "const bodyReady=!!document.body;"
        "const bodyLen=bodyReady?(document.body.innerText||'').length:0;"
        "return {count:items.length, bodyReady:bodyReady, bodyLen:bodyLen};"
        "})()"
    )
    for _ in range(attempts):
        status = _eval_result(session, probe_js)
        # A missing or non-dict "result" means "nothing yet", not a crash.
        if not isinstance(status, dict):
            status = {}
        count = int(status.get("count") or 0)
        body_len = int(status.get("bodyLen") or 0)
        # NOTE(review): the bodyLen fallback might already be satisfied by
        # the page shown before the search navigates - confirm the threshold.
        if count >= 3 or body_len > 1000:
            return
        time.sleep(delay)


def main(keyword: str = "瑜伽美女") -> None:
    """Search Baidu for *keyword* via the browser-use CLI and save the output.

    Writes the extracted results to ``output/skill_baidu.json`` and the page
    HTML to ``output/skill_baidu_page.html``, where ``output`` sits one
    directory above the one containing this file. The browser session is
    closed on exit, whether or not the run succeeded.

    Args:
        keyword: Search phrase typed into the Baidu search box.
    """
    project_root = Path(__file__).resolve().parents[1]
    output_dir = project_root / "output"
    output_dir.mkdir(parents=True, exist_ok=True)
    json_file = output_dir / "skill_baidu.json"
    html_file = output_dir / "skill_baidu_page.html"
    session = "skill_baidu"

    try:
        # Stop any existing server for this session first; if opening the
        # page still fails, stop the server once more and retry a single time.
        stop_session_server(session)
        try:
            run_cli(session, ["open", "https://www.baidu.com"])
        except RuntimeError:
            stop_session_server(session)
            run_cli(session, ["open", "https://www.baidu.com"])

        run_cli(session, ["eval", _build_search_js(keyword)])
        _wait_for_results(session)

        extracted = _eval_result(session, _build_extract_js(keyword))
        if not extracted:
            extracted = {
                "success": False,
                "keyword": keyword,
                "count": 0,
                # UTC with a "Z" suffix, in line with the UTC timestamp the
                # in-page script produces via toISOString().
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
                "results": [],
            }
        with open(json_file, "w", encoding="utf-8") as f:
            json.dump(extracted, f, ensure_ascii=False, indent=2)

        html_content = _eval_result(session, "document.documentElement.outerHTML")
        with open(html_file, "w", encoding="utf-8") as f:
            # Anything other than a string is written as an empty file.
            f.write(html_content if isinstance(html_content, str) else "")

        print(f"✅ 数据已保存到: {json_file}")
        print(f"✅ HTML 已保存到: {html_file}")
    finally:
        # Closing is best-effort: an error raised here would replace
        # whatever exception is already propagating out of the try body.
        try:
            run_cli(session, ["close"])
        except Exception:
            pass
# Run the scraper only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|