""" @author: luojunhui @description: 监测进程是否 alive in every 10 minutes """ import datetime import subprocess import time from applications import bot def threadMonitor(): """ 校验进程是否 alive :return: """ result = subprocess.run( ["ps", "aux"], stdout=subprocess.PIPE, text=True ) # 获取命令输出结果 output = result.stdout # filter # get_off_job = [line for line in output.splitlines() if 'python3 getOffVideosDaily.py' in line] # migrate_source_id_job = [line for line in output.splitlines() if 'python3 migrateRootSourceId.py' in line] # updateAccountAvgDaily = [line for line in output.splitlines() if 'python3 updateAccountAvgDaily.py' in line] updateMinigramInfoDaily = [line for line in output.splitlines() if 'python3 updateMinigramInfoDaily.py' in line] # updatePublishedMsgDaily = [line for line in output.splitlines() if 'python3 updatePublishedMsgDaily.py' in line] # checkVideoStatusDaily = [line for line in output.splitlines() if 'python3 checkVideoStatusDaily.py' in line] # if not get_off_job: # bot( # title="定时任务进程异常挂掉", # detail={ # "Job": "GetOffVideosJob", # "Time": datetime.datetime.now().__str__() # } # ) # if not updateAccountAvgDaily: # bot( # title="定时任务进程异常挂掉", # detail={ # "Job": "updateAccountAvgDaily", # "Time": datetime.datetime.now().__str__() # } # ) if not updateMinigramInfoDaily: bot( title="定时任务进程异常挂掉", detail={ "Job": "updateMinigramInfoDaily", "Time": datetime.datetime.now().__str__() } ) # if not updatePublishedMsgDaily: # bot( # title="定时任务进程异常挂掉", # detail={ # "Job": "updatePublishedMsgDaily", # "Time": datetime.datetime.now().__str__() # } # ) # if not checkVideoStatusDaily: # bot( # title="定时任务进程异常挂掉", # detail={ # "Job": "checkVideoStatusDaily", # "Time": datetime.datetime.now().__str__() # } # ) if __name__ == '__main__': while True: threadMonitor() time.sleep(60 * 10)