import json import traceback from application.common import MysqlHelper, AliyunLogger,Local class MysqlService: def __init__(self,platform, mode, task_id): self.env = "prod" self.task_id = task_id self.mode = mode self.platform = platform self.MySQL = MysqlHelper(mode=self.mode, platform=self.platform, env=self.env) self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode) self.logger = Local.init_logger(platform=self.platform, mode=self.mode, log_level="INFO", log_to_console=True) Local.init_logger(self.platform,self.mode) def get_rule_dict(self): """ :return: 返回任务的规则, task_rule """ rule_dict = {} task_rule_sql = f"SELECT rule FROM crawler_task_v3 WHERE id = {self.task_id};" data = self.MySQL.select(task_rule_sql) if data: rule_list = json.loads(data[0][0]) for item in rule_list: for key in item: rule_dict[key] = item[key] self.aliyun_log.logging( code=1000, message="抓取规则", data=rule_dict ) return rule_dict def get_user_list(self): """ :return: 返回用户列表 """ task_user_list_sql = f"SELECT uid, link, nick_name from crawler_user_v3 where task_id = {self.task_id};" uid_list = self.MySQL.select(task_user_list_sql) user_list = [{"uid": i[0], "link": i[1], "nick_name": i[2]} for i in uid_list] if uid_list else [] self.aliyun_log.logging( code=1000, message="用户列表", data=user_list ) return user_list def check_user_id(self, uid): """ 检查指定用户ID是否存在于数据库的zqkd_uid表中。 :param uid:要检查的用户ID :return:如果用户ID存在于表中返回True,否则返回False """ try: query_sql = f""" SELECT uid FROM zqkd_user WHERE uid = "{uid}"; """ result = self.mysql.select(sql=query_sql) return bool(result) except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"检查用户ID失败: {e}\n{tb}") return False def update_user(self, uid, user_name, avatar_url): """ 更新数据库中指定用户的用户名和头像URL。 :param uid:要更新信息的用户ID :param user_name:新的用户名 :param avatar_url:新的头像URL :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常 """ try: update_sql = f""" UPDATE zqkd_user SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """ return self.MySQL.update(sql=update_sql) except Exception as e: tb = traceback.format_exc() self.logger.error(f"更新用户信息失败: {e}\n{tb}") return None def insert_user(self, uid, user_name, avatar_url): """ 向数据库的zqkd_user表中插入或更新用户信息 :param uid: 用户ID(数值类型) :param user_name: 用户名 :param avatar_url: 头像URL :return: 成功返回影响的行数,失败返回None """ try: # 直接拼接SQL(不推荐,有SQL注入风险) insert_sql = f""" INSERT INTO zqkd_user (uid, avatar_url, user_name) VALUES ({uid}, '{avatar_url.replace("'", "''")}', '{user_name.replace("'", "''")}') ON DUPLICATE KEY UPDATE user_name = '{user_name.replace("'", "''")}', avatar_url = '{avatar_url.replace("'", "''")}' """ return self.MySQL.update(sql=insert_sql) except Exception as e: tb = traceback.format_exc() self.logger.error(f"插入用户信息失败: {e}\n{tb}") return None def get_today_videos(self): try: # 手动转义单引号(仅缓解部分风险) sql = """ SELECT count(*) as cnt FROM crawler_video WHERE create_time >= CURDATE() AND create_time < CURDATE() + INTERVAL 1 DAY AND platform = %s AND strategy = %s """ result = self.MySQL.select_params(sql, (self.platform, self.mode)) if result and len(result) > 0: return result[0][0] # 返回第一行第一列的计数值 return 0 # 无结果时返回0 except Exception as e: self.logger.error(f"查询失败: {e}") return 0 def select_user(self, last_scanned_id=0): """ 根据last_scanned_id查询用户数据 :param last_scanned_id: 上次扫描的ID,0表示从头开始 :return: 查询结果列表 """ try: # 构建查询(根据last_scanned_id过滤) query = "SELECT id, uid FROM zqkd_user" if last_scanned_id > 0: query += f" WHERE id > {last_scanned_id}" query += " ORDER BY id ASC" return self.MySQL.select(query) except Exception as e: tb = traceback.format_exc() self.logger.error(f"查询用户列表失败: {e}\n{tb}") return []