mysql_service.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import json
  2. import traceback
  3. from application.common import MysqlHelper, AliyunLogger,Local
  4. class MysqlService:
  5. def __init__(self,platform, mode, task_id):
  6. self.env = "prod"
  7. self.task_id = task_id
  8. self.mode = mode
  9. self.platform = platform
  10. self.MySQL = MysqlHelper(mode=self.mode, platform=self.platform, env=self.env)
  11. self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
  12. self.logger = Local.init_logger(platform=self.platform, mode=self.mode, log_level="INFO", log_to_console=True)
  13. Local.init_logger(self.platform,self.mode)
  14. def get_rule_dict(self):
  15. """
  16. :return: 返回任务的规则, task_rule
  17. """
  18. rule_dict = {}
  19. task_rule_sql = f"SELECT rule FROM crawler_task_v3 WHERE id = {self.task_id};"
  20. data = self.MySQL.select(task_rule_sql)
  21. if data:
  22. rule_list = json.loads(data[0][0])
  23. for item in rule_list:
  24. for key in item:
  25. rule_dict[key] = item[key]
  26. self.aliyun_log.logging(
  27. code=1000,
  28. message="抓取规则",
  29. data=rule_dict
  30. )
  31. return rule_dict
  32. def get_user_list(self):
  33. """
  34. :return: 返回用户列表
  35. """
  36. task_user_list_sql = f"SELECT uid, link, nick_name from crawler_user_v3 where task_id = {self.task_id};"
  37. uid_list = self.MySQL.select(task_user_list_sql)
  38. user_list = [{"uid": i[0], "link": i[1], "nick_name": i[2]} for i in uid_list] if uid_list else []
  39. self.aliyun_log.logging(
  40. code=1000,
  41. message="用户列表",
  42. data=user_list
  43. )
  44. return user_list
  45. def check_user_id(self, uid):
  46. """
  47. 检查指定用户ID是否存在于数据库的zqkd_uid表中。
  48. :param uid:要检查的用户ID
  49. :return:如果用户ID存在于表中返回True,否则返回False
  50. """
  51. try:
  52. query_sql = f""" SELECT uid FROM zqkd_user WHERE uid = "{uid}"; """
  53. result = self.mysql.select(sql=query_sql)
  54. return bool(result)
  55. except Exception as e:
  56. tb = traceback.format_exc()
  57. self.LocalLog.error(f"检查用户ID失败: {e}\n{tb}")
  58. return False
  59. def update_user(self, uid, user_name, avatar_url):
  60. """
  61. 更新数据库中指定用户的用户名和头像URL。
  62. :param uid:要更新信息的用户ID
  63. :param user_name:新的用户名
  64. :param avatar_url:新的头像URL
  65. :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常
  66. """
  67. try:
  68. update_sql = f""" UPDATE zqkd_user SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
  69. return self.MySQL.update(sql=update_sql)
  70. except Exception as e:
  71. tb = traceback.format_exc()
  72. self.logger.error(f"更新用户信息失败: {e}\n{tb}")
  73. return None
  74. def insert_user(self, uid, user_name, avatar_url):
  75. """
  76. 向数据库的zqkd_user表中插入或更新用户信息
  77. :param uid: 用户ID(数值类型)
  78. :param user_name: 用户名
  79. :param avatar_url: 头像URL
  80. :return: 成功返回影响的行数,失败返回None
  81. """
  82. try:
  83. # 直接拼接SQL(不推荐,有SQL注入风险)
  84. insert_sql = f"""
  85. INSERT INTO zqkd_user (uid, avatar_url, user_name)
  86. VALUES ({uid}, '{avatar_url.replace("'", "''")}', '{user_name.replace("'", "''")}')
  87. ON DUPLICATE KEY UPDATE
  88. user_name = '{user_name.replace("'", "''")}',
  89. avatar_url = '{avatar_url.replace("'", "''")}'
  90. """
  91. return self.MySQL.update(sql=insert_sql)
  92. except Exception as e:
  93. tb = traceback.format_exc()
  94. self.logger.error(f"插入用户信息失败: {e}\n{tb}")
  95. return None
  96. def get_today_videos(self):
  97. try:
  98. # 手动转义单引号(仅缓解部分风险)
  99. sql = """
  100. SELECT count(*) as cnt
  101. FROM crawler_video
  102. WHERE create_time >= CURDATE()
  103. AND create_time < CURDATE() + INTERVAL 1 DAY
  104. AND platform = %s
  105. AND strategy = %s
  106. """
  107. result = self.MySQL.select_params(sql, (self.platform, self.mode))
  108. if result and len(result) > 0:
  109. return result[0][0] # 返回第一行第一列的计数值
  110. return 0 # 无结果时返回0
  111. except Exception as e:
  112. self.logger.error(f"查询失败: {e}")
  113. return 0
  114. def select_user(self, last_scanned_id=0):
  115. """
  116. 根据last_scanned_id查询用户数据
  117. :param last_scanned_id: 上次扫描的ID,0表示从头开始
  118. :return: 查询结果列表
  119. """
  120. try:
  121. # 构建查询(根据last_scanned_id过滤)
  122. query = "SELECT id, uid FROM zqkd_user"
  123. if last_scanned_id > 0:
  124. query += f" WHERE id > {last_scanned_id}"
  125. query += " ORDER BY id ASC"
  126. return self.MySQL.select(query)
  127. except Exception as e:
  128. tb = traceback.format_exc()
  129. self.logger.error(f"查询用户列表失败: {e}\n{tb}")
  130. return []