utils.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package model
  2. import (
  3. "errors"
  4. "one-api/common"
  5. "one-api/logger"
  6. "sync"
  7. "time"
  8. "github.com/bytedance/gopkg/util/gopool"
  9. "gorm.io/gorm"
  10. )
  11. const (
  12. BatchUpdateTypeUserQuota = iota
  13. BatchUpdateTypeTokenQuota
  14. BatchUpdateTypeUsedQuota
  15. BatchUpdateTypeChannelUsedQuota
  16. BatchUpdateTypeRequestCount
  17. BatchUpdateTypeCount // if you add a new type, you need to add a new map and a new lock
  18. )
  19. var batchUpdateStores []map[int]int
  20. var batchUpdateLocks []sync.Mutex
  21. func init() {
  22. for i := 0; i < BatchUpdateTypeCount; i++ {
  23. batchUpdateStores = append(batchUpdateStores, make(map[int]int))
  24. batchUpdateLocks = append(batchUpdateLocks, sync.Mutex{})
  25. }
  26. }
  27. func InitBatchUpdater() {
  28. gopool.Go(func() {
  29. for {
  30. time.Sleep(time.Duration(common.BatchUpdateInterval) * time.Second)
  31. batchUpdate()
  32. }
  33. })
  34. }
  35. func addNewRecord(type_ int, id int, value int) {
  36. batchUpdateLocks[type_].Lock()
  37. defer batchUpdateLocks[type_].Unlock()
  38. if _, ok := batchUpdateStores[type_][id]; !ok {
  39. batchUpdateStores[type_][id] = value
  40. } else {
  41. batchUpdateStores[type_][id] += value
  42. }
  43. }
  44. func batchUpdate() {
  45. // check if there's any data to update
  46. hasData := false
  47. for i := 0; i < BatchUpdateTypeCount; i++ {
  48. batchUpdateLocks[i].Lock()
  49. if len(batchUpdateStores[i]) > 0 {
  50. hasData = true
  51. batchUpdateLocks[i].Unlock()
  52. break
  53. }
  54. batchUpdateLocks[i].Unlock()
  55. }
  56. if !hasData {
  57. return
  58. }
  59. logger.SysLog("batch update started")
  60. for i := 0; i < BatchUpdateTypeCount; i++ {
  61. batchUpdateLocks[i].Lock()
  62. store := batchUpdateStores[i]
  63. batchUpdateStores[i] = make(map[int]int)
  64. batchUpdateLocks[i].Unlock()
  65. // TODO: maybe we can combine updates with same key?
  66. for key, value := range store {
  67. switch i {
  68. case BatchUpdateTypeUserQuota:
  69. err := increaseUserQuota(key, value)
  70. if err != nil {
  71. logger.SysError("failed to batch update user quota: " + err.Error())
  72. }
  73. case BatchUpdateTypeTokenQuota:
  74. err := increaseTokenQuota(key, value)
  75. if err != nil {
  76. logger.SysError("failed to batch update token quota: " + err.Error())
  77. }
  78. case BatchUpdateTypeUsedQuota:
  79. updateUserUsedQuota(key, value)
  80. case BatchUpdateTypeRequestCount:
  81. updateUserRequestCount(key, value)
  82. case BatchUpdateTypeChannelUsedQuota:
  83. updateChannelUsedQuota(key, value)
  84. }
  85. }
  86. }
  87. logger.SysLog("batch update finished")
  88. }
  89. func RecordExist(err error) (bool, error) {
  90. if err == nil {
  91. return true, nil
  92. }
  93. if errors.Is(err, gorm.ErrRecordNotFound) {
  94. return false, nil
  95. }
  96. return false, err
  97. }
  98. func shouldUpdateRedis(fromDB bool, err error) bool {
  99. return common.RedisEnabled && fromDB && err == nil
  100. }