#!/usr/bin/env python3
# Dual-task independent scheduler
import time
from datetime import datetime

import schedule
from loguru import logger

from cookie_generator import CookieGenerator
from config import TASKS_CONFIG


class DualTaskScheduler:
    """Run every enabled entry of ``TASKS_CONFIG`` on its own timer.

    Each task config is a mapping that this class reads the keys
    ``enabled``, ``check_interval``, ``description``, ``target_count``,
    ``batch_size`` and ``cookie_key`` from. The actual work is delegated
    to a ``CookieGenerator`` instance held in ``self.manager``.
    """

    def __init__(self) -> None:
        """Create the cookie generator and attach the log file sink."""
        self.manager = CookieGenerator()
        self.setup_logging()

    def setup_logging(self) -> None:
        """Add a rotating log file sink to the shared loguru logger."""
        logger.add(
            "dual_task_scheduler.log",
            rotation="10 MB",
            level="INFO",
            format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
        )

    def setup_individual_schedules(self) -> None:
        """Register one periodic ``schedule`` job per enabled task.

        Disabled tasks are logged and skipped. The interval is taken
        from ``check_interval`` and registered in minutes.
        """
        logger.info("⏰ 设置独立任务调度...")

        for task_name, task_config in TASKS_CONFIG.items():
            if not task_config["enabled"]:
                logger.info(f"⏭️ 跳过未启用任务: {task_name}")
                continue

            interval = task_config["check_interval"]

            # Every task gets its own job so the intervals stay independent.
            schedule.every(interval).minutes.do(
                self.run_single_task,
                task_name=task_name,
                task_config=task_config,
            )

            logger.info(f"✅ 设置任务: {task_name}")
            logger.info(f" 📝 {task_config['description']}")
            logger.info(f" ⏰ 间隔: 每 {interval} 分钟")
            logger.info(f" 🎯 目标: {task_config['target_count']} 个")
            logger.info(f" 📦 批量: {task_config['batch_size']} 个/次")

    def run_single_task(self, task_name: str, task_config: dict) -> None:
        """Run one task and log its result and duration.

        Any exception raised by the generator is caught and logged so a
        failing task cannot take down the scheduler loop.

        Args:
            task_name: Name used in log messages.
            task_config: Config mapping passed to ``genera_cookies``;
                ``target_count`` is read from it for the summary.
        """
        logger.info(f"🔍 开始检查任务: {task_name}")
        start_time = datetime.now()

        try:
            success_count, new_count = self.manager.genera_cookies(task_config)
            duration = (datetime.now() - start_time).total_seconds()
            target_count = task_config["target_count"]

            logger.info(f"✅ 任务完成: {task_name}")
            logger.info(f" 📊 结果: {success_count} 个生成成功")
            logger.info(f" 📦 当前: {new_count}/{target_count} 个")
            logger.info(f" ⏱️ 耗时: {duration:.1f} 秒")
        except Exception as e:
            # Top-level boundary for a scheduled job: log and keep running.
            logger.error(f"❌ 任务失败 {task_name}: {e}")

    def run_all_tasks_once(self) -> None:
        """Run every enabled task once, immediately, and log the total.

        A failure in one task is logged and does not prevent the
        remaining tasks from running, matching ``run_single_task``.
        """
        logger.info("🚀 立即执行所有任务...")

        total_generated = 0
        start_time = datetime.now()

        for task_name, task_config in TASKS_CONFIG.items():
            if not task_config["enabled"]:
                continue

            try:
                success_count, _ = self.manager.genera_cookies(task_config)
            except Exception as e:
                # Isolate failures per task; otherwise one bad task would
                # abort the rest and stop run() before its polling loop.
                logger.error(f"❌ 任务失败 {task_name}: {e}")
                continue

            total_generated += success_count

        duration = (datetime.now() - start_time).total_seconds()
        logger.info(
            f"🎊 所有任务完成! 共生成 {total_generated} 个Cookie, "
            f"耗时 {duration:.1f}秒"
        )

    def run_single_task_once(self, task_name: str) -> None:
        """Run the named task once, immediately.

        Unknown task names are logged as an error and ignored.

        Args:
            task_name: Key of the task in ``TASKS_CONFIG``.
        """
        if task_name not in TASKS_CONFIG:
            logger.error(f"❌ 未知任务: {task_name}")
            return

        task_config = TASKS_CONFIG[task_name]

        # NOTE(review): a disabled task is only warned about and is still
        # executed - presumably a deliberate manual override; confirm.
        if not task_config["enabled"]:
            logger.warning(f"⚠️ 任务未启用: {task_name}")

        logger.info(f"🚀 立即执行任务: {task_name}")
        self.run_single_task(task_name, task_config)

    def show_status(self) -> None:
        """Log a status report covering every enabled task.

        For each task the current cookie count (queried through the
        manager for the task's ``cookie_key``) is shown against
        ``target_count``.
        """
        separator = "=" * 60
        report_parts = [f"\n任务现状:\n{separator}"]

        for task_name, task_config in TASKS_CONFIG.items():
            if not task_config["enabled"]:
                continue

            cookie_key = task_config["cookie_key"]
            current_count = (
                self.manager.get_cookie_manager(cookie_key).get_cookie_count()
            )
            target_count = task_config["target_count"]
            interval = task_config["check_interval"]

            # Check mark once the target is reached, "in progress" otherwise.
            status_icon = "✅" if current_count >= target_count else "🔄"

            report_parts.append(
                f"\n📋 {task_name} ({task_config['description']})"
                f"\n   {status_icon} COOKIE数量: {current_count}/{target_count}"
                f"\n   ⏰ 检查频率: 每 {interval} 分钟"
                f"\n   📦 批量: {task_config['batch_size']} 个/次"
                f"\n   🔑 存储: {cookie_key}"
            )

        report_parts.append(f"\n{separator}")
        logger.info("".join(report_parts))

    def run(self) -> None:
        """Start the scheduler and block until interrupted.

        Checks the environment, logs the initial status, registers the
        periodic jobs, runs every task once, then polls ``schedule``
        until ``KeyboardInterrupt``.
        """
        # Environment check
        if not self.manager.check_environment():
            logger.error("❌ 环境检查失败,系统退出")
            return

        # Show the initial status
        self.show_status()

        # Register the independent schedules
        self.setup_individual_schedules()

        logger.info("🔄 系统运行中...")

        # Run all tasks once right away
        self.run_all_tasks_once()

        try:
            while True:
                schedule.run_pending()
                time.sleep(60)  # poll for due jobs once a minute
        except KeyboardInterrupt:
            logger.info("⏹️ 系统停止")