| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 | import osimport sysimport threadingimport tracebackfrom datetime import datetime, timedeltaimport redisfrom application.common import Localsys.path.append(os.getcwd())from application.common.mysql import MysqlHelperclass DatabaseOperations:    def __init__(self, mode, platform):        self.mysql = MysqlHelper(mode=mode, platform=platform)        self.LocalLog = Local.logger(platform, mode)        self.mode = mode        self.platform = platform    def check_user_id(self, uid):        """        检查指定用户ID是否存在于数据库的zqkd_uid表中。        :param uid:要检查的用户ID        :return:如果用户ID存在于表中返回True,否则返回False        """        try:            query_sql = f""" SELECT uid FROM zqkd_user WHERE uid = "{uid}"; """            result = self.mysql.select(sql=query_sql)            return bool(result)        except Exception as e:            tb = traceback.format_exc()            self.LocalLog.error(f"检查用户ID失败: {e}\n{tb}")            return False    def update_user(self, uid, user_name, avatar_url):        """        更新数据库中指定用户的用户名和头像URL。        :param uid:要更新信息的用户ID        :param user_name:新的用户名        :param avatar_url:新的头像URL        :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常        """        try:            update_sql = f""" UPDATE zqkd_user SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """            return self.mysql.update(sql=update_sql)        except Exception as e:            tb = traceback.format_exc()            self.LocalLog.error(f"更新用户信息失败: {e}\n{tb}")            return None    def insert_user(self, uid, user_name, avatar_url):        """        向数据库的zqkd_user表中插入或更新用户信息        :param uid: 用户ID(数值类型)        :param user_name: 用户名        :param avatar_url: 头像URL        :return: 成功返回影响的行数,失败返回None        """        try:            # 直接拼接SQL(不推荐,有SQL注入风险)            insert_sql = f"""                INSERT INTO zqkd_user (uid, avatar_url, user_name)                 VALUES ({uid}, '{avatar_url.replace("'", "''")}', '{user_name.replace("'", "''")}')                 ON DUPLICATE KEY UPDATE                 user_name = '{user_name.replace("'", "''")}',                 avatar_url = '{avatar_url.replace("'", "''")}'            """            return self.mysql.update(sql=insert_sql)        except Exception as e:            tb = traceback.format_exc()            self.LocalLog.error(f"插入用户信息失败: {e}\n{tb}")            return None    def get_today_videos(self):        try:            # 手动转义单引号(仅缓解部分风险)            sql = """                        SELECT count(*) as cnt                        FROM crawler_video                         WHERE create_time >= CURDATE()                           AND create_time < CURDATE() + INTERVAL 1 DAY                           AND platform = %s                           AND strategy = %s                    """            result = self.mysql.select_params(sql, (self.platform,self.mode))            if result and len(result) > 0:                return result[0][0]  # 返回第一行第一列的计数值            return 0  # 无结果时返回0        except Exception as e:            self.LocalLog.error(f"查询失败: {e}")            return 0    def select_user(self, last_scanned_id=0):        """        根据last_scanned_id查询用户数据        :param last_scanned_id: 上次扫描的ID,0表示从头开始        :return: 查询结果列表        """        try:            # 构建查询(根据last_scanned_id过滤)            query = "SELECT id, uid FROM zqkd_user"            if last_scanned_id > 0:                query += f" WHERE id > {last_scanned_id}"            query += " ORDER BY id ASC"            return self.mysql.select(query)        except Exception as e:            tb = traceback.format_exc()            self.LocalLog.error(f"查询用户列表失败: {e}\n{tb}")            return []class RedisOperations:    _pool: redis.ConnectionPool = None    _instance = None    _lock = threading.Lock()  # 用于线程安全的单例创建    @classmethod    def get_instance(cls, mode="", platform=""):        """线程安全的单例获取方法"""        if not cls._instance:            with cls._lock:                if not cls._instance:                    cls._instance = cls(mode, platform)        return cls._instance    def __init__(self, mode, platform):        # 私有构造函数,使用 get_instance() 获取实例        self.mode = mode        self.platform = platform        self.LocalLog = Local.logger(self.platform, self.mode)        if RedisOperations._instance is not None:            raise Exception("请使用 get_instance() 获取实例")        self._pool = self._get_pool()        self.client = redis.Redis(connection_pool=self._pool, decode_responses=True)  # 复用同一个客户端    def _get_pool(self) -> redis.ConnectionPool:        if self._pool is None:            try:                self._pool = redis.ConnectionPool(                    host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",                    port=6379,                    db=0,                    password="Wqsd@2019",                    max_connections=50,  # 增加最大连接数                    socket_timeout=10,                    retry_on_timeout=True                )            except Exception as e:                tb = traceback.format_exc()                self.LocalLog.error(f"创建Redis连接池失败: {e}\n{tb}")                raise        return self._pool    def close(self):        """关闭连接池"""        try:            if self._pool:                self._pool.disconnect(inuse_connections=True)        except Exception as e:            tb = traceback.format_exc()            self.LocalLog.error(f"关闭Redis连接池失败: {e}\n{tb}")    def get_recommend_video(self, task="task:zqkd_video_id"):        """从Redis的指定列表中弹出并返回最左边的视频ID"""        try:            value_bytes = self.client.rpop(task)            value_str = value_bytes.decode('utf-8')            return value_str        except Exception as e:            tb = traceback.format_exc()            self.LocalLog.error(f"获取推荐视频ID失败: {e}\n{tb}")            return None    def check_video_id_exists(self, videoID):        """检查指定的视频ID是否已经存在于Redis中"""        try:            key = f"crawler:zqkd:{videoID}"            return self.client.exists(key)        except Exception as e:            tb = traceback.format_exc()            self.LocalLog.error(f"检查视频ID是否存在失败: {e}\n{tb}")            return False    def save_video_id(self, videoID):        """将视频ID存储到Redis中,并为其设置3天的过期时间"""        try:            key = f"crawler:zqkd:{videoID}"            expiration_time = int(timedelta(days=3).total_seconds())            self.client.setex(key, expiration_time, "1")        except Exception as e:            tb = traceback.format_exc()            self.LocalLog.error(f"保存视频ID失败: {e}\n{tb}")    def save_recommend_video(self, videoID):        """将推荐视频ID添加到Redis的指定列表中,并为该列表设置2天的过期时间"""        try:            task = "task:zqkd_video_id"            pipe = self.client.pipeline()  # 使用管道执行多个命令            pipe.rpush(task, videoID)            pipe.expire(task, int(timedelta(days=2).total_seconds()))            pipe.execute()            # 检查数据是否写入成功            list_length = self.client.llen(task)            self.LocalLog.info(f"保存推荐视频ID成功,列表长度: {list_length}")        except Exception as e:            tb = traceback.format_exc()            self.LocalLog.error(f"保存推荐视频ID失败: {e}\n{tb}")    def get_last_scanned_id(self):        """获取上次扫描的ID"""        try:            return self.client.get("zqkd_last_scanned_id").decode('utf-8')        except Exception as e:            tb = traceback.format_exc()            self.LocalLog.error(f"获取上次扫描的ID失败: {e}\n{tb}")            return None    def set_last_scanned_id(self, last_scanned_id):        """设置上次扫描的ID"""        try:            result = self.client.set("zqkd_last_scanned_id", last_scanned_id)            if result:                self.LocalLog.info(f"成功设置上次扫描的ID: {last_scanned_id}")        except Exception as e:            tb = traceback.format_exc()            self.LocalLog.error(f"设置上次扫描的ID失败: {e}\n{tb}")            return Falseif __name__ == '__main__':    db = DatabaseOperations("author", "zhongqingkandianauthor")    print(db.get_today_videos())
 |