# manage_accounts.py
  1. import time
  2. import schedule
  3. import multiprocessing
  4. from common.scheduling_db import MysqlHelper
  5. from common.aliyun_log import AliyunLogger
  6. def read_accounts_from_mysql():
  7. """
  8. Read accounts from mysql database
  9. """
  10. sql = f"""select tag, uid from crawler_user_v3 order by create_time desc;"""
  11. result = MysqlHelper.get_values(
  12. log_type="author", crawler="changsha", env="prod", sql=sql
  13. )
  14. limit_tag_dict = {
  15. "352": "余海涛",
  16. "353": "罗情",
  17. "53": "范军",
  18. "51": "鲁涛",
  19. "131": "王雪珂",
  20. "6682": "公众新号",
  21. "469": "小年糕",
  22. "464": "快手",
  23. "5662": "快手账号爬虫",
  24. "459": "spider",
  25. "85": "快手爬虫",
  26. "454": "账号",
  27. "467": "视频号",
  28. "106": "⭐️小年糕爬虫",
  29. "120": "西瓜新爬虫",
  30. "499": "抖音",
  31. "2235": "抖音爬虫"
  32. }
  33. p_dict = {}
  34. for item in result:
  35. tag_list = item['tag'].split(",")
  36. tag_set = set(tag_list)
  37. require_set = {'454', '459'}
  38. forbidden_set = {'131', '465', '1379', '160'}
  39. if len(tag_set) >= 5:
  40. if require_set.issubset(tag_set) and forbidden_set.isdisjoint(tag_set):
  41. w = [limit_tag_dict.get(tag, None) for tag in tag_list]
  42. p_dict[item['uid']] = w
  43. return p_dict
  44. def insert_accounts(account_dict):
  45. """
  46. 把长沙同学账号插入到 changsha_accounts 中
  47. """
  48. for key in account_dict:
  49. select_sql = f"""select id from changsha_user_accounts where piaoquan_account_id = {key};"""
  50. result = MysqlHelper.get_values(
  51. log_type="author", crawler="changsha", env="prod", sql=select_sql
  52. )
  53. if result:
  54. continue
  55. tags = set(account_dict[key])
  56. name_set = {'鲁涛', '罗情', '余海涛', '范军'}
  57. platform_set = {'西瓜新爬虫', '快手账号爬虫', '公众新号', '⭐️小年糕爬虫', '抖音爬虫', '视频号'}
  58. name = tags & name_set
  59. platform = tags & platform_set
  60. if name and platform:
  61. user_name = list(name)[0]
  62. platform_name = list(platform)[0]
  63. sql = f"""INSERT INTO changsha_user_accounts (piaoquan_account_id, user_name, platform) VALUES ('{key}', '{user_name}', '{platform_name}');"""
  64. MysqlHelper.update_values(log_type="author", crawler="changsha", sql=sql, env="prod")
  65. AliyunLogger.logging(
  66. code="8888",
  67. platform=platform,
  68. mode="author",
  69. env="prod",
  70. message="更新账号-{}-{}".format(user_name, key)
  71. )
  72. def protect_(function):
  73. """
  74. 守护进程,在程序启动后的某一个时段内守护爬虫进程
  75. :param function: 被守护的函数
  76. """
  77. process = multiprocessing.Process(target=function)
  78. process.start()
  79. while True:
  80. if not process.is_alive():
  81. process.terminate()
  82. time.sleep(60)
  83. process = multiprocessing.Process(target=function)
  84. process.start()
  85. time.sleep(60)
  86. def process_acc():
  87. """
  88. 执行函数
  89. """
  90. dd_dict = read_accounts_from_mysql()
  91. insert_accounts(dd_dict)
  92. def main():
  93. """
  94. 定时执行任务, 每天晚上更新账号
  95. """
  96. schedule.every().day.at("23:45").do(process_acc)
  97. while True:
  98. schedule.run_pending() # 运行待处理的任务
  99. time.sleep(1) # 每隔一秒检查一次是否有待执行的任务
  100. if __name__ == '__main__':
  101. # protect_(main)
  102. # process_acc()
  103. main()