"""
Baidu search example - direct Playwright implementation.

Workflow:
    1. Open baidu.com
    2. Search for "Python 教程"
    3. Extract search-result data and save it to baidu.json
    4. Save the full page HTML to baidu_page.html

Usage:
    python example_playwright.py
"""

import asyncio
import json
from datetime import datetime
from pathlib import Path
from urllib.parse import quote_plus

from playwright.async_api import async_playwright


def _build_search_url(keyword: str) -> str:
    """Return the Baidu search-results URL for *keyword*.

    The keyword is percent-encoded (spaces become '+') so that
    non-ASCII text such as Chinese characters forms a valid URL
    instead of being interpolated raw into the query string.
    """
    return f"https://www.baidu.com/s?wd={quote_plus(keyword)}"


# JavaScript evaluated inside the page to scrape the result list.
# Returns {success, count, keyword, timestamp, results[]} on success,
# or {success: false, error, stack} if scraping throws.
_EXTRACT_JS = """
(function(){
    try {
        // Collect search results
        const results = [];

        // Baidu search-result item selector
        const resultItems = document.querySelectorAll('#content_left > div[class*="result"]');
        console.log('找到搜索结果数量:', resultItems.length);

        resultItems.forEach((item, index) => {
            if (index >= 10) return;  // only keep the first 10
            try {
                // Title and link
                const titleEl = item.querySelector('h3 a, .t a');
                const title = titleEl ? titleEl.textContent.trim() : '';
                const link = titleEl ? titleEl.href : '';

                // Abstract / summary
                const summaryEl = item.querySelector('.c-abstract, .content-right_8Zs40');
                const summary = summaryEl ? summaryEl.textContent.trim() : '';

                // Source site
                const sourceEl = item.querySelector('.c-color-gray, .source_1Vdff');
                const source = sourceEl ? sourceEl.textContent.trim() : '';

                if (title || link) {
                    results.push({
                        index: index + 1,
                        title: title,
                        link: link,
                        summary: summary.substring(0, 200),  // cap summary length
                        source: source
                    });
                }
            } catch (e) {
                console.error('提取单个结果失败:', e);
            }
        });

        return {
            success: true,
            count: results.length,
            keyword: 'Python 教程',
            timestamp: new Date().toISOString(),
            results: results
        };
    } catch (e) {
        return {
            success: false,
            error: e.message,
            stack: e.stack
        };
    }
})()
"""


async def baidu_search_task():
    """Search Baidu for "Python 教程" and persist the results.

    Side effects (written next to this script):
        * baidu.json       - extracted result data, or error info on failure
        * baidu_page.html  - full page HTML prefixed with a metadata comment

    A persistent Chromium profile under ~/.playwright_profiles/baidu_profile
    keeps any login state between runs.
    """
    print("\n" + "="*80)
    print("🚀 开始执行百度搜索任务 (Playwright 版本)")
    print("="*80 + "\n")

    # Project root: directory containing this script.
    project_root = Path(__file__).parent

    # Persistent user-data dir so cookies/login survive across runs.
    user_data_dir = Path.home() / ".playwright_profiles" / "baidu_profile"
    user_data_dir.mkdir(parents=True, exist_ok=True)

    async with async_playwright() as p:
        # Initialised to None so the finally block cannot hit a NameError
        # if launching the browser itself fails.
        context = None
        try:
            # ============================================================
            # Step 1: launch the browser (persistent context)
            # ============================================================
            print("📌 步骤 1: 启动浏览器...")
            context = await p.chromium.launch_persistent_context(
                user_data_dir=str(user_data_dir),
                headless=False,
                viewport={"width": 1280, "height": 720}
            )

            # Reuse the context's initial page when one exists.
            page = context.pages[0] if context.pages else await context.new_page()
            print("✅ 浏览器已启动\n")

            # ============================================================
            # Step 2: navigate to the Baidu home page
            # ============================================================
            print("📌 步骤 2: 导航到百度...")
            await page.goto("https://www.baidu.com")
            await page.wait_for_load_state("networkidle")
            print("✅ 已打开百度首页\n")

            # Give the page a moment to settle.
            await asyncio.sleep(2)

            # ============================================================
            # Step 3: search for "Python 教程"
            # ============================================================
            print("📌 步骤 3: 搜索关键词...")

            # Navigate straight to the results page (simpler and more
            # reliable than driving the search box).
            search_keyword = "Python 教程"
            search_url = _build_search_url(search_keyword)

            print(f"🔍 搜索关键词: {search_keyword}")
            await page.goto(search_url)
            await page.wait_for_load_state("networkidle")
            print("✅ 已导航到搜索结果页面\n")

            # Wait for results to render.
            print("⏳ 等待搜索结果加载...")
            await asyncio.sleep(3)

            # Scroll down so lazily-loaded content appears.
            print("📜 滚动页面加载更多内容...")
            await page.mouse.wheel(0, 800)
            await asyncio.sleep(2)
            print("✅ 搜索结果已加载\n")

            # ============================================================
            # Step 4: extract the search-result data
            # ============================================================
            print("📌 步骤 4: 提取搜索结果数据...")
            data = await page.evaluate(_EXTRACT_JS)

            json_file = project_root / "baidu.json"
            if data.get('success'):
                print(f"✅ 成功提取 {data.get('count', 0)} 条搜索结果")

                # Save the scraped data.
                with open(json_file, 'w', encoding='utf-8') as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
                print(f"✅ 数据已保存到: {json_file}\n")

                # Preview the first three results on the console.
                if data.get('results'):
                    print("📋 前3条结果预览:")
                    for item in data['results'][:3]:
                        print(f"   {item.get('index')}. {item.get('title', '无标题')}")
                        print(f"      链接: {item.get('link', '')[:60]}...")
                        print(f"      来源: {item.get('source', '未知')}")
                        print()
            else:
                print(f"⚠️ 数据提取失败: {data.get('error', '未知错误')}")

                # Persist the error so the run leaves a trace on disk.
                error_data = {
                    "success": False,
                    "error": data.get('error'),
                    "keyword": "Python 教程",
                    "timestamp": datetime.now().isoformat()
                }
                with open(json_file, 'w', encoding='utf-8') as f:
                    json.dump(error_data, f, ensure_ascii=False, indent=2)
                print(f"⚠️ 错误信息已保存到: {json_file}\n")

            # ============================================================
            # Step 5: save the full page HTML
            # ============================================================
            print("📌 步骤 5: 保存完整页面 HTML...")

            html_content = await page.content()
            page_url = page.url
            page_title = await page.title()

            html_file = project_root / "baidu_page.html"
            with open(html_file, 'w', encoding='utf-8') as f:
                # Prefix the snapshot with provenance metadata (the
                # original placeholder here was an empty string).
                meta_info = (
                    f"<!-- saved from: {page_url}\n"
                    f"     title: {page_title}\n"
                    f"     saved at: {datetime.now().isoformat()} -->\n"
                )
                f.write(meta_info)
                f.write(html_content)

            print(f"✅ HTML 已保存到: {html_file}")
            print(f"   页面标题: {page_title}")
            print(f"   页面URL: {page_url}")
            print(f"   HTML 大小: {len(html_content):,} 字符\n")

            # ============================================================
            # Done
            # ============================================================
            print("="*80)
            print("🎉 任务完成!")
            print("="*80)
            print(f"📁 生成的文件:")
            print(f"   1. baidu.json - 搜索结果数据")
            print(f"   2. baidu_page.html - 完整页面HTML")
            print("="*80 + "\n")

            # Leave the browser open briefly so the user can see the result.
            print("⏳ 浏览器将在 5 秒后关闭...")
            await asyncio.sleep(5)

        except Exception as e:
            print(f"\n❌ 任务执行失败: {str(e)}")
            import traceback
            traceback.print_exc()
        finally:
            # Close the browser only if it actually launched.
            if context is not None:
                await context.close()
                print("✅ 浏览器已关闭\n")


async def main():
    """Entry point: run the Baidu search task."""
    await baidu_search_task()


if __name__ == "__main__":
    asyncio.run(main())