import copy
from datetime import datetime, time
from typing import List, Tuple

from aliyun.log import LogClient
from aliyun.log.auth import AUTH_VERSION_4

from util import feishu_inform_util

endpoint = "cn-hangzhou.log.aliyuncs.com"
access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
access_key_id = "LTAIWYUujJAm7CbH"
project = "crawler-scheduler"
log_store = "aigc-provider"

# Aggregate today's runs per (crawlerMode, result, reason); routine skip reasons are filtered out.
state_query_sql = (
    "* | select crawlerMode, result, if(reason='null', '成功', reason) as reason, "
    "count(distinct videoId) as videoIdCnt, count(distinct crawlerPlanId) as crawlerPlanIdCnt "
    "from log where reason not in ('该账号已经存在爬取计划,跳过执行', '该视频近期已经处理过', "
    "'该Topic已经创建过爬取计划', '该关键词已经创建过爬取计划') "
    "group by crawlerMode, result, reason order by crawlerMode, result desc, reason"
)

client = LogClient(endpoint=endpoint, accessKey=access_key, accessKeyId=access_key_id,
                   auth_version=AUTH_VERSION_4, region='cn-hangzhou')

webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/9f5c5cce-5eb2-4731-b368-33926f5549f9'

all_crawler_mode_list = [
    "account",
    "account_extend",
    "channel_topic",
    "channel_topic_extend",
    "channel_image_search_video",
    "channel_image_search_topic",
    "channel_image_search_topic_extend",
]

# Base Feishu card template. Always deep-copy before mutating: the nested
# "header"/"body" dicts would otherwise be shared between messages.
card_json = {
    "schema": "2.0",
    "header": {
        "title": {
            "tag": "plain_text",
            "content": "【自动化供给】日任务执行情况监控"
        },
        "template": "blue"
    },
    "body": {
        "elements": []
    }
}


def gen_collapsible_panel_json(title: str, content: str) -> dict:
    """Build a Feishu collapsible-panel element wrapping a markdown body."""
    return {
        "tag": "collapsible_panel",
        "expanded": False,
        "header": {
            "title": {
                "tag": "plain_text",
                "content": title
            },
            "vertical_align": "center",
        },
        "border": {
            "color": "grey",
            "corner_radius": "5px"
        },
        "elements": [
            {
                "tag": "markdown",
                "content": content
            }
        ]
    }


def job_run_state(start_ts: int, end_ts: int):
    """Summarize today's runs per crawler mode and push the stats as Feishu cards."""
    resp = client.get_log(project=project, logstore=log_store,
                          from_time=start_ts, to_time=end_ts, query=state_query_sql)
    log_data = resp.get_body().get('data')

    # Render at most 5 collapsible panels per card; send one card per chunk of modes.
    collapsible_limit = 5
    crawler_mode_group = [all_crawler_mode_list[i:i + collapsible_limit]
                          for i in range(0, len(all_crawler_mode_list), collapsible_limit)]
    for crawler_mode_partition in crawler_mode_group:
        elements = []
        for crawler_mode in crawler_mode_partition:
            content = "| reason | videoIdCnt | crawlerPlanIdCnt |\n"
            content += "| --- | --- | --- |\n"
            for datum in log_data:
                if crawler_mode != datum.get('crawlerMode'):
                    continue
                reason = datum.get('reason')
                video_id_cnt = datum.get('videoIdCnt')
                crawler_plan_id_cnt = datum.get('crawlerPlanIdCnt')
                content += f"| {reason} | {video_id_cnt} | {crawler_plan_id_cnt} |\n"
            elements.append(gen_collapsible_panel_json(crawler_mode, content))
        new_card_json = copy.deepcopy(card_json)
        new_card_json["body"]["elements"] = elements
        feishu_inform_util.send_card_msg_to_feishu(webhook, new_card_json)


def crawler_mode_not_success_warning(start_ts: int, end_ts: int,
                                     crawler_mode_and_video_source_list: List[Tuple[str, str]]):
    """Send a red warning card for each (crawlerMode, videoSource) pair with no successful run today."""
    for crawler_mode, video_source in crawler_mode_and_video_source_list:
        query_sql = (f"crawlerMode : {crawler_mode} and videoSource : {video_source} "
                     f"and result : true | select count(1) as cnt from log")
        resp = client.get_log(project=project, logstore=log_store,
                              from_time=start_ts, to_time=end_ts, query=query_sql)
        success_cnt = int(resp.get_body().get('data')[0]['cnt'])
        if success_cnt <= 0:
            msg = (f"- 供给方式: {crawler_mode} \n"
                   f"- 视频来源: {video_source} \n"
                   f"- 当天还没有成功执行的任务,请关注")
            new_card_json = copy.deepcopy(card_json)
            new_card_json['header']['template'] = 'red'
            new_card_json['body']['elements'] = [{
                "tag": "markdown",
                "content": msg
            }]
            feishu_inform_util.send_card_msg_to_feishu(webhook, new_card_json)
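
# Assumed shape of resp.get_body().get('data') for the queries above, inferred
# from how the two functions index it (illustrative values only; actual field
# types depend on the SLS SDK version in use):
# [
#     {"crawlerMode": "account", "result": "true", "reason": "成功",
#      "videoIdCnt": "128", "crawlerPlanIdCnt": "12"},
#     ...
# ]
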
def main():
    today = datetime.now()
    # Bounds of the current day as epoch seconds: 00:00:00 through 23:59:59.999999.
    start_ts = int(datetime.combine(today.date(), time.min).timestamp())
    end_ts = int(datetime.combine(today.date(), time.max).timestamp())

    job_run_state(start_ts, end_ts)

    # Historical hit videos: only these modes consume the "history" source.
    video_source_list = ["history"]
    history_crawler_mode_list = [
        "account_extend",
        "channel_topic",
        "channel_topic_extend",
    ]
    # From 09:30 onwards, also check the daily hit ("top") source.
    # Compare (hour, minute) as a tuple so e.g. 10:15 also passes.
    if (today.hour, today.minute) >= (9, 30):
        video_source_list.append("top")

    crawler_mode_and_video_source_list = []
    for crawler_mode in all_crawler_mode_list:
        for video_source in video_source_list:
            if video_source == "history" and crawler_mode not in history_crawler_mode_list:
                continue
            crawler_mode_and_video_source_list.append((crawler_mode, video_source))
    crawler_mode_not_success_warning(start_ts, end_ts, crawler_mode_and_video_source_list)


if __name__ == "__main__":
    main()
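
# The 09:30 gate in main() implies this script runs several times a day. A
# minimal scheduling sketch, assuming crontab and a hypothetical install path:
#
#     */30 * * * * python3 /opt/jobs/aigc_provider_monitor.py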