| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- #!/usr/bin/env python3
- # Cookie 管理器 - 同步版本
- import random
- import time
- from loguru import logger
- from playwright.sync_api import sync_playwright
- from config import BROWSER_CONFIG
- from dy_cookie_manager import DouyinCookieManager
- class CookieGenerator:
- def __init__(self):
- self.user_agents = [
- "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.7339.81 Safari/537.36",
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.7339.81 Safari/537.36",
- ]
- # 为两个任务创建管理器实例
- self.search_manager = DouyinCookieManager("cookies:douyin:search")
- self.detail_manager = DouyinCookieManager("cookies:douyin:detail")
- def check_environment(self):
- """检查环境"""
- try:
- with sync_playwright() as p:
- browser = p.chromium.launch(headless=True, args=["--no-sandbox"])
- browser.close()
- logger.info("✅ 环境检查通过")
- return True
- except Exception as e:
- logger.error(f"❌ 环境检查失败: {e}")
- return False
- def generate_cookie(self):
- """生成一个 Cookie """
- with sync_playwright() as p:
- browser = None
- try:
- browser = p.chromium.launch(**BROWSER_CONFIG)
- context = browser.new_context(
- user_agent=random.choice(self.user_agents),
- viewport={"width": 1920, "height": 1080}
- )
- page = context.new_page()
- # 反检测
- page.add_init_script("""
- Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
- """)
- # 访问抖音
- page.goto("https://www.douyin.com", timeout=30000, wait_until="domcontentloaded")
- time.sleep(5)
- # 获取 Cookie
- cookies = context.cookies("https://www.douyin.com")
- cookie_str = self._cookies_to_string(cookies)
- if cookie_str and len(cookie_str) > 50:
- logger.info(f"✅ Cookie 生成成功\n{cookie_str}")
- return cookie_str
- else:
- logger.error("❌ Cookie 生成失败")
- return None
- except Exception as e:
- logger.error(f"❌ 生成过程出错: {e}")
- return None
- finally:
- if browser:
- browser.close()
- def save_cookie(self, cookie, cookie_key):
- """保存 Cookie - 使用 DouyinCookieManager"""
- try:
- # 使用 DouyinCookieManager 保存 Cookie
- self.get_cookie_manager(cookie_key).add_cookie(cookie)
- logger.info(f"💾 Cookie 保存到: {cookie_key}")
- return True
- except Exception as e:
- logger.error(f"❌ 保存失败 {cookie_key}: {e}")
- return False
- def genera_cookies(self, task_config):
- """根据任务配置补充 Cookie """
- cookie_key = task_config["cookie_key"]
- target_count = task_config["target_count"]
- batch_size = task_config["batch_size"]
- current_count = self.get_cookie_manager(cookie_key).get_cookie_count()
- if current_count >= target_count:
- logger.info(f"✅ {cookie_key} 数量充足: {current_count}/{target_count}")
- return 0, current_count
- need_count = target_count - current_count
- generate_count = min(need_count, batch_size)
- if generate_count <= 0:
- return 0, current_count
- logger.info(f"🔄 补充 {cookie_key}: {current_count} -> {target_count}, 本次生成 {generate_count} 个")
- success_count = 0
- for i in range(generate_count):
- cookie = self.generate_cookie()
- if cookie and self.save_cookie(cookie, cookie_key):
- success_count += 1
- logger.info(f"✅ {cookie_key} 第 {i + 1}/{generate_count} 个生成成功")
- else:
- logger.error(f"❌ {cookie_key} 第 {i + 1}/{generate_count} 个生成失败")
- # 短暂间隔
- if i < generate_count - 1:
- time.sleep(2)
- new_count = self.get_cookie_manager(cookie_key).get_cookie_count()
- logger.info(f"🎯 {cookie_key} 补充完成: {current_count} -> {new_count}, 成功 {success_count} 个")
- return success_count, new_count
- def _cookies_to_string(self, cookies):
- """Cookie 列表转字符串"""
- return '; '.join([f"{c['name']}={c['value']}" for c in cookies if c.get('name')])
- # 获取 Cookie 管理器实例
- def get_cookie_manager(self, cookie_key):
- """获取指定任务的 Cookie 管理器"""
- if cookie_key == "cookies:douyin:search":
- return self.search_manager
- elif cookie_key == "cookies:douyin:detail":
- return self.detail_manager
- else:
- return None
|