flush.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package perfmetrics
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/QuantumNous/new-api/common"
  7. "github.com/QuantumNous/new-api/model"
  8. "github.com/QuantumNous/new-api/setting/perf_metrics_setting"
  9. )
  10. func flushLoop() {
  11. for {
  12. interval := perf_metrics_setting.GetFlushIntervalMinutes()
  13. time.Sleep(time.Duration(interval) * time.Minute)
  14. setting := perf_metrics_setting.GetSetting()
  15. if !setting.Enabled {
  16. continue
  17. }
  18. flushCompletedBuckets()
  19. cleanupExpiredMetrics(setting.RetentionDays)
  20. }
  21. }
  22. func flushCompletedBuckets() {
  23. currentBucket := bucketStart(time.Now().Unix())
  24. hotBuckets.Range(func(key, value any) bool {
  25. k := key.(bucketKey)
  26. if k.bucketTs >= currentBucket {
  27. return true
  28. }
  29. bucket := value.(*atomicBucket)
  30. drained := bucket.drain()
  31. if drained.requestCount == 0 {
  32. deleteOldEmptyBucket(k, key)
  33. return true
  34. }
  35. err := model.UpsertPerfMetric(&model.PerfMetric{
  36. ModelName: k.model,
  37. Group: k.group,
  38. BucketTs: k.bucketTs,
  39. RequestCount: drained.requestCount,
  40. SuccessCount: drained.successCount,
  41. TotalLatencyMs: drained.totalLatencyMs,
  42. TtftSumMs: drained.ttftSumMs,
  43. TtftCount: drained.ttftCount,
  44. })
  45. if err != nil {
  46. bucket.addCounters(drained)
  47. common.SysError(fmt.Sprintf("failed to flush perf metric bucket model=%s group=%s bucket=%d: %s", k.model, k.group, k.bucketTs, err.Error()))
  48. return true
  49. }
  50. deleteOldEmptyBucket(k, key)
  51. return true
  52. })
  53. }
  54. func deleteOldEmptyBucket(k bucketKey, rawKey any) {
  55. if k.bucketTs < bucketStart(time.Now().Add(-24*time.Hour).Unix()) {
  56. hotBuckets.Delete(rawKey)
  57. }
  58. }
  59. func cleanupExpiredMetrics(retentionDays int) {
  60. if retentionDays <= 0 {
  61. return
  62. }
  63. cutoff := time.Now().Add(-time.Duration(retentionDays) * 24 * time.Hour).Unix()
  64. if err := model.DeletePerfMetricsBefore(cutoff); err != nil {
  65. common.SysError("failed to cleanup expired perf metrics: " + err.Error())
  66. }
  67. }
  68. func redisCounters(values map[string]string) counters {
  69. return counters{
  70. requestCount: parseRedisInt(values["req"]),
  71. successCount: parseRedisInt(values["ok"]),
  72. totalLatencyMs: parseRedisInt(values["lat"]),
  73. ttftSumMs: parseRedisInt(values["ttft"]),
  74. ttftCount: parseRedisInt(values["ttft_n"]),
  75. }
  76. }
  77. func parseRedisInt(value string) int64 {
  78. if value == "" {
  79. return 0
  80. }
  81. parsed, _ := strconv.ParseInt(value, 10, 64)
  82. return parsed
  83. }