# manage_accounts.py
  1. import time
  2. import schedule
  3. import multiprocessing
  4. from common.scheduling_db import MysqlHelper
  5. def read_accounts_from_mysql():
  6. """
  7. Read accounts from mysql database
  8. """
  9. sql = f"""select tag, uid from crawler_user_v3 order by create_time desc;"""
  10. result = MysqlHelper.get_values(
  11. log_type="author", crawler="changsha", env="prod", sql=sql
  12. )
  13. limit_tag_dict = {
  14. "352": "余海涛",
  15. "353": "罗情",
  16. "53": "范军",
  17. "51": "鲁涛",
  18. "131": "王雪珂",
  19. "6682": "公众新号",
  20. "469": "小年糕",
  21. "464": "快手",
  22. "5662": "快手账号爬虫",
  23. "459": "spider",
  24. "85": "快手爬虫",
  25. "454": "账号",
  26. "467": "视频号",
  27. "106": "⭐️小年糕爬虫",
  28. "120": "西瓜新爬虫",
  29. "499": "抖音",
  30. "2235": "抖音爬虫"
  31. }
  32. p_dict = {}
  33. for item in result:
  34. tag_list = item['tag'].split(",")
  35. tag_set = set(tag_list)
  36. require_set = {'454', '459'}
  37. forbidden_set = {'131', '465', '1379', '160'}
  38. if len(tag_set) >= 5:
  39. if require_set.issubset(tag_set) and forbidden_set.isdisjoint(tag_set):
  40. w = [limit_tag_dict.get(tag, None) for tag in tag_list]
  41. p_dict[item['uid']] = w
  42. return p_dict
  43. def insert_accounts(account_dict):
  44. """
  45. 把长沙同学账号插入到 changsha_accounts 中
  46. """
  47. for key in account_dict:
  48. select_sql = f"""select id from changsha_user_accounts where piaoquan_account_id = {key};"""
  49. result = MysqlHelper.get_values(
  50. log_type="author", crawler="changsha", env="prod", sql=select_sql
  51. )
  52. if result:
  53. continue
  54. tags = set(account_dict[key])
  55. name_set = {'鲁涛', '罗情', '余海涛', '范军'}
  56. platform_set = {'西瓜新爬虫', '快手账号爬虫', '公众新号', '⭐️小年糕爬虫', '抖音爬虫', '视频号'}
  57. name = tags & name_set
  58. platform = tags & platform_set
  59. if name and platform:
  60. user_name = list(name)[0]
  61. platform_name = list(platform)[0]
  62. sql = f"""INSERT INTO changsha_user_accounts (piaoquan_account_id, user_name, platform) VALUES ('{key}', '{user_name}', '{platform_name}');"""
  63. MysqlHelper.update_values(log_type="author", crawler="changsha", sql=sql, env="prod")
  64. def protect_(function):
  65. """
  66. 守护进程,在程序启动后的某一个时段内守护爬虫进程
  67. :param function: 被守护的函数
  68. """
  69. process = multiprocessing.Process(target=function)
  70. process.start()
  71. while True:
  72. if not process.is_alive():
  73. process.terminate()
  74. time.sleep(60)
  75. process = multiprocessing.Process(target=function)
  76. process.start()
  77. time.sleep(60)
  78. def process_acc():
  79. """
  80. 执行函数
  81. """
  82. dd_dict = read_accounts_from_mysql()
  83. insert_accounts(dd_dict)
  84. def main():
  85. """
  86. 定时执行任务, 每天晚上更新账号
  87. """
  88. schedule.every().day.at("23:40").do(process_acc)
  89. if __name__ == '__main__':
  90. protect_(main())