""" @author: luojunhui """ import time import json import threading import requests import schedule from tqdm import tqdm from datetime import datetime from config import accountBaseInfo from applications import PQMySQL from tasks.task4 import update_articles from applications.decoratorApi import retryOnTimeout class UpdateMsgDaily(object): """ 日常更新文章 """ db_client = PQMySQL() @classmethod def getAccountIdDict(cls): """ 获取全部内部账号的id :return: """ gh_id_dict = {} for key in accountBaseInfo: gh_id = accountBaseInfo[key]["ghId"] name = accountBaseInfo[key]["accountName"] gh_id_dict[gh_id] = name return gh_id_dict @classmethod @retryOnTimeout() def bot(cls, account_list): """ 机器人 """ url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410" headers = {"Content-Type": "application/json"} payload = { "msg_type": "interactive", "card": { "elements": [ { "tag": "div", "text": { "content": "存在文章更新失败\n", "tag": "lark_md", }, }, { "tag": "div", "text": { "content": json.dumps( account_list, ensure_ascii=False, indent=4 ), "tag": "lark_md", }, }, ], "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}}, }, } requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10) @classmethod def checkEachAccount(cls, gh_id): """ 验证单个账号是否当天有更新 :param gh_id: :return: """ today_str = datetime.today().strftime("%Y-%m-%d") today_date_time = datetime.strptime(today_str, "%Y-%m-%d") today_timestamp = today_date_time.timestamp() sql = f""" select updateTime from official_articles_v2 where ghId = '{gh_id}' order by updateTime desc """ latest_update_time = cls.db_client.select(sql)[0][0] # 判断该账号当天发布的文章是否被收集 if int(latest_update_time) > int(today_timestamp): return True else: return False @classmethod def updateJob(cls): """ 更新文章任务 :return: """ account_dict = cls.getAccountIdDict() account_list = list(account_dict.keys()) for account_id in tqdm(account_list): try: update_articles(gh_id=account_id) except Exception as e: response = { "code": 1001, "info": "单个账号更新失败", "error": str(e), "time_stamp": datetime.now().__str__(), } print(response) @classmethod def checkJob(cls): """ 验证所有账号是否已经有更新数据 :return: todo: 被封禁账号&&服务号需要做区分 """ account_dict = cls.getAccountIdDict() error_account_list = [] for account_id in tqdm(account_dict): if cls.checkEachAccount(account_id): continue else: name = account_dict[account_id] error_account_list.append(name) if error_account_list: try: cls.bot(error_account_list) except Exception as e: print("Timeout Error: {}".format(e)) def job_with_thread(job_func): """ 每个任务放到单个线程中 :param job_func: :return: """ job_thread = threading.Thread(target=job_func) job_thread.start() if __name__ == "__main__": UMD = UpdateMsgDaily() try: schedule.every().day.at("21:00").do(job_with_thread, UMD.updateJob) except Exception as error: UMD.bot(account_list=["更新文章定时任务异常终止", str(error)]) try: schedule.every().day.at("21:30").do(job_with_thread, UMD.checkJob) except Exception as error: UMD.bot(account_list=['校验账号任务异常终止', str(error)]) while True: schedule.run_pending() time.sleep(1)