"""Prometheus exporter exposing service health, QPS/latency and video-synthesis metrics.

Each Flask route below refreshes a group of gauges held in a private
``CollectorRegistry`` and returns the registry in Prometheus text format.
"""
# NOTE: the import order is intentional. The wildcard imports come first so
# that the explicit stdlib imports below them always win over any same-named
# symbol a wildcard import might bring in.
from flask import Flask,Response
from DBSession import session_maker
from model import *
from sqlalchemy.sql import func
from utils import *
import atexit
from aliyunsdkcore.client import AcsClient
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
from apscheduler.schedulers.background import BackgroundScheduler
import json
from model_longvideo import produce_video_task,produce_video_project
from DBSession_longvideo import session_maker_longvideo
from prometheus_client import Gauge,Counter, generate_latest
from prometheus_client.core import CollectorRegistry
from scheduler_jobs import interface_info_count
from ex_response import ex_response
import time
import threading
import logging
import os

logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)

app = Flask(__name__)
registry = CollectorRegistry(auto_describe=False)

# Background scheduler: refresh the request-URL list every 5 minutes and run
# the video ratio job daily at 00:03.
scheduler = BackgroundScheduler()
scheduler.add_job(func=update_request_url_list, trigger="interval", seconds=300)
scheduler.add_job(produce_video_ratio_cnt, 'cron', hour='00', minute='03')
scheduler.start()
atexit.register(lambda: scheduler.shutdown())

# SECURITY NOTE(review): credentials are hard-coded in source. Environment
# variables now take precedence; the literals remain only as a
# backward-compatible fallback and should be rotated and removed.
client = AcsClient(
    os.environ.get('ALIYUN_ACCESS_KEY_ID', 'LTAI4GBWbFvvXoXsSVBe1o9f'),
    os.environ.get('ALIYUN_ACCESS_KEY_SECRET', 'kRAikWitb4kDxaAyBqNrmLmllMEDO3'),
    os.environ.get('ALIYUN_REGION_ID', 'cn-hangzhou'),
)

healthcheck_status = Gauge("healthcheck_status", "ipaddress", ['server_name', 'ipaddress'], registry=registry)
url_http_avgtime = Gauge("url_http_times_avgs", "url of avgs", ['appType', 'url'], registry=registry)
url_http_qps = Gauge("url_http_qps", "url of qps", ['appType','url'], registry=registry)
url_http_expendtime_summary = Gauge("url_http_expendtime_summary", "expendtime summary", ['appType', 'url', 'duration'], registry=registry)
url_http_error_code_cnt = Gauge("url_http_error_code_cnt", "error code", ['appType', 'error_code'], registry=registry)
probe_http_status_code = Gauge("http_status_code", 'h5',['server_name'], registry=registry)
probe_http_total_time = Gauge("http_total_time", 'h5',['server_name'], registry=registry)
probe_http_dns_time = Gauge("http_dns_time", 'h5',['server_name'], registry=registry)
probe_http_connect_time = Gauge("http_connect_time", 'h5',['server_name'], registry=registry)
probe_http_pretransfer_time = Gauge("http_pretransfer_time", 'h5',['server_name'], registry=registry)
probe_http_first_byte_time = Gauge("http_first_byte_time", 'h5',['server_name'], registry=registry)
slb_http_status_code = Gauge("slb_http_status_code", 'slb', ['server_name', 'status'], registry=registry)
# Video synthesis
produce_video_task_count = Gauge("produce_video_task_count", 'status', ['status'], registry=registry)
produce_video_task_rate = Gauge("produce_video_task_rate", 'rate', ['rate'], registry=registry)
produce_video_task_total = Gauge("produce_video_task_total", 'total', ['total_cnt'], registry=registry)
produce_video_task_sucess = Gauge("produce_video_task_sucess", 'sucess', ['sucess_cnt'], registry=registry)
produce_video_task_ratio = Gauge("produce_video_task_ratio", 'produce_video_ratio', ['ratio'], registry=registry)
produce_video_tts_count = Gauge("produce_video_tts_count", 'success', ['tts_channel'], registry=registry)
produce_video_tts_ratio = Gauge("produce_video_tts_ratio", 'ratio', ['channel'], registry=registry)
logs_app_recommend_log_cnt_300 = Gauge("logs_app_recommend_log_null_cnt_300", "null cnt", ['cnt'], registry=registry)
logs_app_recommend_log_cnt_all = Counter("logs_app_recommend_log_null_cnt_all", "all cnt", ['cnt'], registry=registry)

# H5 pages probed by the H5 health check (shared by /h5/metrics and /metrics).
_SHARE_H5_URL = "https://longvideoh5.piaoquantv.com/core/share?shareSource=customerMessage&fromAppType=0&qrAppType=0&versionCode=321&shareUid=12463024&shareMachineCode=weixin_openid_o0w175fPwp8yrtOGihYJhvnT9Ag4&h5WxrootPageSource=vlog-pages___category&videoId=2689415&isRecommendShare=1&h5ShareId=backend493cd67dd28f4ee395781d59881567211625976055926&shareDepth=0&state=#"
_DOWNLOAD_H5_URL = "https://longvideoh5.piaoquantv.com/dist_1_3_4/upload?accessToken=fe8914eb2e99d1fe8ddaa2f753f5ec613eb2dfbb&versionCode=323&galleryId=0&fileType=2&machineCode=weixin_openid_o0w175fPwp8yrtOGihYJhvnT9Ag4&platform=devtools&system=iOS%2010.0.1&appType=0&appId=wx89e7eb06478361d7&pageSource=vlog-pages%2Fwebview&loginUid=12463024&machineInfo=%7B%22sdkVersion%22%3A%222.4.1%22,%22brand%22%3A%22devtools%22,%22language%22%3A%22zh_CN%22,%22model%22%3A%22iPhone%20X%22,%22platform%22%3A%22devtools%22,%22system%22%3A%22iOS%2010.0.1%22,%22weChatVersion%22%3A%228.0.5%22,%22screenHeight%22%3A812,%22screenWidth%22%3A375,%22windowHeight%22%3A730,%22windowWidth%22%3A375,%22softVersion%22%3A%224.1.168%22%7D&wxHelpPagePath=%2Fpackage-my%2Fhelp-feedback%2Fhelp-feedback&transaction=2065ff98-6f27-4f09-c9eb-d366c99dd5d5&videoBarrageSwitch=true&addMusic=1&eventId=0&fromActivityId=0&sessionId=1626833289618-583a312d-81cd-62f9-cdd4-cf914c682d55&subSessionId=1626833289618-583a312d-81cd-62f9-cdd4-cf914c682d55&projectId=&entranceType=#wechat_redirec"

# project_id values are compared against "YYYYmmddHHMMSS" * 1e9 bounds.
_PROJECT_ID_SECOND_FMT = '%Y%m%d%H%M%S'
_PROJECT_ID_MINUTE_FMT = '%Y%m%d%H%M'


def _metrics_response():
    """Render the private registry as a Prometheus text-format response."""
    return Response(generate_latest(registry), mimetype="text/plain")


def _or_zero(value):
    """Return ``value`` unless it is None, in which case return 0."""
    return 0 if value is None else value


def _first_count(rows):
    """Return ``int(rows[0]["count"])``, or 0 when ``rows`` is None or empty."""
    if rows:
        return int(rows[0]["count"])
    return 0


def _collect_h5_probe_metrics():
    """Probe both H5 pages and publish status code and timing gauges (ms)."""
    probes = (("share_h5", _SHARE_H5_URL), ("download_h5", _DOWNLOAD_H5_URL))
    # Fetch everything first so a failing probe leaves all gauges untouched.
    results = [(name, ex_response(url).getinfo()) for name, url in probes]

    for name, info in results:
        probe_http_status_code.labels(name).set(info["http_code"])
    for name, info in results:
        probe_http_total_time.labels(name).set(info["total_time"]*1000)
    for name, info in results:
        probe_http_dns_time.labels(name).set(info["dns_time"]*1000)
    for name, info in results:
        # NOTE(review): the connect-time gauge is fed from "dns_time"; this
        # looks like a copy/paste slip. Confirm the key exposed by
        # ex_response.getinfo() before switching it.
        probe_http_connect_time.labels(name).set(info["dns_time"]*1000)
    for name, info in results:
        probe_http_pretransfer_time.labels(name).set(info["pretransfer_time"]*1000)
    for name, info in results:
        probe_http_first_byte_time.labels(name).set(info["first_byte_time"]*1000)


def _collect_slb_status_metrics(service_names):
    """Publish per-status request counts for each service in ``service_names``."""
    for name in service_names:
        rows = slb_status_code_count(name)
        if rows:
            for row in rows:
                slb_http_status_code.labels(name, row['status']).set(float(row['cnt']))


@app.route('/update')
def update():
    """Rebuild the instance list from the ECS instances tagged ``ecs``.

    The existing list is cleared first, then one row is inserted per instance
    returned by the DescribeInstances call.
    NOTE(review): only the first page (PageSize 100) is requested.
    """
    request = DescribeInstancesRequest()
    request.set_accept_format('json')
    request.set_PageSize(100)
    request.set_Tags([
        {
            "Key": "ecs"
        }
    ])
    response = client.do_action_with_exception(request)
    instance_info = json.loads(response)
    intances_list_del()
    for instance in instance_info["Instances"]["Instance"]:
        instance_id = instance["InstanceId"]
        network_type = instance["InstanceNetworkType"]
        if network_type == "vpc":
            ipaddr = instance["VpcAttributes"]["PrivateIpAddress"]["IpAddress"][0]
        elif network_type == "classic":
            ipaddr = instance["InnerIpAddress"]["IpAddress"][0]
        else:
            # Previously this reused the prior instance's IP (or raised
            # NameError on the first one); skip rather than store a wrong IP.
            _log.warning("skip instance %s: unknown network type %r", instance_id, network_type)
            continue
        server_name = instance["Tags"]["Tag"][0]["TagValue"]
        status = instance["Status"]
        instance_name = instance["HostName"]
        # Running instances are stored as 1; other statuses are stored as-is.
        if status == "Running":
            status = 1
        instance_insert(instance_id, ipaddr, instance_name, server_name, status)
    return "更新完成"


@app.route('/app/healthcheck/metrics')
def app_healthcheck():
    """Health-check every ``longvideoapi.prod`` instance and publish the codes."""
    with session_maker() as session:
        instance_infos = session.query(InstanceList).filter(InstanceList.server_name=="longvideoapi.prod").all()
        for instance in instance_infos:
            ipaddr = instance.ipadd
            server_name = instance.server_name
            http_code = healthcheck(ipaddr, server_name)
            # The gauge declares exactly two labels (server_name, ipaddress);
            # passing instance_id as a third value raised ValueError.
            healthcheck_status.labels(server_name, ipaddr).set(http_code)
    return _metrics_response()


@app.route('/app/qps/metrics')
def qps_avgtime_count():
    """Publish QPS and average time for every (app type, interface URL) pair."""
    with session_maker() as session:
        intface_infos = session.query(IntfaceList.interface_url).all()
        app_type = session.query(app_info.app_type).all()
        for interface in intface_infos:
            url = interface.interface_url
            for app_row in app_type:
                appType = app_row.app_type
                url_avgtime = count_avg_time(appType, url)
                url_qps = count_qps(appType, url)
                url_http_avgtime.labels(appType, url).set(url_avgtime)
                url_http_qps.labels(appType, url).set(url_qps)
    return _metrics_response()


@app.route('/h5/metrics')
def h5_healthcheck():
    """Probe the share/download H5 pages and return the metrics."""
    _collect_h5_probe_metrics()
    return _metrics_response()


@app.route('/slbStatusCode/metrics')
def slb_request_status_metric():
    """Publish SLB status-code counts for the core services."""
    _collect_slb_status_metrics(('longvideoapi', 'clip', 'speed'))
    return _metrics_response()


@app.route('/metrics')
def all_metric():
    """Refresh video-synthesis, TTS, SLB, H5 and recommendation metrics."""
    # Video synthesis success rate over the last 5 minutes.
    app.logger.info("测试")
    time_stamp = int(time.time())
    end_time = int(time.strftime(_PROJECT_ID_SECOND_FMT, time.localtime(time_stamp))) * 1000000000
    start_time = int(time.strftime(_PROJECT_ID_SECOND_FMT, time.localtime(time_stamp-300))) * 1000000000
    _log.info("produce video window: %s - %s", start_time, end_time)
    query_sql = ("select totalCount , (successCount+processingCount1), round((successCount + processingCount1)/totalCount * 100,2), processingCount ,processingCount1, processingCount2 from "
                 "(select count(*) as totalCount,"
                 "sum(case when produce_status in (5,6,7,8) then 1 else 0 end) as successCount,"
                 "sum(case when produce_status = 99 then 1 else 0 end) as failCount , "
                 "sum(case when produce_status in(0,1,2,3,4) then 1 else 0 end) as processingCount,"
                 "sum(case when produce_status in(0,1,2,3,4) and (rate < 0.7 or rate is null) then 1 else 0 end) processingCount1,"
                 "sum(case when produce_status in(0,1,2,3,4) and rate >= 0.7 then 1 else 0 end) processingCount2 from "
                 "(select t1.project_id, t1.produce_status, round((t2.last_connect_timestamp - t1.submit_timestamp) / (t1.video_duration/1000), 1) as rate from produce_video_project t1 "
                 "left join produce_video_project_connect_time t2 on t1.project_id = t2.project_id "
                 "where t1.project_id > %s and t1.project_id < %s and t1.app_type not in (1,13,15)) s1) ss1" %(start_time ,end_time)
                 )
    res = db_query(query_sql)
    total_cnt = _or_zero(res[0])
    success_cnt = _or_zero(res[1])
    # With no tasks in the window the rate is reported as 100.
    if total_cnt == 0:
        rate = 100
    else:
        rate = round((success_cnt/total_cnt), 2)*100
    process_cnt = _or_zero(res[3])
    process_cnt_1 = _or_zero(res[4])
    process_cnt_2 = _or_zero(res[5])

    produce_video_task_rate.labels("produce_video_task_rate").set(rate)
    produce_video_task_total.labels('total_cnt').set(total_cnt)
    produce_video_task_sucess.labels('success_cnt').set(success_cnt)
    produce_video_task_count.labels('process_cnt').set(process_cnt)
    produce_video_task_count.labels('process_cnt<0.7').set(process_cnt_1)
    produce_video_task_count.labels('process_cnt>0.7').set(process_cnt_2)

    # Task totals: a gauge is only updated when its value is non-zero.
    res = produce_video_task_cnt()
    if res:
        for idx, label in enumerate(("total", "success", "fail")):
            if res[idx]:
                produce_video_task_count.labels(label).set(res[idx])

    # Distribution of the synthesis time ratio; columns 1..6 map to buckets.
    res = produce_video_ratio_cnt()
    ratio_buckets = ("<0.5", "0.5-0.7", "0.7-1.0", "1.0-1.5", "1.5-2.0", ">2.0")
    for idx, bucket in enumerate(ratio_buckets, 1):
        produce_video_task_ratio.labels(bucket).set(res[idx] if res[idx] else 0)
    # NOTE: a "video_progress" gauge (duration / synthesis-time ratio) was
    # previously computed here and is currently disabled.

    # TTS success/failure counts per channel (flag 1 = success, 0 = fail).
    channels = ("aliyun", "azure")
    outcomes = ((1, "success"), (0, "fail"))
    for channel in channels:
        for flag, outcome in outcomes:
            cnt = _first_count(logs_tts_count(channel, flag))
            produce_video_tts_count.labels(channel + "_" + outcome).set(cnt)

    # Same counts over the 5-minute window; these feed the success ratio.
    recent = {}
    for channel in channels:
        for flag, outcome in outcomes:
            cnt = _first_count(logs_tts_ratio(channel, flag))
            recent[(channel, outcome)] = cnt
            produce_video_tts_count.labels(channel + "_" + outcome + "_5m").set(cnt)

    ratios = {}
    for channel in channels:
        succeeded = recent[(channel, "success")]
        attempts = succeeded + recent[(channel, "fail")]
        # No samples used to raise ZeroDivisionError; report 100 instead,
        # consistent with the synthesis rate above.
        if attempts == 0:
            ratios[channel] = 100
        else:
            ratios[channel] = round((succeeded/attempts)*100, 4)
    for channel in channels:
        produce_video_tts_ratio.labels(channel).set(ratios[channel])

    # SLB http status codes.
    _collect_slb_status_metrics(('longvideoapi', 'clip', 'speed', 'commonapi'))

    # H5 health check.
    _collect_h5_probe_metrics()

    # Recommendation service: count of empty results.
    res_null_cnt = count_recommend_null()
    logs_app_recommend_log_cnt_300.labels("recommend").set(res_null_cnt)

    return _metrics_response()


@app.route('/qps_avgtime/metrics')
def qps_avgtime_metrics():
    """Publish per-interface request counts, response times, RT buckets and error codes."""
    res, url = app_openapi_qps_avgtime_count()
    # An empty body used to raise IndexError; skip the open-API sample instead.
    if res.body:
        qps = int(res.body[0]["cnt"])
        avgtime = int(float(res.body[0]["avg_time"]))
        app_type_temp = 1
        url_http_qps.labels(app_type_temp, url).set(qps)
        url_http_avgtime.labels(app_type_temp, url).set(avgtime)

    # (lower bound, upper bound, label) of each response-time bucket.
    # NOTE(review): the 200-500 bucket is published under the label
    # "200-300"; kept as-is because dashboards may query that label.
    rt_buckets = (
        (0, 200, "0-200"),
        (200, 500, "200-300"),
        (500, 1000, "500-1000"),
        (1000, 10000, ">1000"),
    )
    app_type = ['1', '0', '4', '5', '6', '12', '13', '15']
    for appType in app_type:
        res = count_qps_avgtime(appType)
        for row in res.body:
            url = row["requestUri"]
            qps = int(row["cnt"])
            avgtime = int(float(row["avg_time"]))
            url_http_qps.labels(appType, url).set(qps)
            url_http_avgtime.labels(appType, url).set(avgtime)

        for lower, upper, label in rt_buckets:
            res = count_rt_less_time_count(appType, lower, upper)
            for row in res.body:
                url_http_expendtime_summary.labels(appType, row["requestUri"], label).set(row["cnt"])

        # Interface error-code statistics.
        res = error_cnt(appType)
        for row in res.body:
            cnt = row["cnt"]
            error_code = row["resultCode"]
            url_http_error_code_cnt.labels(appType, error_code).set(cnt)
    return _metrics_response()


@app.route('/svc/healthcheck')
def svc_healthcheck():
    """Health-check every instance stored with status 1 and publish the codes."""
    sql = "select ipadd ,server_name from instance_list where status=1"
    res = db_query_devlop(sql)
    if res is not None:
        for row in res:
            ipaddr = row[0]
            svc_name = row[1]
            _log.info("healthcheck %s %s", ipaddr, svc_name)
            http_code = healthcheck(ipaddr, svc_name)
            _log.info("healthcheck result %s", http_code)
            healthcheck_status.labels(svc_name, ipaddr).set(http_code)
    return _metrics_response()


def produce_video_data():
    """Build the count query for video projects of the last 30 minutes.

    NOTE(review): unfinished -- the query is built but never executed or
    returned, so the function still returns None.
    """
    now = time.time()
    # The original subtracted 30 from the YYYYmmddHHMM integer, which is wrong
    # whenever the minute is below 30; presumably a 30-minute window was meant.
    start_time = int(time.strftime(_PROJECT_ID_MINUTE_FMT, time.localtime(now - 30 * 60))) * 100000000000
    end_time = int(time.strftime(_PROJECT_ID_MINUTE_FMT, time.localtime(now))) * 100000000000
    # The original text carried an extra ") ss1" that unbalanced the parentheses.
    sum_sql = ("select count(*) as totalCount from "
               "(select t1.project_id, t1.produce_status "
               "from produce_video_project t1 "
               "left join produce_video_project_connect_time t2 on t1.project_id = t2.project_id "
               " where t1.project_id > %s and t1.project_id < %s and t1.app_type not in (1,13,15)) s1"%(start_time,end_time)
               )


if __name__ == '__main__':
    # A second app.run() used to follow; it only started another server on
    # the default host/port after this one had stopped.
    app.run(host='192.168.201.1', port=9091)