# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2023/2/2 """ 数据库连接及操作 """ import redis import pymysql from common.common import Common # from common import Common class MysqlHelper: @classmethod def connect_mysql(cls, env, machine): if machine == "aliyun_hk": # 创建一个 Connection 对象,代表了一个数据库连接 connection = pymysql.connect( host="rm-j6cz4c6pt96000xi3.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址 # host="rm-j6cz4c6pt96000xi3lo.mysql.rds.aliyuncs.com",# 数据库IP地址,外网地址 port=3306, # 端口号 user="crawler", # mysql用户名 passwd="crawler123456@", # mysql用户登录密码 db="piaoquan-crawler", # 数据库名 # 如果数据库里面的文本是utf8编码的,charset指定是utf8 charset="utf8", ) elif env == "prod": # 创建一个 Connection 对象,代表了一个数据库连接 connection = pymysql.connect( host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址 # host="rm-bp1159bu17li9hi94ro.mysql.rds.aliyuncs.com",# 数据库IP地址,外网地址 port=3306, # 端口号 user="crawler", # mysql用户名 passwd="crawler123456@", # mysql用户登录密码 db="piaoquan-crawler", # 数据库名 # 如果数据库里面的文本是utf8编码的,charset指定是utf8 charset="utf8", ) else: # 创建一个 Connection 对象,代表了一个数据库连接 connection = pymysql.connect( host="rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址 # host="rm-bp1k5853td1r25g3ndo.mysql.rds.aliyuncs.com", # 数据库IP地址,外网地址 port=3306, # 端口号 user="crawler", # mysql用户名 passwd="crawler123456@", # mysql用户登录密码 db="piaoquan-crawler", # 数据库名 # 如果数据库里面的文本是utf8编码的,charset指定是utf8 charset="utf8", ) return connection @classmethod def get_values(cls, log_type, crawler, sql, env, machine): try: # 连接数据库 connect = cls.connect_mysql(env, machine) # 返回一个 Cursor对象 mysql = connect.cursor() # 执行 sql 语句 mysql.execute(sql) # fetchall方法返回的是一个元组,里面每个元素也是元组,代表一行记录 data = mysql.fetchall() # 关闭数据库连接 connect.close() # 返回查询结果,元组 return data except Exception as e: Common.logger(log_type, crawler).error(f"get_values异常:{e}\n") @classmethod def update_values(cls, log_type, crawler, sql, env, machine): # 连接数据库 connect = cls.connect_mysql(env, machine) # 返回一个 Cursor对象 mysql = connect.cursor() try: # 执行 sql 语句 res = mysql.execute(sql) # 注意 一定要commit,否则添加数据不生效 connect.commit() return res except Exception as e: Common.logger(log_type, crawler).error(f"update_values异常,进行回滚操作:{e}\n") # 发生错误时回滚 connect.rollback() # 关闭数据库连接 connect.close() class RedisHelper: @classmethod def connect_redis(cls, env, machine): if machine == "aliyun_hk": redis_pool = redis.ConnectionPool( # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com', # 内网地址 host="r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com", # 外网地址 port=6379, db=2, password="Qingqu2019", ) redis_conn = redis.Redis(connection_pool=redis_pool) elif env == "prod": redis_pool = redis.ConnectionPool( host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com", # 内网地址 # host='r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com', # 外网地址 port=6379, db=2, password="Qingqu2019", ) redis_conn = redis.Redis(connection_pool=redis_pool) else: redis_pool = redis.ConnectionPool( # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com', # 内网地址 host="r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com", # 外网地址 port=6379, db=2, password="Qingqu2019", ) redis_conn = redis.Redis(connection_pool=redis_pool) return redis_conn @classmethod def redis_push(cls, env, machine, data): redis_conn = cls.connect_redis(env, machine) # print("开始写入数据") redis_conn.lpush(machine, data) # print("数据写入完成") @classmethod def redis_pop(cls, env, machine): redis_conn = cls.connect_redis(env, machine) if redis_conn.llen(machine) == 0: return None else: return redis_conn.rpop(machine) class RedisClient(object): """ Redis client by python Todo 如果 Redis 服务挂了,怎么做能够不影响业务 思路, 每次使用 redis 接口前先判断是否连接成功,如果连接失败则跳过 redis ,不影响全局 """ def __init__(self): self.pool = None self.host = "r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com" self.port = 6379 self.db = 2 self.password = "Wqsd@2019" def connect(self): """ connect to redis server :return: bool """ try: self.pool = redis.Redis( host=self.host, port=self.port, db=self.db, password=self.password ) return True except Exception as e: print("connect to redis fail, the reason is {}".format(e)) return False def select(self, key): """ read info from redis :return: """ return self.pool.get(key) def insert(self, key, value, expire_time): """ insert info from redis :return: """ self.pool.set(key, value, expire_time) def delete(self, key): """ delete key :param key: :return: """ self.pool.delete(key)