# update_msg.py
  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. import json
  6. import threading
  7. import requests
  8. import schedule
  9. from tqdm import tqdm
  10. from datetime import datetime
  11. from config import accountBaseInfo
  12. from applications import PQMySQL
  13. from tasks.task4 import update_articles
  14. from applications.decoratorApi import retryOnTimeout
  15. class UpdateMsgDaily(object):
  16. """
  17. 日常更新文章
  18. """
  19. db_client = PQMySQL()
  20. @classmethod
  21. def getAccountIdDict(cls):
  22. """
  23. 获取全部内部账号的id
  24. :return:
  25. """
  26. gh_id_dict = {}
  27. for key in accountBaseInfo:
  28. gh_id = accountBaseInfo[key]["ghId"]
  29. name = accountBaseInfo[key]["accountName"]
  30. gh_id_dict[gh_id] = name
  31. return gh_id_dict
  32. @classmethod
  33. @retryOnTimeout()
  34. def bot(cls, account_list):
  35. """
  36. 机器人
  37. """
  38. url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
  39. headers = {"Content-Type": "application/json"}
  40. payload = {
  41. "msg_type": "interactive",
  42. "card": {
  43. "elements": [
  44. {
  45. "tag": "div",
  46. "text": {
  47. "content": "存在文章更新失败<at id=all></at>\n",
  48. "tag": "lark_md",
  49. },
  50. },
  51. {
  52. "tag": "div",
  53. "text": {
  54. "content": json.dumps(
  55. account_list, ensure_ascii=False, indent=4
  56. ),
  57. "tag": "lark_md",
  58. },
  59. },
  60. ],
  61. "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
  62. },
  63. }
  64. requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10)
  65. @classmethod
  66. def checkEachAccount(cls, gh_id):
  67. """
  68. 验证单个账号是否当天有更新
  69. :param gh_id:
  70. :return:
  71. """
  72. today_str = datetime.today().strftime("%Y-%m-%d")
  73. today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
  74. today_timestamp = today_date_time.timestamp()
  75. sql = f"""
  76. select updateTime
  77. from official_articles_v2
  78. where ghId = '{gh_id}'
  79. order by updateTime
  80. desc
  81. """
  82. latest_update_time = cls.db_client.select(sql)[0][0]
  83. # 判断该账号当天发布的文章是否被收集
  84. if int(latest_update_time) > int(today_timestamp):
  85. return True
  86. else:
  87. return False
  88. @classmethod
  89. def updateJob(cls):
  90. """
  91. 更新文章任务
  92. :return:
  93. """
  94. account_dict = cls.getAccountIdDict()
  95. account_list = list(account_dict.keys())
  96. for account_id in tqdm(account_list):
  97. try:
  98. update_articles(gh_id=account_id)
  99. except Exception as e:
  100. response = {
  101. "code": 1001,
  102. "info": "单个账号更新失败",
  103. "error": str(e),
  104. "time_stamp": datetime.now().__str__(),
  105. }
  106. print(response)
  107. @classmethod
  108. def checkJob(cls):
  109. """
  110. 验证所有账号是否已经有更新数据
  111. :return:
  112. todo: 被封禁账号&&服务号需要做区分
  113. """
  114. account_dict = cls.getAccountIdDict()
  115. error_account_list = []
  116. for account_id in tqdm(account_dict):
  117. if cls.checkEachAccount(account_id):
  118. continue
  119. else:
  120. name = account_dict[account_id]
  121. error_account_list.append(name)
  122. if error_account_list:
  123. try:
  124. cls.bot(error_account_list)
  125. except Exception as e:
  126. print("Timeout Error: {}".format(e))
  127. def job_with_thread(job_func):
  128. """
  129. 每个任务放到单个线程中
  130. :param job_func:
  131. :return:
  132. """
  133. job_thread = threading.Thread(target=job_func)
  134. job_thread.start()
  135. if __name__ == "__main__":
  136. UMD = UpdateMsgDaily()
  137. try:
  138. schedule.every().day.at("21:00").do(job_with_thread, UMD.updateJob)
  139. except Exception as error:
  140. UMD.bot(account_list=["更新文章定时任务异常终止", str(error)])
  141. try:
  142. schedule.every().day.at("21:30").do(job_with_thread, UMD.checkJob)
  143. except Exception as error:
  144. UMD.bot(account_list=['校验账号任务异常终止', str(error)])
  145. while True:
  146. schedule.run_pending()
  147. time.sleep(1)