| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- """
- Prometheus metrics 导出(可选功能)
- 需要安装:pip install prometheus-client
- """
- from pathlib import Path
- import pandas as pd
- import logging
- logger = logging.getLogger(__name__)
- try:
- from prometheus_client import Counter, Gauge, Histogram, write_to_textfile
- # 定义指标
- DECISIONS_TOTAL = Counter(
- 'ad_decisions_total',
- 'Total decisions made',
- ['action', 'account_id']
- )
- EXECUTION_DURATION = Histogram(
- 'ad_execution_duration_seconds',
- 'Execution duration in seconds'
- )
- APPROVAL_TIMEOUT = Counter(
- 'ad_approval_timeout_total',
- 'Approval timeouts'
- )
- ACTIVE_ADS = Gauge(
- 'ad_active_ads_count',
- 'Number of active ads'
- )
- PROMETHEUS_ENABLED = True
- except ImportError:
- logger.warning("prometheus_client 未安装,Prometheus metrics 功能不可用")
- PROMETHEUS_ENABLED = False
- def export_metrics():
- """从最近的决策报告导出指标到 Prometheus"""
- if not PROMETHEUS_ENABLED:
- logger.debug("Prometheus metrics 未启用,跳过导出")
- return
- try:
- reports_dir = Path("/app/outputs/reports")
- if not reports_dir.exists():
- logger.warning("报告目录不存在,跳过 metrics 导出")
- return
- # 查找最新的决策报告
- latest_csv = sorted(reports_dir.glob("llm_decisions_*.csv"), reverse=True)
- if not latest_csv:
- logger.warning("未找到决策报告,跳过 metrics 导出")
- return
- latest_csv = latest_csv[0]
- df = pd.read_csv(latest_csv)
- # 统计决策分布
- for _, row in df.iterrows():
- DECISIONS_TOTAL.labels(
- action=row.get('action', 'unknown'),
- account_id=str(row.get('account_id', 'unknown'))
- ).inc()
- # 统计活跃广告数
- if 'configured_status' in df.columns:
- active_count = len(df[df['configured_status'] == 'AD_STATUS_NORMAL'])
- ACTIVE_ADS.set(active_count)
- # 写入文件(供 Prometheus file_sd 抓取)
- metrics_file = Path('/app/outputs/metrics.prom')
- write_to_textfile(str(metrics_file), DECISIONS_TOTAL)
- logger.info(f"Prometheus metrics 已导出到 {metrics_file}")
- except Exception as e:
- logger.error(f"导出 Prometheus metrics 失败: {e}", exc_info=True)
- def record_execution_duration(duration_seconds: float):
- """记录执行耗时"""
- if PROMETHEUS_ENABLED:
- EXECUTION_DURATION.observe(duration_seconds)
- def record_approval_timeout():
- """记录审批超时事件"""
- if PROMETHEUS_ENABLED:
- APPROVAL_TIMEOUT.inc()
|