cookie_generator.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. #!/usr/bin/env python3
  2. # Cookie 管理器 - 同步版本
  3. import random
  4. import time
  5. from loguru import logger
  6. from playwright.sync_api import sync_playwright
  7. from config import BROWSER_CONFIG
  8. from dy_cookie_manager import DouyinCookieManager
  9. class CookieGenerator:
  10. def __init__(self):
  11. self.user_agents = [
  12. "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.7339.81 Safari/537.36",
  13. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.7339.81 Safari/537.36",
  14. ]
  15. # 为两个任务创建管理器实例
  16. self.search_manager = DouyinCookieManager("cookies:douyin:search")
  17. self.detail_manager = DouyinCookieManager("cookies:douyin:detail")
  18. def check_environment(self):
  19. """检查环境"""
  20. try:
  21. with sync_playwright() as p:
  22. browser = p.chromium.launch(**BROWSER_CONFIG)
  23. browser.close()
  24. logger.info("✅ 环境检查通过")
  25. return True
  26. except Exception as e:
  27. logger.error(f"❌ 环境检查失败: {e}")
  28. return False
  29. def generate_cookie(self):
  30. """生成一个 Cookie """
  31. with sync_playwright() as p:
  32. browser = None
  33. try:
  34. browser = p.chromium.launch(**BROWSER_CONFIG)
  35. context = browser.new_context(
  36. user_agent=random.choice(self.user_agents),
  37. viewport={"width": 1920, "height": 1080}
  38. )
  39. page = context.new_page()
  40. # 反检测
  41. page.add_init_script("""
  42. Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
  43. """)
  44. # 访问抖音
  45. page.goto("https://www.douyin.com", timeout=30000, wait_until="domcontentloaded")
  46. time.sleep(5)
  47. # 获取 Cookie
  48. cookies = context.cookies("https://www.douyin.com")
  49. cookie_str = self._cookies_to_string(cookies)
  50. if cookie_str and len(cookie_str) > 50:
  51. logger.info(f"✅ Cookie 生成成功\n{cookie_str}")
  52. return cookie_str
  53. else:
  54. logger.error("❌ Cookie 生成失败")
  55. return None
  56. except Exception as e:
  57. logger.error(f"❌ 生成过程出错: {e}")
  58. return None
  59. finally:
  60. if browser:
  61. browser.close()
  62. def save_cookie(self, cookie, cookie_key):
  63. """保存 Cookie - 使用 DouyinCookieManager"""
  64. try:
  65. # 使用 DouyinCookieManager 保存 Cookie
  66. self.get_cookie_manager(cookie_key).add_cookie(cookie)
  67. logger.info(f"💾 Cookie 保存到: {cookie_key}")
  68. return True
  69. except Exception as e:
  70. logger.error(f"❌ 保存失败 {cookie_key}: {e}")
  71. return False
  72. def genera_cookies(self, task_config):
  73. """根据任务配置补充 Cookie """
  74. cookie_key = task_config["cookie_key"]
  75. target_count = task_config["target_count"]
  76. batch_size = task_config["batch_size"]
  77. current_count = self.get_cookie_manager(cookie_key).get_cookie_count()
  78. if current_count >= target_count:
  79. logger.info(f"✅ {cookie_key} 数量充足: {current_count}/{target_count},生成 1 个Cookie")
  80. need_count = target_count - current_count
  81. generate_count = min(need_count, batch_size)
  82. # if generate_count <= 0:
  83. # generate_count = 1
  84. logger.info(f"🔄 补充 {cookie_key}: {current_count} -> {target_count}, 本次生成 {generate_count} 个")
  85. success_count = 0
  86. for i in range(generate_count):
  87. cookie = self.generate_cookie()
  88. if cookie and self.save_cookie(cookie, cookie_key):
  89. success_count += 1
  90. logger.info(f"✅ {cookie_key} 第 {i + 1}/{generate_count} 个生成成功")
  91. else:
  92. logger.error(f"❌ {cookie_key} 第 {i + 1}/{generate_count} 个生成失败")
  93. # 短暂间隔
  94. if i < generate_count - 1:
  95. time.sleep(2)
  96. new_count = self.get_cookie_manager(cookie_key).get_cookie_count()
  97. logger.info(f"🎯 {cookie_key} 补充完成: {current_count} -> {new_count}, 成功 {success_count} 个")
  98. return success_count, new_count
  99. def _cookies_to_string(self, cookies):
  100. """Cookie 列表转字符串"""
  101. return '; '.join([f"{c['name']}={c['value']}" for c in cookies if c.get('name')])
  102. # 获取 Cookie 管理器实例
  103. def get_cookie_manager(self, cookie_key):
  104. """获取指定任务的 Cookie 管理器"""
  105. if cookie_key == "cookies:douyin:search":
  106. return self.search_manager
  107. elif cookie_key == "cookies:douyin:detail":
  108. return self.detail_manager
  109. else:
  110. return None