# automation_provide_job_monitor.py
  1. from datetime import datetime, time
  2. from typing import List, Tuple
  3. from aliyun.log import LogClient
  4. from aliyun.log.auth import AUTH_VERSION_4
  5. from enums.automation_job import AutomationJobCronInfo
  6. from util import feishu_inform_util
  7. endpoint = "cn-hangzhou.log.aliyuncs.com"
  8. access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
  9. access_key_id = "LTAIWYUujJAm7CbH"
  10. project = "crawler-scheduler"
  11. log_store = "aigc-provider"
  12. state_query_sql = "* | select crawlerMode, result, if(reason='null', '成功', reason) as reason, count(distinct videoId) as videoIdCnt, count(distinct crawlerPlanId) as crawlerPlanIdCnt from log where reason not in ('该账号已经存在爬取计划,跳过执行', '该视频近期已经处理过', '该Topic已经创建过爬取计划', '该关键词已经创建过爬取计划') group by crawlerMode, result, reason order by crawlerMode, result desc, reason"
  13. client = LogClient(endpoint=endpoint, accessKey=access_key, accessKeyId=access_key_id, auth_version=AUTH_VERSION_4, region='cn-hangzhou')
  14. webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/9f5c5cce-5eb2-4731-b368-33926f5549f9'
  15. card_json = {
  16. "schema": "2.0",
  17. "header": {
  18. "title": {
  19. "tag": "plain_text",
  20. "content": "【自动化供给】日任务执行情况监控"
  21. },
  22. "template": "blue"
  23. },
  24. "body": {
  25. "elements": []
  26. }
  27. }
  28. def gen_collapsible_panel_json(title, content, is_parent: bool = True) -> dict:
  29. return {
  30. "tag": "collapsible_panel",
  31. "expanded": False,
  32. "header": {
  33. "title": {
  34. "tag": "plain_text",
  35. "content": title
  36. },
  37. "vertical_align": "center",
  38. },
  39. "border": {
  40. "color": "grey",
  41. "corner_radius": "5px"
  42. },
  43. "elements": [
  44. {
  45. "tag": "markdown",
  46. "content": content
  47. }
  48. ]
  49. }
  50. def job_run_state(start_ts: int, end_ts: int):
  51. """
  52. 任务运行情况统计
  53. """
  54. resp = client.get_log(project=project, logstore=log_store, from_time=start_ts, to_time=end_ts, query=state_query_sql)
  55. log_data = resp.get_body().get('data')
  56. all_crawler_mode = list(dict.fromkeys([cron_info.crawler_mode for cron_info in AutomationJobCronInfo]))
  57. collapsible_limit = 5
  58. crawler_mode_group = [all_crawler_mode[i:i + collapsible_limit] for i in range(0, len(all_crawler_mode), collapsible_limit)]
  59. for crawler_mode_partition in crawler_mode_group:
  60. elements = []
  61. for crawler_mode in crawler_mode_partition:
  62. content = "| reason | videoIdCnt | crawlerPlanIdCnt |\n"
  63. content += "| --- | --- | --- |\n"
  64. for datum in log_data:
  65. if crawler_mode != datum.get('crawlerMode'):
  66. continue
  67. reason = datum.get('reason')
  68. video_id_cnt = datum.get('videoIdCnt')
  69. crawler_plan_id_cnt = datum.get('crawlerPlanIdCnt')
  70. content += f"| {reason} | {video_id_cnt} | {crawler_plan_id_cnt} |\n"
  71. elements.append(gen_collapsible_panel_json(crawler_mode, content))
  72. new_card_json = {**card_json, **{}}
  73. new_card_json["body"]["elements"] = elements
  74. feishu_inform_util.send_card_msg_to_feishu(webhook, new_card_json)
  75. def crawler_mode_not_success_warning(start_ts: int, end_ts: int, crawler_mode_and_video_source_list: List[Tuple[str, str]]):
  76. for crawler_mode, video_source in crawler_mode_and_video_source_list:
  77. query_sql = f"crawlerMode : {crawler_mode} and videoSource : {video_source} and result : true | select count(1) as cnt from log"
  78. resp = client.get_log(project=project, logstore=log_store, from_time=start_ts, to_time=end_ts, query=query_sql)
  79. success_cnt = int(resp.get_body().get('data')[0]['cnt'])
  80. if success_cnt <= 0:
  81. msg = f"- 供给方式: {crawler_mode} \n- 视频来源: {video_source} \n- 当天还没有成功执行的任务,请关注"
  82. new_card_json = {**card_json, **{}}
  83. new_card_json['header']['template'] = 'red'
  84. new_card_json['body']['elements'] = [{
  85. "tag": "markdown",
  86. "content": msg
  87. }]
  88. feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)
  89. def main():
  90. # 获取当前日期
  91. today = datetime.now()
  92. # 转换为时间戳(秒级)
  93. # 当天开始时间(00:00:00)
  94. start_ts = int(datetime.combine(today.date(), time.min).timestamp())
  95. # 当天结束时间(23:59:59.999999)
  96. end_ts = int(datetime.combine(today.date(), time.max).timestamp())
  97. job_run_state(start_ts, end_ts)
  98. current_hour = today.hour
  99. crawler_mode_and_video_source_list = []
  100. for cron_info in AutomationJobCronInfo:
  101. if current_hour < cron_info.task_start_hour:
  102. continue
  103. crawler_mode_and_video_source_list.append((cron_info.crawler_mode, cron_info.video_source))
  104. crawler_mode_not_success_warning(start_ts, end_ts, crawler_mode_and_video_source_list)
  105. if __name__ == "__main__":
  106. main()