"""Scheduled job that unbinds inactive "gongzhonghao" author accounts.

Once a day the job reads every account bound to task 27, looks up the most
recent video stored for each one, resets ``task_id`` to 0 for accounts whose
newest video is older than the inactivity window, and reports the unbound
accounts through the Feishu bot.
"""
import os
import sys
import time
import datetime
import schedule
from concurrent.futures.thread import ThreadPoolExecutor

sys.path.append(os.getcwd())
from common.db import MysqlHelper
from common.feishu import Feishu
from common.common import Common

# An account is unbound when its newest video is older than this (10 days).
INACTIVITY_WINDOW_SECONDS = 10 * 24 * 60 * 60

# Pause between scheduler polls so the main loop does not spin a CPU core.
SCHEDULER_POLL_SECONDS = 1


class AccountManager(object):
    """Find bound accounts with no recent videos and unbind them."""

    def __init__(self):
        # user_ids whose task_id was reset to 0 during this run.
        self.unbind_account_list = []
        # Rows returned by select_accounts(); None until it has been called.
        self.user_accounts = None

    def select_accounts(self):
        """Load the uid rows of all accounts bound to task 27."""
        select_sql = "SELECT uid FROM crawler_user_v3 where mode = 'author' and source = 'gongzhonghao' and task_id = 27;"
        self.user_accounts = MysqlHelper.get_values(
            log_type="recommend",
            crawler="gongzhonghao",
            sql=select_sql,
            env="prod",
            machine="",
        )

    def search_user_video(self, uid):
        """Look up the newest video of one account and update its binding.

        :param uid: one row from ``select_accounts``; ``uid[0]`` is the
            account id used to filter ``crawler_video``.
        """
        # Only the newest row is used, so let the database return just that one.
        search_sql = f"""SELECT user_id, update_time from crawler_video where user_id = {uid[0]} order by update_time DESC LIMIT 1"""
        videos = MysqlHelper.get_values(
            log_type="recommend",
            crawler="gongzhonghao",
            sql=search_sql,
            env="prod",
            machine=""
        )
        # Accounts without any stored video are left untouched.
        if videos:
            user_id, last_update_time = videos[0]
            self.change_task_status(user_id, last_update_time)

    def change_task_status(self, user_id, last_update_time):
        """Unbind ``user_id`` if its newest video is outside the window.

        :param user_id: account id as stored in ``crawler_video.user_id``.
        :param last_update_time: object exposing ``timestamp()`` -- presumably
            the ``datetime`` of the newest video; verify against MysqlHelper.
        """
        now = time.time()
        last_update_stamp = last_update_time.timestamp()
        if int(now) - int(last_update_stamp) > INACTIVITY_WINDOW_SECONDS:
            self.unbind_account_list.append(user_id)
            update_sql = f"""update crawler_user_v3 set task_id = 0 where uid = {user_id};"""
            MysqlHelper.update_values(
                log_type="recommend",
                crawler="gongzhonghao",
                sql=update_sql,
                env="prod",
                machine=""
            )
        else:
            print("账号正常")

    def bot(self):
        """Send the unbound accounts to Feishu, or log that nothing changed."""
        if self.unbind_account_list:
            text = "ID: 是 {} 的公众号,近10天无新视频入库,自动取消绑定,操作日期: {}".format(
                self.unbind_account_list, str(datetime.datetime.now())
            )
            Feishu.bot(
                log_type="recommend",
                crawler="gongzhonghao_scheduler",
                text=text
            )
        else:
            Common.logger(
                log_type="recommend",
                crawler="gongzhonghao"
            ).info("一切正常")


def run():
    """Run one full check: load accounts, inspect each in parallel, report."""
    AcM = AccountManager()
    AcM.select_accounts()
    # Guard against a None/empty result so the executor is never handed None.
    accounts = AcM.user_accounts or []
    with ThreadPoolExecutor(max_workers=8) as pool:
        tasks = [(uid, pool.submit(AcM.search_user_video, uid)) for uid in accounts]
        for uid, task in tasks:
            # Future.exception() waits for completion; surfacing the error
            # here keeps one failing account from being dropped silently
            # while still letting the remaining accounts be processed.
            error = task.exception()
            if error is not None:
                Common.logger(
                    log_type="recommend",
                    crawler="gongzhonghao"
                ).error(f"search_user_video failed for {uid}: {error!r}")
    AcM.bot()


if __name__ == '__main__':
    schedule.every().day.at("00:05").do(run)
    while True:
        schedule.run_pending()
        # Sleep between polls instead of busy-waiting at 100% CPU.
        time.sleep(SCHEDULER_POLL_SECONDS)