flush.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package perfmetrics
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/QuantumNous/new-api/common"
  7. "github.com/QuantumNous/new-api/model"
  8. "github.com/QuantumNous/new-api/setting/perf_metrics_setting"
  9. )
  10. func flushLoop() {
  11. for {
  12. interval := perf_metrics_setting.GetFlushIntervalMinutes()
  13. time.Sleep(time.Duration(interval) * time.Minute)
  14. setting := perf_metrics_setting.GetSetting()
  15. if !setting.Enabled {
  16. continue
  17. }
  18. flushCompletedBuckets()
  19. cleanupExpiredMetrics(setting.RetentionDays)
  20. }
  21. }
  22. func flushCompletedBuckets() {
  23. currentBucket := bucketStart(time.Now().Unix())
  24. hotBuckets.Range(func(key, value any) bool {
  25. k := key.(bucketKey)
  26. if k.bucketTs >= currentBucket {
  27. return true
  28. }
  29. bucket := value.(*atomicBucket)
  30. drained := bucket.drain()
  31. if drained.requestCount == 0 {
  32. deleteOldEmptyBucket(k, key)
  33. return true
  34. }
  35. err := model.UpsertPerfMetric(&model.PerfMetric{
  36. ModelName: k.model,
  37. Group: k.group,
  38. BucketTs: k.bucketTs,
  39. RequestCount: drained.requestCount,
  40. SuccessCount: drained.successCount,
  41. TotalLatencyMs: drained.totalLatencyMs,
  42. TtftSumMs: drained.ttftSumMs,
  43. TtftCount: drained.ttftCount,
  44. OutputTokens: drained.outputTokens,
  45. GenerationMs: drained.generationMs,
  46. })
  47. if err != nil {
  48. bucket.addCounters(drained)
  49. common.SysError(fmt.Sprintf("failed to flush perf metric bucket model=%s group=%s bucket=%d: %s", k.model, k.group, k.bucketTs, err.Error()))
  50. return true
  51. }
  52. deleteOldEmptyBucket(k, key)
  53. return true
  54. })
  55. }
  56. func deleteOldEmptyBucket(k bucketKey, rawKey any) {
  57. if k.bucketTs < bucketStart(time.Now().Add(-24*time.Hour).Unix()) {
  58. hotBuckets.Delete(rawKey)
  59. }
  60. }
  61. func cleanupExpiredMetrics(retentionDays int) {
  62. if retentionDays <= 0 {
  63. return
  64. }
  65. cutoff := time.Now().Add(-time.Duration(retentionDays) * 24 * time.Hour).Unix()
  66. if err := model.DeletePerfMetricsBefore(cutoff); err != nil {
  67. common.SysError("failed to cleanup expired perf metrics: " + err.Error())
  68. }
  69. }
  70. func redisCounters(values map[string]string) counters {
  71. return counters{
  72. requestCount: parseRedisInt(values["req"]),
  73. successCount: parseRedisInt(values["ok"]),
  74. totalLatencyMs: parseRedisInt(values["lat"]),
  75. ttftSumMs: parseRedisInt(values["ttft"]),
  76. ttftCount: parseRedisInt(values["ttft_n"]),
  77. outputTokens: parseRedisInt(values["out"]),
  78. generationMs: parseRedisInt(values["gen_ms"]),
  79. }
  80. }
  // parseRedisInt converts a base-10 string into an int64 on a best-effort
  // basis. Empty or malformed input yields 0. A value outside the int64 range
  // yields the nearest int64 bound, which is what strconv.ParseInt returns
  // alongside its range error.
  func parseRedisInt(value string) int64 {
  	// The error is dropped on purpose: ParseInt already returns 0 for empty
  	// or non-numeric input, so no separate empty-string check is needed.
  	n, _ := strconv.ParseInt(value, 10, 64)
  	return n
  }