""" @author: luojunhui @description: 监测进程是否 alive in every 10 minutes """ import datetime import subprocess import time from applications import bot def threadMonitor(): """ 校验进程是否 alive :return: """ result = subprocess.run( ["ps", "aux"], stdout=subprocess.PIPE, text=True ) # 获取命令输出结果 output = result.stdout # filter get_off_job = [line for line in output.splitlines() if 'python3 getOffVideosDaily.py' in line] migrate_source_id_job = [line for line in output.splitlines() if 'python3 migrateRootSourceId.py' in line] updateAccountAvgDaily = [line for line in output.splitlines() if 'python3 updateAccountAvgDaily.py' in line] updateMinigramInfoDaily = [line for line in output.splitlines() if 'python3 updateMinigramInfoDaily.py' in line] updatePublishedMsgDaily = [line for line in output.splitlines() if 'python3 updatePublishedMsgDaily.py' in line] checkVideoStatusDaily = [line for line in output.splitlines() if 'python3 checkVideoStatusDaily.py' in line] if not get_off_job: bot( title="定时任务进程异常挂掉", detail={ "Job": "GetOffVideosJob", "Time": datetime.datetime.now().__str__() } ) if not migrate_source_id_job: bot( title="定时任务进程异常挂掉", detail={ "Job": "migrate_source_id_job", "Time": datetime.datetime.now().__str__() } ) if not updateAccountAvgDaily: bot( title="定时任务进程异常挂掉", detail={ "Job": "updateAccountAvgDaily", "Time": datetime.datetime.now().__str__() } ) if not updateMinigramInfoDaily: bot( title="定时任务进程异常挂掉", detail={ "Job": "updateMinigramInfoDaily", "Time": datetime.datetime.now().__str__() } ) if not updatePublishedMsgDaily: bot( title="定时任务进程异常挂掉", detail={ "Job": "updatePublishedMsgDaily", "Time": datetime.datetime.now().__str__() } ) if not checkVideoStatusDaily: bot( title="定时任务进程异常挂掉", detail={ "Job": "checkVideoStatusDaily", "Time": datetime.datetime.now().__str__() } ) if __name__ == '__main__': while True: threadMonitor() time.sleep(60 * 10)