| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- import asyncio
- import time
- import json
- import hashlib
- from typing import Optional
- from loguru import logger
- import redis
- from config import REDIS_CONFIG
- class DouyinCookieManager:
- def __init__(self, cookie_key: str):
- """
- Douyin Cookie 管理器
- :param cookie_key: Redis 键名,如 "cookies:douyin:detail"
- """
- self.redis = redis.Redis(**REDIS_CONFIG)
- self.key_list = cookie_key
- self.key_info = f"{cookie_key}:info"
- # Cookie 管理操作
- def add_cookie(self, cookie: str):
- """手动新增一个 Cookie"""
- try:
- if isinstance(cookie, bytes):
- cookie = cookie.decode("utf-8")
- cookie_id = self._generate_cookie_id(cookie)
- # 检查是否已存在
- existing_info = self.redis.hget(self.key_info, cookie_id)
- if existing_info:
- logger.info(f"[⚠️] Cookie 已存在,跳过: {cookie_key}")
- return
- # 添加到列表和哈希表
- self.redis.lpush(self.key_list, cookie)
- info = {
- "cookie": cookie,
- "use_count": 0,
- "status": "ok",
- "fail": 0,
- "first_use": 0,
- "last_use": 0,
- "last_fail_time": 0,
- "added_time": int(time.time())
- }
- self.redis.hset(self.key_info, cookie_id, json.dumps(info))
- logger.info(f"[✅] 新增 Cookie 成功,ID: {cookie_id}")
- except Exception as e:
- logger.error(f"添加Cookie异常: {e}")
- def get_cookie(self) -> Optional[str]:
- """获取一个 Cookie 并更新使用信息"""
- try:
- # 从列表尾部取出并放回头部,实现轮询
- cookie = self.redis.rpoplpush(self.key_list, self.key_list)
- if not cookie:
- logger.info("[❗] 当前无可用 Cookie")
- return None
- # 解码为字符串
- cookie_str = cookie.decode() if isinstance(cookie, bytes) else cookie
- cookie_id = self._generate_cookie_id(cookie_str)
- now = int(time.time())
- # 获取 info
- info_raw = self.redis.hget(self.key_info, cookie_id)
- if not info_raw:
- # 若 info 不存在,自动补建
- info_data = {
- "cookie": cookie_str,
- "use_count": 1,
- "status": "ok",
- "fail": 0,
- "first_use": now,
- "last_use": now,
- "last_fail_time": 0,
- "added_time": now
- }
- else:
- info_data = json.loads(info_raw.decode() if isinstance(info_raw, bytes) else info_raw)
- info_data["use_count"] = info_data.get("use_count", 0) + 1
- info_data["last_use"] = now
- if not info_data.get("first_use"):
- info_data["first_use"] = now
- self.redis.hset(self.key_info, cookie_id, json.dumps(info_data))
- logger.info(f"[✅] 获取 Cookie 成功,ID: {cookie_id}")
- return cookie_str
- except Exception as e:
- logger.error(f"获取Cookie异常: {e}")
- return None
- def mark_fail(self, cookie: str):
- """标记 Cookie 使用失败"""
- self._update_info(cookie, success=False)
- def _update_info(self, cookie: str, success: bool):
- """更新 Cookie 使用状态"""
- try:
- if isinstance(cookie, bytes):
- cookie = cookie.decode("utf-8")
- cookie_id = self._generate_cookie_id(cookie)
- info_raw = self.redis.hget(self.key_info, cookie_id)
- if not info_raw:
- logger.warning(f"[⚠️] Cookie 信息不存在: {cookie_id}")
- return
- info_data = json.loads(info_raw.decode() if isinstance(info_raw, bytes) else info_raw)
- now = int(time.time())
- if not success:
- info_data["fail"] = info_data.get("fail", 0) + 1
- if not info_data.get("first_fail_time"):
- info_data["first_fail_time"] = now
- info_data["last_fail_time"] = now
- if info_data["fail"] >= 10:
- info_data["status"] = "banned"
- self.redis.lrem(self.key_list, 0, cookie)
- info_data["last_use"] = now
- self.redis.hset(self.key_info, cookie_id, json.dumps(info_data))
- except Exception as e:
- logger.error(f"更新Cookie信息异常: {e}")
- def delete_cookie(self, cookie: str):
- """删除 Cookie(从列表与 info 中同时移除)"""
- try:
- if isinstance(cookie, bytes):
- cookie = cookie.decode("utf-8")
- cookie_id = self._generate_cookie_id(cookie)
- # 删除 info
- self.redis.hdel(self.key_info, cookie_id)
- # 删除列表中对应 cookie(兼容 bytes)
- raw_list = self.redis.lrange(self.key_list, 0, -1)
- for item in raw_list:
- item_str = item.decode("utf-8") if isinstance(item, bytes) else item
- if item_str == cookie:
- self.redis.lrem(self.key_list, 0, item)
- logger.info(f"[✅] 删除 Cookie 成功,ID: {cookie_id}")
- return
- logger.warning(f"[⚠️] 未找到列表中对应 Cookie: {cookie_id}")
- except Exception as e:
- logger.error(f"删除Cookie异常: {e}")
- def list_cookies(self):
- """列出所有 Cookie 信息"""
- try:
- data = self.redis.hgetall(self.key_info)
- result = {}
- for k, v in data.items():
- try:
- key = k.decode() if isinstance(k, bytes) else k
- value = v.decode() if isinstance(v, bytes) else v
- result[key] = json.loads(value)
- except Exception:
- continue
- # 格式化时间戳为可读格式
- for cookie_info in result.values():
- if "first_use" in cookie_info and cookie_info["first_use"] > 0:
- import datetime
- cookie_info["first_use_formatted"] = datetime.datetime.fromtimestamp(
- cookie_info["first_use"]).strftime('%Y-%m-%d %H:%M:%S')
- else:
- cookie_info["first_use_formatted"] = "从未使用"
- if "last_use" in cookie_info and cookie_info["last_use"] > 0:
- import datetime
- cookie_info["last_use_formatted"] = datetime.datetime.fromtimestamp(
- cookie_info["last_use"]).strftime('%Y-%m-%d %H:%M:%S')
- else:
- cookie_info["last_use_formatted"] = "从未使用"
- if "last_fail_time" in cookie_info and cookie_info["last_fail_time"] > 0:
- import datetime
- cookie_info["last_fail_time_formatted"] = datetime.datetime.fromtimestamp(
- cookie_info["last_fail_time"]).strftime('%Y-%m-%d %H:%M:%S')
- else:
- cookie_info["last_fail_time_formatted"] = "从未失败"
- if "added_time" in cookie_info and cookie_info["added_time"] > 0:
- import datetime
- cookie_info["added_time_formatted"] = datetime.datetime.fromtimestamp(
- cookie_info["added_time"]).strftime('%Y-%m-%d %H:%M:%S')
- else:
- cookie_info["added_time_formatted"] = "未知时间"
- return result
- except Exception as e:
- logger.error(f"列出Cookie异常: {e}")
- return {}
- def get_cookie_count(self):
- """获取 Cookie 数量"""
- try:
- return self.redis.llen(self.key_list)
- except Exception as e:
- logger.error(f"获取数量失败 {self.key_list}: {e}")
- return 0
- def _generate_cookie_id(self, cookie: str) -> str:
- """使用 MD5 生成固定长度的 cookie_id"""
- return hashlib.md5(cookie.encode("utf-8")).hexdigest()
- # 测试函数
- def main():
- # 创建两个管理器实例,对应两个任务
- search_mgr = DouyinCookieManager("cookies:douyin:search")
- detail_mgr = DouyinCookieManager("cookies:douyin:detail")
- print("🎯 Douyin Cookie 管理器测试")
- print("=" * 60)
- # 显示当前状态
- search_count = search_mgr.get_cookie_count()
- detail_count = detail_mgr.get_cookie_count()
- print(f"搜索任务 Cookie 数量: {search_count}")
- print(f"浏览任务 Cookie 数量: {detail_count}")
- # 列出所有 Cookie 信息(如果有的话)
- if search_count > 0:
- print("\n搜索任务 Cookie 详情:")
- search_cookies = search_mgr.list_cookies()
- for cookie_id, info in list(search_cookies.items())[:3]: # 只显示前3个
- print(f" - {cookie_id[:8]}...: 使用次数={info.get('use_count', 0)}, 状态={info.get('status', 'unknown')}")
- if detail_count > 0:
- print("\n浏览任务 Cookie 详情:")
- detail_cookies = detail_mgr.list_cookies()
- for cookie_id, info in list(detail_cookies.items())[:3]: # 只显示前3个
- print(f" - {cookie_id[:8]}...: 使用次数={info.get('use_count', 0)}, 状态={info.get('status', 'unknown')}")
- if __name__ == "__main__":
- main()
|