import asyncio import time import json import hashlib from typing import Optional from loguru import logger import redis from config import REDIS_CONFIG class DouyinCookieManager: def __init__(self, cookie_key: str): """ Douyin Cookie 管理器 :param cookie_key: Redis 键名,如 "cookies:douyin:detail" """ self.redis = redis.Redis(**REDIS_CONFIG) self.key_list = cookie_key self.key_info = f"{cookie_key}:info" # Cookie 管理操作 def add_cookie(self, cookie: str): """手动新增一个 Cookie""" try: if isinstance(cookie, bytes): cookie = cookie.decode("utf-8") cookie_id = self._generate_cookie_id(cookie) # 检查是否已存在 existing_info = self.redis.hget(self.key_info, cookie_id) if existing_info: logger.info(f"[⚠️] Cookie 已存在,跳过: {cookie_key}") return # 添加到列表和哈希表 self.redis.lpush(self.key_list, cookie) info = { "cookie": cookie, "use_count": 0, "status": "ok", "fail": 0, "first_use": 0, "last_use": 0, "last_fail_time": 0, "added_time": int(time.time()) } self.redis.hset(self.key_info, cookie_id, json.dumps(info)) logger.info(f"[✅] 新增 Cookie 成功,ID: {cookie_id}") except Exception as e: logger.error(f"添加Cookie异常: {e}") def get_cookie(self) -> Optional[str]: """获取一个 Cookie 并更新使用信息""" try: # 从列表尾部取出并放回头部,实现轮询 cookie = self.redis.rpoplpush(self.key_list, self.key_list) if not cookie: logger.info("[❗] 当前无可用 Cookie") return None # 解码为字符串 cookie_str = cookie.decode() if isinstance(cookie, bytes) else cookie cookie_id = self._generate_cookie_id(cookie_str) now = int(time.time()) # 获取 info info_raw = self.redis.hget(self.key_info, cookie_id) if not info_raw: # 若 info 不存在,自动补建 info_data = { "cookie": cookie_str, "use_count": 1, "status": "ok", "fail": 0, "first_use": now, "last_use": now, "last_fail_time": 0, "added_time": now } else: info_data = json.loads(info_raw.decode() if isinstance(info_raw, bytes) else info_raw) info_data["use_count"] = info_data.get("use_count", 0) + 1 info_data["last_use"] = now if not info_data.get("first_use"): info_data["first_use"] = now self.redis.hset(self.key_info, cookie_id, json.dumps(info_data)) logger.info(f"[✅] 获取 Cookie 成功,ID: {cookie_id}") return cookie_str except Exception as e: logger.error(f"获取Cookie异常: {e}") return None def mark_fail(self, cookie: str): """标记 Cookie 使用失败""" self._update_info(cookie, success=False) def _update_info(self, cookie: str, success: bool): """更新 Cookie 使用状态""" try: if isinstance(cookie, bytes): cookie = cookie.decode("utf-8") cookie_id = self._generate_cookie_id(cookie) info_raw = self.redis.hget(self.key_info, cookie_id) if not info_raw: logger.warning(f"[⚠️] Cookie 信息不存在: {cookie_id}") return info_data = json.loads(info_raw.decode() if isinstance(info_raw, bytes) else info_raw) now = int(time.time()) if not success: info_data["fail"] = info_data.get("fail", 0) + 1 if not info_data.get("first_fail_time"): info_data["first_fail_time"] = now info_data["last_fail_time"] = now if info_data["fail"] >= 10: info_data["status"] = "banned" self.redis.lrem(self.key_list, 0, cookie) info_data["last_use"] = now self.redis.hset(self.key_info, cookie_id, json.dumps(info_data)) except Exception as e: logger.error(f"更新Cookie信息异常: {e}") def delete_cookie(self, cookie: str): """删除 Cookie(从列表与 info 中同时移除)""" try: if isinstance(cookie, bytes): cookie = cookie.decode("utf-8") cookie_id = self._generate_cookie_id(cookie) # 删除 info self.redis.hdel(self.key_info, cookie_id) # 删除列表中对应 cookie(兼容 bytes) raw_list = self.redis.lrange(self.key_list, 0, -1) for item in raw_list: item_str = item.decode("utf-8") if isinstance(item, bytes) else item if item_str == cookie: self.redis.lrem(self.key_list, 0, item) logger.info(f"[✅] 删除 Cookie 成功,ID: {cookie_id}") return logger.warning(f"[⚠️] 未找到列表中对应 Cookie: {cookie_id}") except Exception as e: logger.error(f"删除Cookie异常: {e}") def list_cookies(self): """列出所有 Cookie 信息""" try: data = self.redis.hgetall(self.key_info) result = {} for k, v in data.items(): try: key = k.decode() if isinstance(k, bytes) else k value = v.decode() if isinstance(v, bytes) else v result[key] = json.loads(value) except Exception: continue # 格式化时间戳为可读格式 for cookie_info in result.values(): if "first_use" in cookie_info and cookie_info["first_use"] > 0: import datetime cookie_info["first_use_formatted"] = datetime.datetime.fromtimestamp( cookie_info["first_use"]).strftime('%Y-%m-%d %H:%M:%S') else: cookie_info["first_use_formatted"] = "从未使用" if "last_use" in cookie_info and cookie_info["last_use"] > 0: import datetime cookie_info["last_use_formatted"] = datetime.datetime.fromtimestamp( cookie_info["last_use"]).strftime('%Y-%m-%d %H:%M:%S') else: cookie_info["last_use_formatted"] = "从未使用" if "last_fail_time" in cookie_info and cookie_info["last_fail_time"] > 0: import datetime cookie_info["last_fail_time_formatted"] = datetime.datetime.fromtimestamp( cookie_info["last_fail_time"]).strftime('%Y-%m-%d %H:%M:%S') else: cookie_info["last_fail_time_formatted"] = "从未失败" if "added_time" in cookie_info and cookie_info["added_time"] > 0: import datetime cookie_info["added_time_formatted"] = datetime.datetime.fromtimestamp( cookie_info["added_time"]).strftime('%Y-%m-%d %H:%M:%S') else: cookie_info["added_time_formatted"] = "未知时间" return result except Exception as e: logger.error(f"列出Cookie异常: {e}") return {} def get_cookie_count(self): """获取 Cookie 数量""" try: return self.redis.llen(self.key_list) except Exception as e: logger.error(f"获取数量失败 {self.key_list}: {e}") return 0 def _generate_cookie_id(self, cookie: str) -> str: """使用 MD5 生成固定长度的 cookie_id""" return hashlib.md5(cookie.encode("utf-8")).hexdigest() # 测试函数 def main(): # 创建两个管理器实例,对应两个任务 search_mgr = DouyinCookieManager("cookies:douyin:search") detail_mgr = DouyinCookieManager("cookies:douyin:detail") print("🎯 Douyin Cookie 管理器测试") print("=" * 60) # 显示当前状态 search_count = search_mgr.get_cookie_count() detail_count = detail_mgr.get_cookie_count() print(f"搜索任务 Cookie 数量: {search_count}") print(f"浏览任务 Cookie 数量: {detail_count}") # 列出所有 Cookie 信息(如果有的话) if search_count > 0: print("\n搜索任务 Cookie 详情:") search_cookies = search_mgr.list_cookies() for cookie_id, info in list(search_cookies.items())[:3]: # 只显示前3个 print(f" - {cookie_id[:8]}...: 使用次数={info.get('use_count', 0)}, 状态={info.get('status', 'unknown')}") if detail_count > 0: print("\n浏览任务 Cookie 详情:") detail_cookies = detail_mgr.list_cookies() for cookie_id, info in list(detail_cookies.items())[:3]: # 只显示前3个 print(f" - {cookie_id[:8]}...: 使用次数={info.get('use_count', 0)}, 状态={info.get('status', 'unknown')}") if __name__ == "__main__": main()