123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- # -*- coding: utf-8 -*-
- # @Author: wangkun
- # @Time: 2023/2/2
- """
- 数据库连接及操作
- """
- import redis
- import pymysql
- from common.common import Common
- # from common import Common
- class MysqlHelper:
- @classmethod
- def connect_mysql(cls, env, machine):
- if machine == "aliyun_hk":
- # 创建一个 Connection 对象,代表了一个数据库连接
- connection = pymysql.connect(
- host="rm-j6cz4c6pt96000xi3.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址
- # host="rm-j6cz4c6pt96000xi3lo.mysql.rds.aliyuncs.com",# 数据库IP地址,外网地址
- port=3306, # 端口号
- user="crawler", # mysql用户名
- passwd="crawler123456@", # mysql用户登录密码
- db="piaoquan-crawler", # 数据库名
- # 如果数据库里面的文本是utf8编码的,charset指定是utf8
- charset="utf8",
- )
- elif env == "prod":
- # 创建一个 Connection 对象,代表了一个数据库连接
- connection = pymysql.connect(
- host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址
- # host="rm-bp1159bu17li9hi94ro.mysql.rds.aliyuncs.com",# 数据库IP地址,外网地址
- port=3306, # 端口号
- user="crawler", # mysql用户名
- passwd="crawler123456@", # mysql用户登录密码
- db="piaoquan-crawler", # 数据库名
- # 如果数据库里面的文本是utf8编码的,charset指定是utf8
- charset="utf8",
- )
- else:
- # 创建一个 Connection 对象,代表了一个数据库连接
- connection = pymysql.connect(
- host="rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址
- # host="rm-bp1k5853td1r25g3ndo.mysql.rds.aliyuncs.com", # 数据库IP地址,外网地址
- port=3306, # 端口号
- user="crawler", # mysql用户名
- passwd="crawler123456@", # mysql用户登录密码
- db="piaoquan-crawler", # 数据库名
- # 如果数据库里面的文本是utf8编码的,charset指定是utf8
- charset="utf8",
- )
- return connection
- @classmethod
- def get_values(cls, log_type, crawler, sql, env, machine):
- try:
- # 连接数据库
- connect = cls.connect_mysql(env, machine)
- # 返回一个 Cursor对象
- mysql = connect.cursor()
- # 执行 sql 语句
- mysql.execute(sql)
- # fetchall方法返回的是一个元组,里面每个元素也是元组,代表一行记录
- data = mysql.fetchall()
- # 关闭数据库连接
- connect.close()
- # 返回查询结果,元组
- return data
- except Exception as e:
- Common.logger(log_type, crawler).error(f"get_values异常:{e}\n")
- @classmethod
- def update_values(cls, log_type, crawler, sql, env, machine):
- # 连接数据库
- connect = cls.connect_mysql(env, machine)
- # 返回一个 Cursor对象
- mysql = connect.cursor()
- try:
- # 执行 sql 语句
- res = mysql.execute(sql)
- # 注意 一定要commit,否则添加数据不生效
- connect.commit()
- return res
- except Exception as e:
- Common.logger(log_type, crawler).error(f"update_values异常,进行回滚操作:{e}\n")
- # 发生错误时回滚
- connect.rollback()
- # 关闭数据库连接
- connect.close()
- class RedisHelper:
- @classmethod
- def connect_redis(cls, env, machine):
- if machine == "aliyun_hk":
- redis_pool = redis.ConnectionPool(
- # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com', # 内网地址
- host="r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com", # 外网地址
- port=6379,
- db=2,
- password="Qingqu2019",
- )
- redis_conn = redis.Redis(connection_pool=redis_pool)
- elif env == "prod":
- redis_pool = redis.ConnectionPool(
- host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com", # 内网地址
- # host='r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com', # 外网地址
- port=6379,
- db=2,
- password="Qingqu2019",
- )
- redis_conn = redis.Redis(connection_pool=redis_pool)
- else:
- redis_pool = redis.ConnectionPool(
- # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com', # 内网地址
- host="r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com", # 外网地址
- port=6379,
- db=2,
- password="Qingqu2019",
- )
- redis_conn = redis.Redis(connection_pool=redis_pool)
- return redis_conn
- @classmethod
- def redis_push(cls, env, machine, data):
- redis_conn = cls.connect_redis(env, machine)
- # print("开始写入数据")
- redis_conn.lpush(machine, data)
- # print("数据写入完成")
- @classmethod
- def redis_pop(cls, env, machine):
- redis_conn = cls.connect_redis(env, machine)
- if redis_conn.llen(machine) == 0:
- return None
- else:
- return redis_conn.rpop(machine)
- class RedisClient(object):
- """
- Redis client by python
- Todo 如果 Redis 服务挂了,怎么做能够不影响业务
- 思路, 每次使用 redis 接口前先判断是否连接成功,如果连接失败则跳过 redis ,不影响全局
- """
- def __init__(self):
- self.pool = None
- self.host = "r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com"
- self.port = 6379
- self.db = 2
- self.password = "Wqsd@2019"
- def connect(self):
- """
- connect to redis server
- :return: bool
- """
- try:
- self.pool = redis.Redis(
- host=self.host, port=self.port, db=self.db, password=self.password
- )
- return True
- except Exception as e:
- print("connect to redis fail, the reason is {}".format(e))
- return False
- def select(self, key):
- """
- read info from redis
- :return:
- """
- return self.pool.get(key)
- def insert(self, key, value, expire_time):
- """
- insert info from redis
- :return:
- """
- self.pool.set(key, value, expire_time)
- def delete(self, key):
- """
- delete key
- :param key:
- :return:
- """
- self.pool.delete(key)
|