"""Nightly sync of Changsha crawler accounts into ``changsha_user_accounts``.

The script reads tagged users from ``crawler_user_v3``, keeps the ones whose
tag set matches the filter rules below, and inserts every account that is not
yet present into ``changsha_user_accounts``.
"""
import multiprocessing
import time

import schedule

from common.aliyun_log import AliyunLogger
from common.scheduling_db import MysqlHelper

# Tag id -> human readable label. Tag ids missing here map to ``None``.
LIMIT_TAG_DICT = {
    "352": "余海涛",
    "353": "罗情",
    "53": "范军",
    "51": "鲁涛",
    "131": "王雪珂",
    "6682": "公众新号",
    "469": "小年糕",
    "464": "快手",
    "5662": "快手账号爬虫",
    "459": "spider",
    "85": "快手爬虫",
    "454": "账号",
    "467": "视频号",
    "106": "⭐️小年糕爬虫",
    "120": "西瓜新爬虫",
    "499": "抖音",
    "2235": "抖音爬虫",
}

# An account is kept only if it carries every required tag and none of the
# forbidden ones, and has at least MIN_TAG_COUNT distinct tags.
REQUIRED_TAGS = frozenset({'454', '459'})
FORBIDDEN_TAGS = frozenset({'131', '465', '1379', '160'})
MIN_TAG_COUNT = 5

# Labels that identify the owner and the source platform of an account.
OWNER_NAMES = frozenset({'鲁涛', '罗情', '余海涛', '范军'})
PLATFORM_NAMES = frozenset(
    {'西瓜新爬虫', '快手账号爬虫', '公众新号', '⭐️小年糕爬虫', '抖音爬虫', '视频号'}
)


def _first_match(labels, allowed):
    """Return the first entry of *labels* contained in *allowed*, else None."""
    return next((label for label in labels if label in allowed), None)


def read_accounts_from_mysql():
    """Read the candidate accounts from MySQL.

    :return: dict mapping ``uid`` to the list of labels of its tags, in the
        order the tags are stored; unknown tag ids appear as ``None``.
    """
    sql = """select tag, uid from crawler_user_v3 order by create_time desc;"""
    result = MysqlHelper.get_values(
        log_type="author", crawler="changsha", env="prod", sql=sql
    )
    p_dict = {}
    # NOTE(review): guards against the helper returning None on failure;
    # confirm against MysqlHelper.get_values.
    for item in result or []:
        # A NULL tag column is treated as "no tags" instead of crashing.
        tag_list = (item['tag'] or "").split(",")
        tag_set = set(tag_list)
        if len(tag_set) < MIN_TAG_COUNT:
            continue
        if REQUIRED_TAGS.issubset(tag_set) and FORBIDDEN_TAGS.isdisjoint(tag_set):
            p_dict[item['uid']] = [LIMIT_TAG_DICT.get(tag) for tag in tag_list]
    return p_dict


def insert_accounts(account_dict):
    """Insert the Changsha accounts into ``changsha_user_accounts``.

    Accounts that already have a row are skipped, as are accounts whose labels
    contain no known owner name or no known platform.

    :param account_dict: dict mapping account id to its list of tag labels,
        as returned by :func:`read_accounts_from_mysql`.
    """
    for key, labels in account_dict.items():
        select_sql = f"""select id from changsha_user_accounts where piaoquan_account_id = {key};"""
        result = MysqlHelper.get_values(
            log_type="author", crawler="changsha", env="prod", sql=select_sql
        )
        if result:
            # Already registered.
            continue
        # Pick the first matching label in tag order so the stored values do
        # not depend on set iteration order.
        user_name = _first_match(labels, OWNER_NAMES)
        platform_name = _first_match(labels, PLATFORM_NAMES)
        if user_name is None or platform_name is None:
            continue
        sql = f"""INSERT INTO changsha_user_accounts (piaoquan_account_id, user_name, platform) VALUES ('{key}', '{user_name}', '{platform_name}');"""
        MysqlHelper.update_values(log_type="author", crawler="changsha", sql=sql, env="prod")
        AliyunLogger.logging(
            code="8888",
            # Log the chosen platform label rather than the whole set.
            platform=platform_name,
            mode="author",
            env="prod",
            message="更新账号-{}-{}".format(user_name, key)
        )


def protect_(function):
    """Run *function* in a child process and restart it whenever it dies.

    This call never returns: it polls the child once a minute.

    :param function: the callable to keep running.
    """
    process = multiprocessing.Process(target=function)
    process.start()
    while True:
        if not process.is_alive():
            process.terminate()
            # Wait before respawning.
            time.sleep(60)
            process = multiprocessing.Process(target=function)
            process.start()
        time.sleep(60)


def process_acc():
    """Run one sync: read the matching accounts and insert the new ones."""
    dd_dict = read_accounts_from_mysql()
    insert_accounts(dd_dict)


def main():
    """Schedule the account sync for 23:45 every day and run forever."""
    schedule.every().day.at("23:45").do(process_acc)
    while True:
        schedule.run_pending()  # run any job that is due
        time.sleep(1)  # check for due jobs once per second


if __name__ == '__main__':
    # Alternatives: protect_(main) to supervise the scheduler in a child
    # process, or process_acc() for a single immediate run.
    main()