from copy import deepcopy
from datetime import datetime, time
from typing import List, Tuple

from aliyun.log import LogClient
from aliyun.log.auth import AUTH_VERSION_4

from util import feishu_inform_util
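
# Aliyun SLS (Log Service) connection settings.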
endpoint = "cn-hangzhou.log.aliyuncs.com"
access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
access_key_id = "LTAIWYUujJAm7CbH"
project = "crawler-scheduler"
log_store = "aigc-provider"
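
# Per-mode summary of today's runs. Rows whose reason marks a routine skip
# (an account/video/topic/keyword that already has a crawl plan or was
# recently processed) are filtered out, and a null reason is rendered as
# "成功" (success).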
state_query_sql = (
    "* | select crawlerMode, result, if(reason='null', '成功', reason) as reason, "
    "count(distinct videoId) as videoIdCnt, count(distinct crawlerPlanId) as crawlerPlanIdCnt "
    "from log where reason not in ('该账号已经存在爬取计划,跳过执行', '该视频近期已经处理过', "
    "'该Topic已经创建过爬取计划', '该关键词已经创建过爬取计划') "
    "group by crawlerMode, result, reason order by crawlerMode, result desc, reason"
)
client = LogClient(
    endpoint=endpoint,
    accessKey=access_key,
    accessKeyId=access_key_id,
    auth_version=AUTH_VERSION_4,
    region='cn-hangzhou',
)
webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/9f5c5cce-5eb2-4731-b368-33926f5549f9'

all_crawler_mode_list = [
    "account", "account_extend", "channel_topic", "channel_topic_extend",
    "channel_image_search_video", "channel_image_search_topic", "channel_image_search_topic_extend",
]
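
# Base Feishu card template. Deep-copy it before each send so per-message
# tweaks (elements, header color) don't leak into later messages.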
card_json = {
    "schema": "2.0",
    "header": {
        "title": {
            "tag": "plain_text",
            # "[Automated supply] daily task execution monitor"
            "content": "【自动化供给】日任务执行情况监控"
        },
        "template": "blue"
    },
    "body": {
        "elements": []
    }
}


def gen_collapsible_panel_json(title: str, content: str) -> dict:
    """Build a collapsed Feishu panel wrapping one markdown table."""
    return {
        "tag": "collapsible_panel",
        "expanded": False,
        "header": {
            "title": {
                "tag": "plain_text",
                "content": title
            },
            "vertical_align": "center",
        },
        "border": {
            "color": "grey",
            "corner_radius": "5px"
        },
        "elements": [
            {
                "tag": "markdown",
                "content": content
            }
        ]
    }


def job_run_state(start_ts: int, end_ts: int):
    """Aggregate today's job execution results per crawler mode and push them to Feishu."""
    resp = client.get_log(project=project, logstore=log_store, from_time=start_ts, to_time=end_ts, query=state_query_sql)
    log_data = resp.get_body().get('data')
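    # Batch the modes five to a card, one collapsible panel per mode.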
    collapsible_limit = 5
    crawler_mode_group = [all_crawler_mode_list[i:i + collapsible_limit]
                          for i in range(0, len(all_crawler_mode_list), collapsible_limit)]
    for crawler_mode_partition in crawler_mode_group:
        elements = []
        for crawler_mode in crawler_mode_partition:
            content = "| reason | videoIdCnt | crawlerPlanIdCnt |\n"
            content += "| --- | --- | --- |\n"
            for datum in log_data:
                if crawler_mode != datum.get('crawlerMode'):
                    continue
                reason = datum.get('reason')
                video_id_cnt = datum.get('videoIdCnt')
                crawler_plan_id_cnt = datum.get('crawlerPlanIdCnt')
                content += f"| {reason} | {video_id_cnt} | {crawler_plan_id_cnt} |\n"
            elements.append(gen_collapsible_panel_json(crawler_mode, content))
        # Deep copy: a shallow {**card_json} merge would share the nested
        # "body" dict, so every card sent would mutate the same template.
        new_card_json = deepcopy(card_json)
        new_card_json["body"]["elements"] = elements
        feishu_inform_util.send_card_msg_to_feishu(webhook, new_card_json)


def crawler_mode_not_success_warning(start_ts: int, end_ts: int,
                                     crawler_mode_and_video_source_list: List[Tuple[str, str]]):
    """Send a red warning card for each (mode, source) pair with no successful run today."""
    for crawler_mode, video_source in crawler_mode_and_video_source_list:
        query_sql = f"crawlerMode : {crawler_mode} and videoSource : {video_source} and result : true | select count(1) as cnt from log"
        resp = client.get_log(project=project, logstore=log_store, from_time=start_ts, to_time=end_ts, query=query_sql)
        success_cnt = int(resp.get_body().get('data')[0]['cnt'])
        if success_cnt <= 0:
            # "Supply mode / video source: no task has executed successfully today, please check."
            msg = f"- 供给方式: {crawler_mode} \n- 视频来源: {video_source} \n- 当天还没有成功执行的任务,请关注"
            new_card_json = deepcopy(card_json)
            new_card_json['header']['template'] = 'red'
            new_card_json['body']['elements'] = [{
                "tag": "markdown",
                "content": msg
            }]
            # Send the customized red card, not the shared blue template.
            feishu_inform_util.send_card_msg_to_feishu(webhook, new_card_json)


def main():
    today = datetime.now()
    # Today's window as epoch seconds: 00:00:00 through 23:59:59.999999.
    start_ts = int(datetime.combine(today.date(), time.min).timestamp())
    end_ts = int(datetime.combine(today.date(), time.max).timestamp())
    job_run_state(start_ts, end_ts)

    # Historical hits ("history") are only produced by these modes.
    video_source_list = ["history"]
    history_crawler_mode_list = ["account_extend", "channel_topic", "channel_topic_extend"]
    # Daily top hits ("top") are only monitored after 09:30; comparing
    # (hour, minute) tuples gives a correct time-of-day cutoff.
    if (today.hour, today.minute) >= (9, 30):
        video_source_list.append("top")
    crawler_mode_and_video_source_list = []
    for crawler_mode in all_crawler_mode_list:
        for video_source in video_source_list:
            # "history" applies only to the extend-style modes listed above.
            if video_source == "history" and crawler_mode not in history_crawler_mode_list:
                continue
            crawler_mode_and_video_source_list.append((crawler_mode, video_source))
    crawler_mode_not_success_warning(start_ts, end_ts, crawler_mode_and_video_source_list)


if __name__ == "__main__":
    main()