metrics.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package perfmetrics
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "sort"
  7. "sync"
  8. "time"
  9. "github.com/QuantumNous/new-api/common"
  10. "github.com/QuantumNous/new-api/model"
  11. relaycommon "github.com/QuantumNous/new-api/relay/common"
  12. "github.com/QuantumNous/new-api/setting/perf_metrics_setting"
  13. )
// hotBuckets holds the current process's in-memory metric buckets,
// keyed by bucketKey (model, group, bucket start). It is written on every
// relay request and read by queries and the flush loop, so sync.Map is used
// for concurrent access without a global lock.
var hotBuckets sync.Map

// seriesSchema is a stable client cache/schema marker. Do not change it when
// hiding fields or making response-only privacy hardening changes.
const seriesSchema = "dbcd0a3c01b55203"
// Init starts the background flush loop goroutine. Call once at startup.
// NOTE(review): flushLoop is defined elsewhere in this package — presumably
// it periodically persists hotBuckets to the DB; confirm there.
func Init() {
	go flushLoop()
}
  21. func RecordRelaySample(info *relaycommon.RelayInfo, success bool, outputTokens int64) {
  22. if info == nil {
  23. return
  24. }
  25. now := time.Now()
  26. hasTtft := info.IsStream && info.HasSendResponse()
  27. ttftMs := int64(0)
  28. if hasTtft {
  29. ttftMs = info.FirstResponseTime.Sub(info.StartTime).Milliseconds()
  30. }
  31. latencyMs := now.Sub(info.StartTime).Milliseconds()
  32. generationMs := latencyMs
  33. if hasTtft {
  34. generationMs = now.Sub(info.FirstResponseTime).Milliseconds()
  35. }
  36. if generationMs <= 0 {
  37. generationMs = latencyMs
  38. }
  39. Record(Sample{
  40. Model: info.OriginModelName,
  41. Group: info.UsingGroup,
  42. LatencyMs: latencyMs,
  43. TtftMs: ttftMs,
  44. HasTtft: hasTtft,
  45. Success: success,
  46. OutputTokens: outputTokens,
  47. GenerationMs: generationMs,
  48. })
  49. }
  50. func Record(sample Sample) {
  51. setting := perf_metrics_setting.GetSetting()
  52. if !setting.Enabled || sample.Model == "" {
  53. return
  54. }
  55. if sample.Group == "" {
  56. sample.Group = "default"
  57. }
  58. if sample.LatencyMs < 0 {
  59. sample.LatencyMs = 0
  60. }
  61. key := bucketKey{
  62. model: sample.Model,
  63. group: sample.Group,
  64. bucketTs: bucketStart(time.Now().Unix()),
  65. }
  66. actual, _ := hotBuckets.LoadOrStore(key, &atomicBucket{})
  67. actual.(*atomicBucket).add(sample)
  68. recordRedis(key, sample)
  69. }
// Query aggregates perf metrics for one model (optionally narrowed to one
// group) over the last params.Hours hours. Hours is clamped to [1, 24*30];
// zero or negative defaults to 24. Persisted DB rows are merged first, then
// any in-memory hot buckets in range are overlaid on top.
//
// NOTE(review): mergeRedisActiveBuckets is not called here even though its
// signature matches these locals exactly — presumably it is invoked from
// another file, or calling it alongside the hotBuckets merge would
// double-count this node's samples; confirm before wiring it in.
func Query(params QueryParams) (QueryResult, error) {
	if params.Hours <= 0 {
		params.Hours = 24
	}
	if params.Hours > 24*30 {
		params.Hours = 24 * 30
	}
	endTs := time.Now().Unix()
	startTs := endTs - int64(params.Hours)*3600
	merged := map[bucketKey]counters{}
	rows, err := model.GetPerfMetrics(params.Model, params.Group, startTs, endTs)
	if err != nil {
		return QueryResult{}, err
	}
	for _, row := range rows {
		mergeCounters(merged, bucketKey{
			model:    row.ModelName,
			group:    row.Group,
			bucketTs: row.BucketTs,
		}, counters{
			requestCount:   row.RequestCount,
			successCount:   row.SuccessCount,
			totalLatencyMs: row.TotalLatencyMs,
			ttftSumMs:      row.TtftSumMs,
			ttftCount:      row.TtftCount,
			outputTokens:   row.OutputTokens,
			generationMs:   row.GenerationMs,
		})
	}
	// Overlay hot buckets that have not been flushed to the DB yet.
	hotBuckets.Range(func(key, value any) bool {
		k := key.(bucketKey)
		// Skip other models and buckets outside the requested window.
		if k.model != params.Model || k.bucketTs < startTs || k.bucketTs > endTs {
			return true
		}
		// An empty params.Group means "all groups".
		if params.Group != "" && k.group != params.Group {
			return true
		}
		mergeCounters(merged, k, value.(*atomicBucket).snapshot())
		return true
	})
	return buildQueryResult(params.Model, merged), nil
}
  112. func QuerySummaryAll(hours int) (SummaryAllResult, error) {
  113. if hours <= 0 {
  114. hours = 24
  115. }
  116. if hours > 24*30 {
  117. hours = 24 * 30
  118. }
  119. endTs := time.Now().Unix()
  120. startTs := endTs - int64(hours)*3600
  121. rows, err := model.GetPerfMetricsSummaryAll(startTs, endTs)
  122. if err != nil {
  123. return SummaryAllResult{}, err
  124. }
  125. totals := map[string]counters{}
  126. for _, row := range rows {
  127. totals[row.ModelName] = counters{
  128. requestCount: row.RequestCount,
  129. successCount: row.SuccessCount,
  130. totalLatencyMs: row.TotalLatencyMs,
  131. outputTokens: row.OutputTokens,
  132. generationMs: row.GenerationMs,
  133. }
  134. }
  135. hotBuckets.Range(func(key, value any) bool {
  136. k := key.(bucketKey)
  137. if k.bucketTs < startTs || k.bucketTs > endTs {
  138. return true
  139. }
  140. snap := value.(*atomicBucket).snapshot()
  141. if snap.requestCount == 0 {
  142. return true
  143. }
  144. cur := totals[k.model]
  145. cur.requestCount += snap.requestCount
  146. cur.successCount += snap.successCount
  147. cur.totalLatencyMs += snap.totalLatencyMs
  148. cur.outputTokens += snap.outputTokens
  149. cur.generationMs += snap.generationMs
  150. totals[k.model] = cur
  151. return true
  152. })
  153. models := make([]ModelSummary, 0, len(totals))
  154. for name, total := range totals {
  155. if total.requestCount == 0 {
  156. continue
  157. }
  158. avgLatency := total.totalLatencyMs / total.requestCount
  159. successRate := float64(total.successCount) / float64(total.requestCount) * 100
  160. avgTps := 0.0
  161. if total.generationMs > 0 {
  162. avgTps = float64(total.outputTokens) / (float64(total.generationMs) / 1000.0)
  163. }
  164. models = append(models, ModelSummary{
  165. ModelName: name,
  166. AvgLatencyMs: avgLatency,
  167. SuccessRate: math.Round(successRate*100) / 100,
  168. AvgTps: math.Round(avgTps*100) / 100,
  169. RequestCount: total.requestCount,
  170. })
  171. }
  172. sort.Slice(models, func(i, j int) bool {
  173. return models[i].ModelName < models[j].ModelName
  174. })
  175. return SummaryAllResult{Models: models}, nil
  176. }
  177. func bucketStart(ts int64) int64 {
  178. bucketSeconds := perf_metrics_setting.GetBucketSeconds()
  179. if bucketSeconds <= 0 {
  180. bucketSeconds = 3600
  181. }
  182. return ts - (ts % bucketSeconds)
  183. }
  184. func mergeCounters(merged map[bucketKey]counters, key bucketKey, value counters) {
  185. if value.requestCount == 0 {
  186. return
  187. }
  188. current := merged[key]
  189. current.requestCount += value.requestCount
  190. current.successCount += value.successCount
  191. current.totalLatencyMs += value.totalLatencyMs
  192. current.ttftSumMs += value.ttftSumMs
  193. current.ttftCount += value.ttftCount
  194. current.outputTokens += value.outputTokens
  195. current.generationMs += value.generationMs
  196. merged[key] = current
  197. }
  198. func buildQueryResult(modelName string, merged map[bucketKey]counters) QueryResult {
  199. groupBuckets := map[string]map[int64]counters{}
  200. for key, value := range merged {
  201. if value.requestCount == 0 {
  202. continue
  203. }
  204. if _, ok := groupBuckets[key.group]; !ok {
  205. groupBuckets[key.group] = map[int64]counters{}
  206. }
  207. groupBuckets[key.group][key.bucketTs] = value
  208. }
  209. groups := make([]string, 0, len(groupBuckets))
  210. for group := range groupBuckets {
  211. groups = append(groups, group)
  212. }
  213. sort.Strings(groups)
  214. results := make([]GroupResult, 0, len(groups))
  215. for _, group := range groups {
  216. buckets := groupBuckets[group]
  217. timestamps := make([]int64, 0, len(buckets))
  218. for ts := range buckets {
  219. timestamps = append(timestamps, ts)
  220. }
  221. sort.Slice(timestamps, func(i, j int) bool {
  222. return timestamps[i] < timestamps[j]
  223. })
  224. total := counters{}
  225. series := make([]BucketPoint, 0, len(timestamps))
  226. for _, ts := range timestamps {
  227. value := buckets[ts]
  228. total.requestCount += value.requestCount
  229. total.successCount += value.successCount
  230. total.totalLatencyMs += value.totalLatencyMs
  231. total.ttftSumMs += value.ttftSumMs
  232. total.ttftCount += value.ttftCount
  233. total.outputTokens += value.outputTokens
  234. total.generationMs += value.generationMs
  235. series = append(series, bucketPoint(ts, value))
  236. }
  237. results = append(results, GroupResult{
  238. Group: group,
  239. AvgTtftMs: avg(total.ttftSumMs, total.ttftCount),
  240. AvgLatencyMs: avg(total.totalLatencyMs, total.requestCount),
  241. SuccessRate: successRate(total),
  242. AvgTps: avgTps(total),
  243. Series: series,
  244. })
  245. }
  246. return QueryResult{
  247. ModelName: modelName,
  248. SeriesSchema: seriesSchema,
  249. Groups: results,
  250. }
  251. }
  252. func bucketPoint(ts int64, value counters) BucketPoint {
  253. return BucketPoint{
  254. Ts: ts,
  255. AvgTtftMs: avg(value.ttftSumMs, value.ttftCount),
  256. AvgLatencyMs: avg(value.totalLatencyMs, value.requestCount),
  257. SuccessRate: successRate(value),
  258. AvgTps: avgTps(value),
  259. }
  260. }
  261. func avg(sum int64, count int64) int64 {
  262. if count <= 0 {
  263. return 0
  264. }
  265. return sum / count
  266. }
  267. func successRate(value counters) float64 {
  268. if value.requestCount <= 0 {
  269. return 0
  270. }
  271. return float64(value.successCount) / float64(value.requestCount) * 100
  272. }
  273. func avgTps(value counters) float64 {
  274. if value.outputTokens <= 0 || value.generationMs <= 0 {
  275. return 0
  276. }
  277. return float64(value.outputTokens) / (float64(value.generationMs) / 1000)
  278. }
// recordRedis mirrors one sample into a per-bucket Redis hash so the active
// bucket is visible across nodes before the DB flush. Best-effort: all
// errors are deliberately ignored, and a 1-second timeout keeps the relay
// request path from ever blocking on Redis.
func recordRedis(key bucketKey, sample Sample) {
	// No-op when Redis is disabled or the client was never initialized.
	if !common.RedisEnabled || common.RDB == nil {
		return
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	redisKey := redisBucketKey(key)
	// Transactional pipeline: one round trip, fields stay mutually consistent.
	pipe := common.RDB.TxPipeline()
	// Hash fields: req/ok = request/success counts, lat = latency sum,
	// ttft/ttft_n = TTFT sum and sample count, out/gen_ms = output tokens
	// and generation-time sum.
	pipe.HIncrBy(ctx, redisKey, "req", 1)
	if sample.Success {
		pipe.HIncrBy(ctx, redisKey, "ok", 1)
	}
	if sample.LatencyMs > 0 {
		pipe.HIncrBy(ctx, redisKey, "lat", sample.LatencyMs)
	}
	// Sum and count are tracked separately so the average TTFT can be
	// recomputed later; a negative TTFT (clock skew) is skipped entirely.
	if sample.HasTtft && sample.TtftMs >= 0 {
		pipe.HIncrBy(ctx, redisKey, "ttft", sample.TtftMs)
		pipe.HIncrBy(ctx, redisKey, "ttft_n", 1)
	}
	if sample.OutputTokens > 0 && sample.GenerationMs > 0 {
		pipe.HIncrBy(ctx, redisKey, "out", sample.OutputTokens)
		pipe.HIncrBy(ctx, redisKey, "gen_ms", sample.GenerationMs)
	}
	// NOTE(review): TTL is fixed at one hour; if GetBucketSeconds() were
	// configured above 3600 the hash could expire before its bucket closes —
	// confirm against the flush cadence.
	pipe.Expire(ctx, redisKey, time.Hour)
	_, _ = pipe.Exec(ctx)
}
// mergeRedisActiveBuckets overlays the currently-active Redis bucket for one
// exact (model, group) pair onto merged. It requires both filters to be set
// and does nothing when Redis is unavailable.
//
// NOTE(review): no caller is visible in this file — presumably it is invoked
// from another file in the package. If it were added to Query next to the
// hotBuckets merge it could double-count this node's own samples (Record
// writes to both stores) — confirm intended usage.
func mergeRedisActiveBuckets(merged map[bucketKey]counters, params QueryParams, startTs int64, endTs int64) {
	if !common.RedisEnabled || common.RDB == nil || params.Model == "" || params.Group == "" {
		return
	}
	// Only the bucket covering "now" lives in Redis; older buckets are
	// expected to come from the DB.
	active := bucketStart(time.Now().Unix())
	if active < startTs || active > endTs {
		return
	}
	key := bucketKey{model: params.Model, group: params.Group, bucketTs: active}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	values, err := common.RDB.HGetAll(ctx, redisBucketKey(key)).Result()
	// Best-effort: a Redis error or a missing hash contributes nothing.
	if err != nil || len(values) == 0 {
		return
	}
	mergeCounters(merged, key, redisCounters(values))
}
// redisBucketKey renders the Redis hash key for one bucket,
// e.g. "perf:gpt-4o:default:1700000000". The format must stay stable —
// recordRedis and mergeRedisActiveBuckets both depend on it.
func redisBucketKey(key bucketKey) string {
	return fmt.Sprintf("perf:%s:%s:%d", key.model, key.group, key.bucketTs)
}