automation_provide_job_monitor.py

import copy
from datetime import datetime, time
from typing import List, Tuple

from aliyun.log import LogClient
from aliyun.log.auth import AUTH_VERSION_4

from util import feishu_inform_util

endpoint = "cn-hangzhou.log.aliyuncs.com"
access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
access_key_id = "LTAIWYUujJAm7CbH"
project = "crawler-scheduler"
log_store = "aigc-provider"
# Per-mode execution summary: reason 'null' is rewritten to '成功' (success), and a
# few expected "already planned / recently processed, skipped" reasons are filtered out.
state_query_sql = "* | select crawlerMode, result, if(reason='null', '成功', reason) as reason, count(distinct videoId) as videoIdCnt, count(distinct crawlerPlanId) as crawlerPlanIdCnt from log where reason not in ('该账号已经存在爬取计划,跳过执行', '该视频近期已经处理过', '该Topic已经创建过爬取计划', '该关键词已经创建过爬取计划') group by crawlerMode, result, reason order by crawlerMode, result desc, reason"
client = LogClient(endpoint=endpoint, accessKey=access_key, accessKeyId=access_key_id, auth_version=AUTH_VERSION_4, region='cn-hangzhou')
webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/9f5c5cce-5eb2-4731-b368-33926f5549f9'

all_crawler_mode_list = [
    "account", "account_extend", "channel_topic", "channel_topic_extend",
    "channel_image_search_video", "channel_image_search_topic", "channel_image_search_topic_extend"
]

# Base Feishu card template; senders take a deep copy before filling it in.
card_json = {
    "schema": "2.0",
    "header": {
        "title": {
            "tag": "plain_text",
            "content": "【自动化供给】日任务执行情况监控"  # "[Automated supply] daily job execution monitor"
        },
        "template": "blue"
    },
    "body": {
        "elements": []
    }
}


def gen_collapsible_panel_json(title: str, content: str) -> dict:
    """Build a collapsed Feishu collapsible-panel element whose body is a markdown block."""
    return {
        "tag": "collapsible_panel",
        "expanded": False,
        "header": {
            "title": {
                "tag": "plain_text",
                "content": title
            },
            "vertical_align": "center",
        },
        "border": {
            "color": "grey",
            "corner_radius": "5px"
        },
        "elements": [
            {
                "tag": "markdown",
                "content": content
            }
        ]
    }
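
# Illustrative only: the shape of what the helper above returns. The argument
# values are assumptions for the sketch, not data from this job.
#
#   panel = gen_collapsible_panel_json("account", "| reason | videoIdCnt | crawlerPlanIdCnt |\n| --- | --- | --- |\n")
#   panel["header"]["title"]["content"]   # -> "account"
#   panel["expanded"]                     # -> False (panels start collapsed)
#   panel["elements"][0]["tag"]           # -> "markdown"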


def job_run_state(start_ts: int, end_ts: int):
    """Summarize job execution per crawler mode and push the result to Feishu.

    One collapsible panel is built per crawler mode; panels are sent in
    batches so a single card never carries more than `collapsible_limit`
    of them.
    """
    resp = client.get_log(project=project, logstore=log_store, from_time=start_ts, to_time=end_ts, query=state_query_sql)
    log_data = resp.get_body().get('data')
    collapsible_limit = 5
    crawler_mode_group = [all_crawler_mode_list[i:i + collapsible_limit] for i in range(0, len(all_crawler_mode_list), collapsible_limit)]
    for crawler_mode_partition in crawler_mode_group:
        elements = []
        for crawler_mode in crawler_mode_partition:
            content = "| reason | videoIdCnt | crawlerPlanIdCnt |\n"
            content += "| --- | --- | --- |\n"
            for datum in log_data:
                if crawler_mode != datum.get('crawlerMode'):
                    continue
                reason = datum.get('reason')
                video_id_cnt = datum.get('videoIdCnt')
                crawler_plan_id_cnt = datum.get('crawlerPlanIdCnt')
                content += f"| {reason} | {video_id_cnt} | {crawler_plan_id_cnt} |\n"
            elements.append(gen_collapsible_panel_json(crawler_mode, content))
        # Deep copy: a shallow {**card_json} would share the nested "body" dict,
        # so filling in elements would mutate the global template.
        new_card_json = copy.deepcopy(card_json)
        new_card_json["body"]["elements"] = elements
        feishu_inform_util.send_card_msg_to_feishu(webhook, new_card_json)
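
# For reference, each panel body renders in Feishu as a GitHub-style markdown
# table; the row values below are invented for illustration:
#
#   | reason | videoIdCnt | crawlerPlanIdCnt |
#   | --- | --- | --- |
#   | 成功 | 120 | 35 |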


def crawler_mode_not_success_warning(start_ts: int, end_ts: int, crawler_mode_and_video_source_list: List[Tuple[str, str]]):
    """Send a red warning card for each (crawler_mode, video_source) pair with no successful run in the window."""
    for crawler_mode, video_source in crawler_mode_and_video_source_list:
        query_sql = f"crawlerMode : {crawler_mode} and videoSource : {video_source} and result : true | select count(1) as cnt from log"
        resp = client.get_log(project=project, logstore=log_store, from_time=start_ts, to_time=end_ts, query=query_sql)
        success_cnt = int(resp.get_body().get('data')[0]['cnt'])
        if success_cnt <= 0:
            # "Supply mode / video source / no successful run yet today, please take a look"
            msg = f"- 供给方式: {crawler_mode} \n- 视频来源: {video_source} \n- 当天还没有成功执行的任务,请关注"
            new_card_json = copy.deepcopy(card_json)
            new_card_json['header']['template'] = 'red'
            new_card_json['body']['elements'] = [{
                "tag": "markdown",
                "content": msg
            }]
            # Send the freshly built card, not the shared template.
            feishu_inform_util.send_card_msg_to_feishu(webhook, new_card_json)
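
# Example of one generated SLS query, derived from the f-string above; the
# mode/source pair is just one possible input:
#
#   crawlerMode : account and videoSource : top and result : true | select count(1) as cnt from log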


def main():
    today = datetime.now()
    # Today's window as second-level timestamps:
    # start of day (00:00:00) .. end of day (23:59:59.999999).
    start_ts = int(datetime.combine(today.date(), time.min).timestamp())
    end_ts = int(datetime.combine(today.date(), time.max).timestamp())
    job_run_state(start_ts, end_ts)
    # Historical hit videos ("history") are only produced by these modes.
    video_source_list = ["history"]
    history_crawler_mode_list = ["account_extend", "channel_topic", "channel_topic_extend",
                                 "channel_image_search_video", "channel_image_search_topic",
                                 "channel_image_search_topic_extend"]
    # Daily hits ("top") are only checked after 09:30. Compare (hour, minute) as a
    # pair; `hour >= 9 and minute >= 30` would wrongly exclude times like 10:05.
    if (today.hour, today.minute) >= (9, 30):
        video_source_list.append("top")
    crawler_mode_and_video_source_list = []
    for crawler_mode in all_crawler_mode_list:
        for video_source in video_source_list:
            # "history" applies only to the modes listed above.
            if video_source == "history" and crawler_mode not in history_crawler_mode_list:
                continue
            crawler_mode_and_video_source_list.append((crawler_mode, video_source))
    crawler_mode_not_success_warning(start_ts, end_ts, crawler_mode_and_video_source_list)


if __name__ == "__main__":
    main()
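
# The 09:30 gate in main() implies the script runs several times a day. A
# plausible scheduler entry (path and cadence are assumptions, not from the
# original source):
#
#   */30 * * * * python3 /path/to/automation_provide_job_monitor.py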