package perfmetrics import ( "context" "fmt" "math" "sort" "sync" "time" "github.com/QuantumNous/new-api/common" "github.com/QuantumNous/new-api/model" relaycommon "github.com/QuantumNous/new-api/relay/common" "github.com/QuantumNous/new-api/setting/perf_metrics_setting" ) var hotBuckets sync.Map // seriesSchema is a stable client cache/schema marker. Do not change it when // hiding fields or making response-only privacy hardening changes. const seriesSchema = "dbcd0a3c01b55203" func Init() { go flushLoop() } func RecordRelaySample(info *relaycommon.RelayInfo, success bool, outputTokens int64) { if info == nil { return } now := time.Now() hasTtft := info.IsStream && info.HasSendResponse() ttftMs := int64(0) if hasTtft { ttftMs = info.FirstResponseTime.Sub(info.StartTime).Milliseconds() } latencyMs := now.Sub(info.StartTime).Milliseconds() generationMs := latencyMs if hasTtft { generationMs = now.Sub(info.FirstResponseTime).Milliseconds() } if generationMs <= 0 { generationMs = latencyMs } Record(Sample{ Model: info.OriginModelName, Group: info.UsingGroup, LatencyMs: latencyMs, TtftMs: ttftMs, HasTtft: hasTtft, Success: success, OutputTokens: outputTokens, GenerationMs: generationMs, }) } func Record(sample Sample) { setting := perf_metrics_setting.GetSetting() if !setting.Enabled || sample.Model == "" { return } if sample.Group == "" { sample.Group = "default" } if sample.LatencyMs < 0 { sample.LatencyMs = 0 } key := bucketKey{ model: sample.Model, group: sample.Group, bucketTs: bucketStart(time.Now().Unix()), } actual, _ := hotBuckets.LoadOrStore(key, &atomicBucket{}) actual.(*atomicBucket).add(sample) recordRedis(key, sample) } func Query(params QueryParams) (QueryResult, error) { if params.Hours <= 0 { params.Hours = 24 } if params.Hours > 24*30 { params.Hours = 24 * 30 } endTs := time.Now().Unix() startTs := endTs - int64(params.Hours)*3600 merged := map[bucketKey]counters{} rows, err := model.GetPerfMetrics(params.Model, params.Group, startTs, endTs) if err != nil { return QueryResult{}, err } for _, row := range rows { mergeCounters(merged, bucketKey{ model: row.ModelName, group: row.Group, bucketTs: row.BucketTs, }, counters{ requestCount: row.RequestCount, successCount: row.SuccessCount, totalLatencyMs: row.TotalLatencyMs, ttftSumMs: row.TtftSumMs, ttftCount: row.TtftCount, outputTokens: row.OutputTokens, generationMs: row.GenerationMs, }) } hotBuckets.Range(func(key, value any) bool { k := key.(bucketKey) if k.model != params.Model || k.bucketTs < startTs || k.bucketTs > endTs { return true } if params.Group != "" && k.group != params.Group { return true } mergeCounters(merged, k, value.(*atomicBucket).snapshot()) return true }) return buildQueryResult(params.Model, merged), nil } func QuerySummaryAll(hours int) (SummaryAllResult, error) { if hours <= 0 { hours = 24 } if hours > 24*30 { hours = 24 * 30 } endTs := time.Now().Unix() startTs := endTs - int64(hours)*3600 rows, err := model.GetPerfMetricsSummaryAll(startTs, endTs) if err != nil { return SummaryAllResult{}, err } totals := map[string]counters{} for _, row := range rows { totals[row.ModelName] = counters{ requestCount: row.RequestCount, successCount: row.SuccessCount, totalLatencyMs: row.TotalLatencyMs, outputTokens: row.OutputTokens, generationMs: row.GenerationMs, } } hotBuckets.Range(func(key, value any) bool { k := key.(bucketKey) if k.bucketTs < startTs || k.bucketTs > endTs { return true } snap := value.(*atomicBucket).snapshot() if snap.requestCount == 0 { return true } cur := totals[k.model] cur.requestCount += snap.requestCount cur.successCount += snap.successCount cur.totalLatencyMs += snap.totalLatencyMs cur.outputTokens += snap.outputTokens cur.generationMs += snap.generationMs totals[k.model] = cur return true }) models := make([]ModelSummary, 0, len(totals)) for name, total := range totals { if total.requestCount == 0 { continue } avgLatency := total.totalLatencyMs / total.requestCount successRate := float64(total.successCount) / float64(total.requestCount) * 100 avgTps := 0.0 if total.generationMs > 0 { avgTps = float64(total.outputTokens) / (float64(total.generationMs) / 1000.0) } models = append(models, ModelSummary{ ModelName: name, AvgLatencyMs: avgLatency, SuccessRate: math.Round(successRate*100) / 100, AvgTps: math.Round(avgTps*100) / 100, RequestCount: total.requestCount, }) } sort.Slice(models, func(i, j int) bool { return models[i].ModelName < models[j].ModelName }) return SummaryAllResult{Models: models}, nil } func bucketStart(ts int64) int64 { bucketSeconds := perf_metrics_setting.GetBucketSeconds() if bucketSeconds <= 0 { bucketSeconds = 3600 } return ts - (ts % bucketSeconds) } func mergeCounters(merged map[bucketKey]counters, key bucketKey, value counters) { if value.requestCount == 0 { return } current := merged[key] current.requestCount += value.requestCount current.successCount += value.successCount current.totalLatencyMs += value.totalLatencyMs current.ttftSumMs += value.ttftSumMs current.ttftCount += value.ttftCount current.outputTokens += value.outputTokens current.generationMs += value.generationMs merged[key] = current } func buildQueryResult(modelName string, merged map[bucketKey]counters) QueryResult { groupBuckets := map[string]map[int64]counters{} for key, value := range merged { if value.requestCount == 0 { continue } if _, ok := groupBuckets[key.group]; !ok { groupBuckets[key.group] = map[int64]counters{} } groupBuckets[key.group][key.bucketTs] = value } groups := make([]string, 0, len(groupBuckets)) for group := range groupBuckets { groups = append(groups, group) } sort.Strings(groups) results := make([]GroupResult, 0, len(groups)) for _, group := range groups { buckets := groupBuckets[group] timestamps := make([]int64, 0, len(buckets)) for ts := range buckets { timestamps = append(timestamps, ts) } sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] }) total := counters{} series := make([]BucketPoint, 0, len(timestamps)) for _, ts := range timestamps { value := buckets[ts] total.requestCount += value.requestCount total.successCount += value.successCount total.totalLatencyMs += value.totalLatencyMs total.ttftSumMs += value.ttftSumMs total.ttftCount += value.ttftCount total.outputTokens += value.outputTokens total.generationMs += value.generationMs series = append(series, bucketPoint(ts, value)) } results = append(results, GroupResult{ Group: group, AvgTtftMs: avg(total.ttftSumMs, total.ttftCount), AvgLatencyMs: avg(total.totalLatencyMs, total.requestCount), SuccessRate: successRate(total), AvgTps: avgTps(total), Series: series, }) } return QueryResult{ ModelName: modelName, SeriesSchema: seriesSchema, Groups: results, } } func bucketPoint(ts int64, value counters) BucketPoint { return BucketPoint{ Ts: ts, AvgTtftMs: avg(value.ttftSumMs, value.ttftCount), AvgLatencyMs: avg(value.totalLatencyMs, value.requestCount), SuccessRate: successRate(value), AvgTps: avgTps(value), } } func avg(sum int64, count int64) int64 { if count <= 0 { return 0 } return sum / count } func successRate(value counters) float64 { if value.requestCount <= 0 { return 0 } return float64(value.successCount) / float64(value.requestCount) * 100 } func avgTps(value counters) float64 { if value.outputTokens <= 0 || value.generationMs <= 0 { return 0 } return float64(value.outputTokens) / (float64(value.generationMs) / 1000) } func recordRedis(key bucketKey, sample Sample) { if !common.RedisEnabled || common.RDB == nil { return } ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() redisKey := redisBucketKey(key) pipe := common.RDB.TxPipeline() pipe.HIncrBy(ctx, redisKey, "req", 1) if sample.Success { pipe.HIncrBy(ctx, redisKey, "ok", 1) } if sample.LatencyMs > 0 { pipe.HIncrBy(ctx, redisKey, "lat", sample.LatencyMs) } if sample.HasTtft && sample.TtftMs >= 0 { pipe.HIncrBy(ctx, redisKey, "ttft", sample.TtftMs) pipe.HIncrBy(ctx, redisKey, "ttft_n", 1) } if sample.OutputTokens > 0 && sample.GenerationMs > 0 { pipe.HIncrBy(ctx, redisKey, "out", sample.OutputTokens) pipe.HIncrBy(ctx, redisKey, "gen_ms", sample.GenerationMs) } pipe.Expire(ctx, redisKey, time.Hour) _, _ = pipe.Exec(ctx) } func mergeRedisActiveBuckets(merged map[bucketKey]counters, params QueryParams, startTs int64, endTs int64) { if !common.RedisEnabled || common.RDB == nil || params.Model == "" || params.Group == "" { return } active := bucketStart(time.Now().Unix()) if active < startTs || active > endTs { return } key := bucketKey{model: params.Model, group: params.Group, bucketTs: active} ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() values, err := common.RDB.HGetAll(ctx, redisBucketKey(key)).Result() if err != nil || len(values) == 0 { return } mergeCounters(merged, key, redisCounters(values)) } func redisBucketKey(key bucketKey) string { return fmt.Sprintf("perf:%s:%s:%d", key.model, key.group, key.bucketTs) }