| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- import asyncio
- import json
- import os
- import sys
- from datetime import datetime
- from pathlib import Path
- from urllib.parse import quote
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from agent.tools.builtin.browser.baseClass import (
- init_browser_session,
- navigate_to_url,
- wait,
- get_page_html,
- evaluate,
- scroll_page,
- cleanup_browser_session,
- )
# Prefix that the evaluate tool's output may carry in front of the JSON payload.
_RESULT_PREFIX = "Result: "

# JavaScript executed in the search-results page. __MAX_RESULTS__ and
# __KEYWORD__ are placeholders filled in by _build_extract_js. The empty inner
# catch is deliberate best effort: a result item that fails to parse is
# skipped instead of aborting the whole extraction.
_EXTRACT_JS_TEMPLATE = """
(function(){
    try {
        const results = [];
        const resultItems = document.querySelectorAll('#content_left > div[class*="result"]');
        resultItems.forEach((item, index) => {
            if (index >= __MAX_RESULTS__) return;
            try {
                const titleEl = item.querySelector('h3 a, .t a');
                const title = titleEl ? titleEl.textContent.trim() : '';
                const link = titleEl ? titleEl.href : '';
                const summaryEl = item.querySelector('.c-abstract, .content-right_8Zs40');
                const summary = summaryEl ? summaryEl.textContent.trim() : '';
                const sourceEl = item.querySelector('.c-color-gray, .source_1Vdff');
                const source = sourceEl ? sourceEl.textContent.trim() : '';
                if (title || link) {
                    results.push({
                        index: index + 1,
                        title: title,
                        link: link,
                        summary: summary.substring(0, 200),
                        source: source
                    });
                }
            } catch (e) {
            }
        });
        return {
            success: true,
            count: results.length,
            keyword: __KEYWORD__,
            timestamp: new Date().toISOString(),
            results: results
        };
    } catch (e) {
        return {
            success: false,
            error: e.message,
            stack: e.stack
        };
    }
})()
"""


def _build_extract_js(keyword: str, max_results: int) -> str:
    """Return the extraction script with the keyword and result cap filled in."""
    # json.dumps yields a quoted, fully escaped literal that is also a valid
    # JavaScript string, so quotes or backslashes in the keyword cannot break
    # the script. The numeric placeholder is replaced first so that keyword
    # text is never re-scanned for placeholders.
    script = _EXTRACT_JS_TEMPLATE.replace("__MAX_RESULTS__", str(int(max_results)))
    return script.replace("__KEYWORD__", json.dumps(keyword))


def _parse_extract_output(output, keyword: str):
    """Decode the evaluate output as JSON, or build a failure record."""
    # A missing or non-string output is treated as unparseable text instead of
    # raising on .startswith().
    text = "" if output is None else str(output)
    if text.startswith(_RESULT_PREFIX):
        text = text[len(_RESULT_PREFIX):]
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {
            "success": False,
            "error": "JSON解析失败",
            "raw_output": text[:1000],
            "keyword": keyword,
            "timestamp": datetime.now().isoformat(),
        }


def _build_html_header(page_title: str, page_url: str, keyword: str) -> str:
    """Return the HTML comment header written in front of the saved page."""
    saved_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    header_lines = [
        "<!--",
        f" 页面标题: {page_title}",
        f" 页面URL: {page_url}",
        f" 保存时间: {saved_at}",
        f" 搜索关键词: {keyword}",
        "-->",
        "",
    ]
    return "\n".join(header_lines) + "\n"


async def run_task(keyword: str = "Python 教程", max_results: int = 10) -> None:
    """Search Baidu for *keyword* and save the results and the page HTML.

    Opens the Baidu home page, navigates to the search URL for the keyword,
    scrolls one page down, extracts result entries with an in-page script and
    writes them to ``output/baidu.json``. The page HTML, preceded by a comment
    header, is written to ``output/baidu_page.html``. Both paths are relative
    to the parent of this file's directory.

    Args:
        keyword: Search term; also recorded in the JSON and the HTML header.
        max_results: Only result nodes at positions below this value are
            considered by the extraction script.

    The browser session is cleaned up even when a step fails.
    """
    project_root = Path(__file__).resolve().parents[1]
    output_dir = project_root / "output"
    output_dir.mkdir(parents=True, exist_ok=True)
    json_file = output_dir / "baidu.json"
    html_file = output_dir / "baidu_page.html"

    try:
        await init_browser_session(headless=False, profile_name="baidu_profile")

        # Visit the home page first, then go straight to the results URL.
        await navigate_to_url("https://www.baidu.com")
        await wait(seconds=2)
        search_url = f"https://www.baidu.com/s?wd={quote(keyword)}"
        await navigate_to_url(search_url)
        await wait(seconds=3)
        await scroll_page(down=True, pages=1.0)
        await wait(seconds=2)

        result = await evaluate(code=_build_extract_js(keyword, max_results))
        data = _parse_extract_output(result.output, keyword)
        with open(json_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        html_result = await get_page_html()
        html_content = html_result.metadata.get("html", "")
        page_url = html_result.metadata.get("url", "")
        page_title = html_result.metadata.get("title", "")
        with open(html_file, "w", encoding="utf-8") as f:
            f.write(_build_html_header(page_title, page_url, keyword))
            f.write(html_content)

        print(f"✅ 数据已保存到: {json_file}")
        print(f"✅ HTML 已保存到: {html_file}")
    finally:
        await cleanup_browser_session()
def main():
    """Run the Baidu search task to completion on a fresh event loop."""
    task = run_task()
    asyncio.run(task)


# Only start the task when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|