"""
Daily article-update scheduler: refreshes the articles of every internal
account and alerts a Feishu bot when an account has no update for the day.

@author: luojunhui
"""
- import time
- import json
- import threading
- import requests
- import schedule
- from tqdm import tqdm
- from datetime import datetime
- from config import accountBaseInfo
- from applications import PQMySQL
- from tasks.task4 import update_articles
- from applications.decoratorApi import retryOnTimeout
class UpdateMsgDaily(object):
    """
    Daily article update.

    Groups two scheduled jobs: ``updateJob`` refreshes the articles of every
    configured account, and ``checkJob`` verifies afterwards that each account
    has a record updated today, alerting a Feishu bot about those that do not.
    """

    # Shared database client, created once when the class body is executed.
    db_client = PQMySQL()

    @classmethod
    def getAccountIdDict(cls):
        """
        Collect the ids of all internal accounts.

        :return: dict mapping each account's ``ghId`` to its ``accountName``,
            built from ``accountBaseInfo``.
        """
        gh_id_dict = {}
        for key in accountBaseInfo:
            info = accountBaseInfo[key]
            gh_id_dict[info["ghId"]] = info["accountName"]
        return gh_id_dict

    @classmethod
    @retryOnTimeout()
    def bot(cls, account_list):
        """
        Send an alert card to the Feishu bot webhook.

        :param account_list: JSON-serialisable list (account names or error
            details) rendered as the body of the card.
        :return: None
        """
        # NOTE(review): the webhook token is hard-coded here; consider moving
        # it to configuration.
        url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
        headers = {"Content-Type": "application/json"}
        payload = {
            "msg_type": "interactive",
            "card": {
                "elements": [
                    {
                        "tag": "div",
                        "text": {
                            "content": "存在文章更新失败<at id=all></at>\n",
                            "tag": "lark_md",
                        },
                    },
                    {
                        "tag": "div",
                        "text": {
                            "content": json.dumps(
                                account_list, ensure_ascii=False, indent=4
                            ),
                            "tag": "lark_md",
                        },
                    },
                ],
                "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
            },
        }
        requests.request(
            "POST", url=url, headers=headers, data=json.dumps(payload), timeout=10
        )

    @classmethod
    def checkEachAccount(cls, gh_id):
        """
        Check whether a single account has an article record updated today.

        :param gh_id: ``ghId`` of the account to check.
        :return: True when the newest ``updateTime`` stored for the account is
            later than today's midnight (local time); False otherwise,
            including when the account has no rows or the value is NULL.
        """
        # Midnight of the current local day, as a Unix timestamp.
        midnight = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
        today_timestamp = midnight.timestamp()
        # Only the newest row is needed, so let the database return just that one.
        # NOTE(review): gh_id is interpolated into the SQL text; it comes from
        # project configuration here, but switch to a parameterised query if
        # db_client supports one.
        sql = f"""
            select updateTime
            from official_articles_v2
            where ghId = '{gh_id}'
            order by updateTime desc
            limit 1
        """
        rows = cls.db_client.select(sql)
        # An account without any collected article counts as "not updated"
        # instead of crashing the whole check with IndexError/TypeError.
        if not rows or rows[0][0] is None:
            return False
        latest_update_time = rows[0][0]
        # Was an article published today collected for this account?
        return int(latest_update_time) > int(today_timestamp)

    @classmethod
    def updateJob(cls):
        """
        Article update job: call ``update_articles`` for every account.

        A failure on one account is printed and does not stop the remaining
        accounts from being processed.

        :return: None
        """
        account_dict = cls.getAccountIdDict()
        for account_id in tqdm(list(account_dict)):
            try:
                update_articles(gh_id=account_id)
            except Exception as e:
                # Best-effort: report the failing account and keep going.
                response = {
                    "code": 1001,
                    "info": "单个账号更新失败",
                    "error": str(e),
                    "time_stamp": str(datetime.now()),
                }
                print(response)

    @classmethod
    def checkJob(cls):
        """
        Verify that every account already has updated data for today.

        Names of accounts failing ``checkEachAccount`` are sent to the bot.

        :return: None
        todo: banned accounts and service accounts need to be distinguished
        """
        account_dict = cls.getAccountIdDict()
        error_account_list = []
        for account_id in tqdm(account_dict):
            if not cls.checkEachAccount(account_id):
                error_account_list.append(account_dict[account_id])
        if error_account_list:
            try:
                cls.bot(error_account_list)
            except Exception as e:
                print("Timeout Error: {}".format(e))
def job_with_thread(job_func):
    """
    Run the given job in its own, freshly started thread.

    The call returns as soon as the thread has been started; it does not wait
    for ``job_func`` to finish.

    :param job_func: callable taking no arguments, used as the thread target.
    :return: None
    """
    threading.Thread(target=job_func).start()
if __name__ == "__main__":
    UMD = UpdateMsgDaily()

    # (daily run time, job to run, alert title sent if registration raises)
    daily_jobs = (
        ("21:00", UMD.updateJob, "更新文章定时任务异常终止"),
        ("21:30", UMD.checkJob, '校验账号任务异常终止'),
    )
    for run_at, job, alert_title in daily_jobs:
        # NOTE(review): this try only guards the registration call itself;
        # the jobs run later in their own threads via job_with_thread, so
        # errors raised inside them presumably never reach this handler.
        try:
            schedule.every().day.at(run_at).do(job_with_thread, job)
        except Exception as error:
            UMD.bot(account_list=[alert_title, str(error)])

    # Poll the scheduler once per second, forever.
    while True:
        schedule.run_pending()
        time.sleep(1)
|