123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- import json
- import traceback
- from application.common import MysqlHelper, AliyunLogger,Local
- class MysqlService:
- def __init__(self, task_id, mode, platform):
- self.env = "prod"
- self.task_id = task_id
- self.mode = mode
- self.platform = platform
- self.MySQL = MysqlHelper(mode=self.mode, platform=self.platform, env=self.env)
- self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
- self.logger = Local.init_logger(platform=self.platform, mode=self.mode, log_level="INFO", log_to_console=True)
- Local.init_logger(self.platform,self.mode)
- def get_rule_dict(self):
- """
- :return: 返回任务的规则, task_rule
- """
- rule_dict = {}
- task_rule_sql = f"SELECT rule FROM crawler_task_v3 WHERE id = {self.task_id};"
- data = self.MySQL.select(task_rule_sql)
- if data:
- rule_list = json.loads(data[0][0])
- for item in rule_list:
- for key in item:
- rule_dict[key] = item[key]
- self.aliyun_log.logging(
- code=1000,
- message="抓取规则",
- data=rule_dict
- )
- return rule_dict
- def get_user_list(self):
- """
- :return: 返回用户列表
- """
- task_user_list_sql = f"SELECT uid, link, nick_name from crawler_user_v3 where task_id = {self.task_id};"
- uid_list = self.MySQL.select(task_user_list_sql)
- user_list = [{"uid": i[0], "link": i[1], "nick_name": i[2]} for i in uid_list] if uid_list else []
- self.aliyun_log.logging(
- code=1000,
- message="用户列表",
- data=user_list
- )
- return user_list
- def check_user_id(self, uid):
- """
- 检查指定用户ID是否存在于数据库的zqkd_uid表中。
- :param uid:要检查的用户ID
- :return:如果用户ID存在于表中返回True,否则返回False
- """
- try:
- query_sql = f""" SELECT uid FROM zqkd_user WHERE uid = "{uid}"; """
- result = self.mysql.select(sql=query_sql)
- return bool(result)
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"检查用户ID失败: {e}\n{tb}")
- return False
- def update_user(self, uid, user_name, avatar_url):
- """
- 更新数据库中指定用户的用户名和头像URL。
- :param uid:要更新信息的用户ID
- :param user_name:新的用户名
- :param avatar_url:新的头像URL
- :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常
- """
- try:
- update_sql = f""" UPDATE zqkd_user SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
- return self.MySQL.update(sql=update_sql)
- except Exception as e:
- tb = traceback.format_exc()
- self.logger.error(f"更新用户信息失败: {e}\n{tb}")
- return None
- def insert_user(self, uid, user_name, avatar_url):
- """
- 向数据库的zqkd_user表中插入或更新用户信息
- :param uid: 用户ID(数值类型)
- :param user_name: 用户名
- :param avatar_url: 头像URL
- :return: 成功返回影响的行数,失败返回None
- """
- try:
- # 直接拼接SQL(不推荐,有SQL注入风险)
- insert_sql = f"""
- INSERT INTO zqkd_user (uid, avatar_url, user_name)
- VALUES ({uid}, '{avatar_url.replace("'", "''")}', '{user_name.replace("'", "''")}')
- ON DUPLICATE KEY UPDATE
- user_name = '{user_name.replace("'", "''")}',
- avatar_url = '{avatar_url.replace("'", "''")}'
- """
- return self.MySQL.update(sql=insert_sql)
- except Exception as e:
- tb = traceback.format_exc()
- self.logger.error(f"插入用户信息失败: {e}\n{tb}")
- return None
- def get_today_videos(self):
- try:
- # 手动转义单引号(仅缓解部分风险)
- sql = """
- SELECT count(*) as cnt
- FROM crawler_video
- WHERE create_time >= CURDATE()
- AND create_time < CURDATE() + INTERVAL 1 DAY
- AND platform = %s
- AND strategy = %s
- """
- result = self.MySQL.select_params(sql, (self.platform, self.mode))
- if result and len(result) > 0:
- return result[0][0] # 返回第一行第一列的计数值
- return 0 # 无结果时返回0
- except Exception as e:
- self.logger.error(f"查询失败: {e}")
- return 0
- def select_user(self, last_scanned_id=0):
- """
- 根据last_scanned_id查询用户数据
- :param last_scanned_id: 上次扫描的ID,0表示从头开始
- :return: 查询结果列表
- """
- try:
- # 构建查询(根据last_scanned_id过滤)
- query = "SELECT id, uid FROM zqkd_user"
- if last_scanned_id > 0:
- query += f" WHERE id > {last_scanned_id}"
- query += " ORDER BY id ASC"
- return self.MySQL.select(query)
- except Exception as e:
- tb = traceback.format_exc()
- self.logger.error(f"查询用户列表失败: {e}\n{tb}")
- return []
|