channel_affinity.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783
  1. package service
  2. import (
  3. "fmt"
  4. "hash/fnv"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/QuantumNous/new-api/common"
  11. "github.com/QuantumNous/new-api/dto"
  12. "github.com/QuantumNous/new-api/pkg/cachex"
  13. "github.com/QuantumNous/new-api/setting/operation_setting"
  14. "github.com/gin-gonic/gin"
  15. "github.com/samber/hot"
  16. "github.com/tidwall/gjson"
  17. )
  18. const (
  19. ginKeyChannelAffinityCacheKey = "channel_affinity_cache_key"
  20. ginKeyChannelAffinityTTLSeconds = "channel_affinity_ttl_seconds"
  21. ginKeyChannelAffinityMeta = "channel_affinity_meta"
  22. ginKeyChannelAffinityLogInfo = "channel_affinity_log_info"
  23. ginKeyChannelAffinitySkipRetry = "channel_affinity_skip_retry_on_failure"
  24. channelAffinityCacheNamespace = "new-api:channel_affinity:v1"
  25. channelAffinityUsageCacheStatsNamespace = "new-api:channel_affinity_usage_cache_stats:v1"
  26. )
  27. var (
  28. channelAffinityCacheOnce sync.Once
  29. channelAffinityCache *cachex.HybridCache[int]
  30. channelAffinityUsageCacheStatsOnce sync.Once
  31. channelAffinityUsageCacheStatsCache *cachex.HybridCache[ChannelAffinityUsageCacheCounters]
  32. channelAffinityRegexCache sync.Map // map[string]*regexp.Regexp
  33. )
// channelAffinityMeta records, for one request, which affinity rule matched and
// how its key was derived. GetPreferredChannelByAffinity stores it on the gin
// context; MarkChannelAffinityUsed and GetChannelAffinityStatsContext read it back.
type channelAffinityMeta struct {
	CacheKey       string // full cache key: channelAffinityCacheNamespace + ":" + key suffix
	TTLSeconds     int    // TTL to apply when the affinity entry is written
	RuleName       string // name of the matched rule
	SkipRetry      bool   // copied from the matched rule's SkipRetryOnFailure
	KeySourceType  string // Type of the key source that produced the affinity value (trimmed)
	KeySourceKey   string // Key of that key source (trimmed)
	KeySourcePath  string // Path of that key source (trimmed)
	KeyHint        string // abbreviated affinity value, for log display
	KeyFingerprint string // short hash of the affinity value (see affinityFingerprint)
	UsingGroup     string // group passed to GetPreferredChannelByAffinity
	ModelName      string // model name passed to GetPreferredChannelByAffinity
	RequestPath    string // request URL path, "" when unavailable
}
// ChannelAffinityStatsContext identifies the usage-statistics bucket for a
// request that matched an affinity rule. It is built by
// GetChannelAffinityStatsContext from the per-request affinity metadata.
type ChannelAffinityStatsContext struct {
	RuleName       string
	UsingGroup     string
	KeyFingerprint string
	TTLSeconds     int64 // used as both the statistics window and the stats entry TTL
}
// ChannelAffinityCacheStats is the summary returned by
// GetChannelAffinityCacheStats.
type ChannelAffinityCacheStats struct {
	Enabled       bool           `json:"enabled"`        // whether channel affinity is enabled in settings
	Total         int            `json:"total"`          // number of keys listed from the cache
	Unknown       int            `json:"unknown"`        // keys that could not be attributed to a known rule
	ByRuleName    map[string]int `json:"by_rule_name"`   // key count per rule name
	CacheCapacity int            `json:"cache_capacity"` // value reported by the cache's Capacity()
	CacheAlgo     string         `json:"cache_algo"`     // value reported by the cache's Algorithm()
}
  62. func getChannelAffinityCache() *cachex.HybridCache[int] {
  63. channelAffinityCacheOnce.Do(func() {
  64. setting := operation_setting.GetChannelAffinitySetting()
  65. capacity := setting.MaxEntries
  66. if capacity <= 0 {
  67. capacity = 100_000
  68. }
  69. defaultTTLSeconds := setting.DefaultTTLSeconds
  70. if defaultTTLSeconds <= 0 {
  71. defaultTTLSeconds = 3600
  72. }
  73. channelAffinityCache = cachex.NewHybridCache[int](cachex.HybridCacheConfig[int]{
  74. Namespace: cachex.Namespace(channelAffinityCacheNamespace),
  75. Redis: common.RDB,
  76. RedisEnabled: func() bool {
  77. return common.RedisEnabled && common.RDB != nil
  78. },
  79. RedisCodec: cachex.IntCodec{},
  80. Memory: func() *hot.HotCache[string, int] {
  81. return hot.NewHotCache[string, int](hot.LRU, capacity).
  82. WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
  83. WithJanitor().
  84. Build()
  85. },
  86. })
  87. })
  88. return channelAffinityCache
  89. }
  90. func GetChannelAffinityCacheStats() ChannelAffinityCacheStats {
  91. setting := operation_setting.GetChannelAffinitySetting()
  92. if setting == nil {
  93. return ChannelAffinityCacheStats{
  94. Enabled: false,
  95. Total: 0,
  96. Unknown: 0,
  97. ByRuleName: map[string]int{},
  98. }
  99. }
  100. cache := getChannelAffinityCache()
  101. mainCap, _ := cache.Capacity()
  102. mainAlgo, _ := cache.Algorithm()
  103. rules := setting.Rules
  104. ruleByName := make(map[string]operation_setting.ChannelAffinityRule, len(rules))
  105. for _, r := range rules {
  106. name := strings.TrimSpace(r.Name)
  107. if name == "" {
  108. continue
  109. }
  110. if !r.IncludeRuleName {
  111. continue
  112. }
  113. ruleByName[name] = r
  114. }
  115. byRuleName := make(map[string]int, len(ruleByName))
  116. for name := range ruleByName {
  117. byRuleName[name] = 0
  118. }
  119. keys, err := cache.Keys()
  120. if err != nil {
  121. common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
  122. keys = nil
  123. }
  124. total := len(keys)
  125. unknown := 0
  126. for _, k := range keys {
  127. prefix := channelAffinityCacheNamespace + ":"
  128. if !strings.HasPrefix(k, prefix) {
  129. unknown++
  130. continue
  131. }
  132. rest := strings.TrimPrefix(k, prefix)
  133. parts := strings.Split(rest, ":")
  134. if len(parts) < 2 {
  135. unknown++
  136. continue
  137. }
  138. ruleName := parts[0]
  139. rule, ok := ruleByName[ruleName]
  140. if !ok {
  141. unknown++
  142. continue
  143. }
  144. if rule.IncludeUsingGroup {
  145. if len(parts) < 3 {
  146. unknown++
  147. continue
  148. }
  149. }
  150. byRuleName[ruleName]++
  151. }
  152. return ChannelAffinityCacheStats{
  153. Enabled: setting.Enabled,
  154. Total: total,
  155. Unknown: unknown,
  156. ByRuleName: byRuleName,
  157. CacheCapacity: mainCap,
  158. CacheAlgo: mainAlgo,
  159. }
  160. }
  161. func ClearChannelAffinityCacheAll() int {
  162. cache := getChannelAffinityCache()
  163. keys, err := cache.Keys()
  164. if err != nil {
  165. common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
  166. keys = nil
  167. }
  168. if len(keys) > 0 {
  169. if _, err := cache.DeleteMany(keys); err != nil {
  170. common.SysError(fmt.Sprintf("channel affinity cache delete many failed: err=%v", err))
  171. }
  172. }
  173. return len(keys)
  174. }
  175. func ClearChannelAffinityCacheByRuleName(ruleName string) (int, error) {
  176. ruleName = strings.TrimSpace(ruleName)
  177. if ruleName == "" {
  178. return 0, fmt.Errorf("rule_name 不能为空")
  179. }
  180. setting := operation_setting.GetChannelAffinitySetting()
  181. if setting == nil {
  182. return 0, fmt.Errorf("channel_affinity_setting 未初始化")
  183. }
  184. var matchedRule *operation_setting.ChannelAffinityRule
  185. for i := range setting.Rules {
  186. r := &setting.Rules[i]
  187. if strings.TrimSpace(r.Name) != ruleName {
  188. continue
  189. }
  190. matchedRule = r
  191. break
  192. }
  193. if matchedRule == nil {
  194. return 0, fmt.Errorf("未知规则名称")
  195. }
  196. if !matchedRule.IncludeRuleName {
  197. return 0, fmt.Errorf("该规则未启用 include_rule_name,无法按规则清空缓存")
  198. }
  199. cache := getChannelAffinityCache()
  200. deleted, err := cache.DeleteByPrefix(ruleName)
  201. if err != nil {
  202. return 0, err
  203. }
  204. return deleted, nil
  205. }
  206. func matchAnyRegexCached(patterns []string, s string) bool {
  207. if len(patterns) == 0 || s == "" {
  208. return false
  209. }
  210. for _, pattern := range patterns {
  211. if pattern == "" {
  212. continue
  213. }
  214. re, ok := channelAffinityRegexCache.Load(pattern)
  215. if !ok {
  216. compiled, err := regexp.Compile(pattern)
  217. if err != nil {
  218. continue
  219. }
  220. re = compiled
  221. channelAffinityRegexCache.Store(pattern, re)
  222. }
  223. if re.(*regexp.Regexp).MatchString(s) {
  224. return true
  225. }
  226. }
  227. return false
  228. }
  229. func matchAnyIncludeFold(patterns []string, s string) bool {
  230. if len(patterns) == 0 || s == "" {
  231. return false
  232. }
  233. sLower := strings.ToLower(s)
  234. for _, p := range patterns {
  235. p = strings.TrimSpace(p)
  236. if p == "" {
  237. continue
  238. }
  239. if strings.Contains(sLower, strings.ToLower(p)) {
  240. return true
  241. }
  242. }
  243. return false
  244. }
  245. func extractChannelAffinityValue(c *gin.Context, src operation_setting.ChannelAffinityKeySource) string {
  246. switch src.Type {
  247. case "context_int":
  248. if src.Key == "" {
  249. return ""
  250. }
  251. v := c.GetInt(src.Key)
  252. if v <= 0 {
  253. return ""
  254. }
  255. return strconv.Itoa(v)
  256. case "context_string":
  257. if src.Key == "" {
  258. return ""
  259. }
  260. return strings.TrimSpace(c.GetString(src.Key))
  261. case "gjson":
  262. if src.Path == "" {
  263. return ""
  264. }
  265. body, err := common.GetRequestBody(c)
  266. if err != nil || len(body) == 0 {
  267. return ""
  268. }
  269. res := gjson.GetBytes(body, src.Path)
  270. if !res.Exists() {
  271. return ""
  272. }
  273. switch res.Type {
  274. case gjson.String, gjson.Number, gjson.True, gjson.False:
  275. return strings.TrimSpace(res.String())
  276. default:
  277. return strings.TrimSpace(res.Raw)
  278. }
  279. default:
  280. return ""
  281. }
  282. }
  283. func buildChannelAffinityCacheKeySuffix(rule operation_setting.ChannelAffinityRule, usingGroup string, affinityValue string) string {
  284. parts := make([]string, 0, 3)
  285. if rule.IncludeRuleName && rule.Name != "" {
  286. parts = append(parts, rule.Name)
  287. }
  288. if rule.IncludeUsingGroup && usingGroup != "" {
  289. parts = append(parts, usingGroup)
  290. }
  291. parts = append(parts, affinityValue)
  292. return strings.Join(parts, ":")
  293. }
  294. func setChannelAffinityContext(c *gin.Context, meta channelAffinityMeta) {
  295. c.Set(ginKeyChannelAffinityCacheKey, meta.CacheKey)
  296. c.Set(ginKeyChannelAffinityTTLSeconds, meta.TTLSeconds)
  297. c.Set(ginKeyChannelAffinityMeta, meta)
  298. }
  299. func getChannelAffinityContext(c *gin.Context) (string, int, bool) {
  300. keyAny, ok := c.Get(ginKeyChannelAffinityCacheKey)
  301. if !ok {
  302. return "", 0, false
  303. }
  304. key, ok := keyAny.(string)
  305. if !ok || key == "" {
  306. return "", 0, false
  307. }
  308. ttlAny, ok := c.Get(ginKeyChannelAffinityTTLSeconds)
  309. if !ok {
  310. return key, 0, true
  311. }
  312. ttlSeconds, _ := ttlAny.(int)
  313. return key, ttlSeconds, true
  314. }
  315. func getChannelAffinityMeta(c *gin.Context) (channelAffinityMeta, bool) {
  316. anyMeta, ok := c.Get(ginKeyChannelAffinityMeta)
  317. if !ok {
  318. return channelAffinityMeta{}, false
  319. }
  320. meta, ok := anyMeta.(channelAffinityMeta)
  321. if !ok {
  322. return channelAffinityMeta{}, false
  323. }
  324. return meta, true
  325. }
  326. func GetChannelAffinityStatsContext(c *gin.Context) (ChannelAffinityStatsContext, bool) {
  327. if c == nil {
  328. return ChannelAffinityStatsContext{}, false
  329. }
  330. meta, ok := getChannelAffinityMeta(c)
  331. if !ok {
  332. return ChannelAffinityStatsContext{}, false
  333. }
  334. ruleName := strings.TrimSpace(meta.RuleName)
  335. keyFp := strings.TrimSpace(meta.KeyFingerprint)
  336. usingGroup := strings.TrimSpace(meta.UsingGroup)
  337. if ruleName == "" || keyFp == "" {
  338. return ChannelAffinityStatsContext{}, false
  339. }
  340. ttlSeconds := int64(meta.TTLSeconds)
  341. if ttlSeconds <= 0 {
  342. return ChannelAffinityStatsContext{}, false
  343. }
  344. return ChannelAffinityStatsContext{
  345. RuleName: ruleName,
  346. UsingGroup: usingGroup,
  347. KeyFingerprint: keyFp,
  348. TTLSeconds: ttlSeconds,
  349. }, true
  350. }
  351. func affinityFingerprint(s string) string {
  352. if s == "" {
  353. return ""
  354. }
  355. hex := common.Sha1([]byte(s))
  356. if len(hex) >= 8 {
  357. return hex[:8]
  358. }
  359. return hex
  360. }
  361. func buildChannelAffinityKeyHint(s string) string {
  362. s = strings.TrimSpace(s)
  363. if s == "" {
  364. return ""
  365. }
  366. s = strings.ReplaceAll(s, "\n", " ")
  367. s = strings.ReplaceAll(s, "\r", " ")
  368. if len(s) <= 12 {
  369. return s
  370. }
  371. return s[:4] + "..." + s[len(s)-4:]
  372. }
  373. func GetPreferredChannelByAffinity(c *gin.Context, modelName string, usingGroup string) (int, bool) {
  374. setting := operation_setting.GetChannelAffinitySetting()
  375. if setting == nil || !setting.Enabled {
  376. return 0, false
  377. }
  378. path := ""
  379. if c != nil && c.Request != nil && c.Request.URL != nil {
  380. path = c.Request.URL.Path
  381. }
  382. userAgent := ""
  383. if c != nil && c.Request != nil {
  384. userAgent = c.Request.UserAgent()
  385. }
  386. for _, rule := range setting.Rules {
  387. if !matchAnyRegexCached(rule.ModelRegex, modelName) {
  388. continue
  389. }
  390. if len(rule.PathRegex) > 0 && !matchAnyRegexCached(rule.PathRegex, path) {
  391. continue
  392. }
  393. if len(rule.UserAgentInclude) > 0 && !matchAnyIncludeFold(rule.UserAgentInclude, userAgent) {
  394. continue
  395. }
  396. var affinityValue string
  397. var usedSource operation_setting.ChannelAffinityKeySource
  398. for _, src := range rule.KeySources {
  399. affinityValue = extractChannelAffinityValue(c, src)
  400. if affinityValue != "" {
  401. usedSource = src
  402. break
  403. }
  404. }
  405. if affinityValue == "" {
  406. continue
  407. }
  408. if rule.ValueRegex != "" && !matchAnyRegexCached([]string{rule.ValueRegex}, affinityValue) {
  409. continue
  410. }
  411. ttlSeconds := rule.TTLSeconds
  412. if ttlSeconds <= 0 {
  413. ttlSeconds = setting.DefaultTTLSeconds
  414. }
  415. cacheKeySuffix := buildChannelAffinityCacheKeySuffix(rule, usingGroup, affinityValue)
  416. cacheKeyFull := channelAffinityCacheNamespace + ":" + cacheKeySuffix
  417. setChannelAffinityContext(c, channelAffinityMeta{
  418. CacheKey: cacheKeyFull,
  419. TTLSeconds: ttlSeconds,
  420. RuleName: rule.Name,
  421. SkipRetry: rule.SkipRetryOnFailure,
  422. KeySourceType: strings.TrimSpace(usedSource.Type),
  423. KeySourceKey: strings.TrimSpace(usedSource.Key),
  424. KeySourcePath: strings.TrimSpace(usedSource.Path),
  425. KeyHint: buildChannelAffinityKeyHint(affinityValue),
  426. KeyFingerprint: affinityFingerprint(affinityValue),
  427. UsingGroup: usingGroup,
  428. ModelName: modelName,
  429. RequestPath: path,
  430. })
  431. cache := getChannelAffinityCache()
  432. channelID, found, err := cache.Get(cacheKeySuffix)
  433. if err != nil {
  434. common.SysError(fmt.Sprintf("channel affinity cache get failed: key=%s, err=%v", cacheKeyFull, err))
  435. return 0, false
  436. }
  437. if found {
  438. return channelID, true
  439. }
  440. return 0, false
  441. }
  442. return 0, false
  443. }
  444. func ShouldSkipRetryAfterChannelAffinityFailure(c *gin.Context) bool {
  445. if c == nil {
  446. return false
  447. }
  448. v, ok := c.Get(ginKeyChannelAffinitySkipRetry)
  449. if !ok {
  450. return false
  451. }
  452. b, ok := v.(bool)
  453. if !ok {
  454. return false
  455. }
  456. return b
  457. }
  458. func MarkChannelAffinityUsed(c *gin.Context, selectedGroup string, channelID int) {
  459. if c == nil || channelID <= 0 {
  460. return
  461. }
  462. meta, ok := getChannelAffinityMeta(c)
  463. if !ok {
  464. return
  465. }
  466. c.Set(ginKeyChannelAffinitySkipRetry, meta.SkipRetry)
  467. info := map[string]interface{}{
  468. "reason": meta.RuleName,
  469. "rule_name": meta.RuleName,
  470. "using_group": meta.UsingGroup,
  471. "selected_group": selectedGroup,
  472. "model": meta.ModelName,
  473. "request_path": meta.RequestPath,
  474. "channel_id": channelID,
  475. "key_source": meta.KeySourceType,
  476. "key_key": meta.KeySourceKey,
  477. "key_path": meta.KeySourcePath,
  478. "key_hint": meta.KeyHint,
  479. "key_fp": meta.KeyFingerprint,
  480. }
  481. c.Set(ginKeyChannelAffinityLogInfo, info)
  482. }
  483. func AppendChannelAffinityAdminInfo(c *gin.Context, adminInfo map[string]interface{}) {
  484. if c == nil || adminInfo == nil {
  485. return
  486. }
  487. anyInfo, ok := c.Get(ginKeyChannelAffinityLogInfo)
  488. if !ok || anyInfo == nil {
  489. return
  490. }
  491. adminInfo["channel_affinity"] = anyInfo
  492. }
  493. func RecordChannelAffinity(c *gin.Context, channelID int) {
  494. if channelID <= 0 {
  495. return
  496. }
  497. setting := operation_setting.GetChannelAffinitySetting()
  498. if setting == nil || !setting.Enabled {
  499. return
  500. }
  501. if setting.SwitchOnSuccess && c != nil {
  502. if successChannelID := c.GetInt("channel_id"); successChannelID > 0 {
  503. channelID = successChannelID
  504. }
  505. }
  506. cacheKey, ttlSeconds, ok := getChannelAffinityContext(c)
  507. if !ok {
  508. return
  509. }
  510. if ttlSeconds <= 0 {
  511. ttlSeconds = setting.DefaultTTLSeconds
  512. }
  513. if ttlSeconds <= 0 {
  514. ttlSeconds = 3600
  515. }
  516. cache := getChannelAffinityCache()
  517. if err := cache.SetWithTTL(cacheKey, channelID, time.Duration(ttlSeconds)*time.Second); err != nil {
  518. common.SysError(fmt.Sprintf("channel affinity cache set failed: key=%s, err=%v", cacheKey, err))
  519. }
  520. }
// ChannelAffinityUsageCacheStats is the view of the usage counters for one
// (rule name, using group, key fingerprint) bucket, as returned by
// GetChannelAffinityUsageCacheStats. Counter fields are zero when no entry is
// found.
type ChannelAffinityUsageCacheStats struct {
	RuleName             string `json:"rule_name"`
	UsingGroup           string `json:"using_group"`
	KeyFingerprint       string `json:"key_fp"`
	Hit                  int64  `json:"hit"`                     // observations whose usage reported cached / prompt-cache-hit tokens
	Total                int64  `json:"total"`                   // observations in the current entry
	WindowSeconds        int64  `json:"window_seconds"`          // TTL used for the most recent update
	PromptTokens         int64  `json:"prompt_tokens"`           // summed prompt (or input) tokens
	CompletionTokens     int64  `json:"completion_tokens"`       // summed completion (or output) tokens
	TotalTokens          int64  `json:"total_tokens"`            // summed total tokens
	CachedTokens         int64  `json:"cached_tokens"`           // summed cached tokens
	PromptCacheHitTokens int64  `json:"prompt_cache_hit_tokens"` // summed prompt cache hit tokens
	LastSeenAt           int64  `json:"last_seen_at"`            // Unix seconds of the most recent observation
}
// ChannelAffinityUsageCacheCounters is the value stored per usage-stats bucket
// in the usage statistics cache. It is JSON-encoded when Redis is used (the
// cache is configured with cachex.JSONCodec), so field tags are part of the
// stored format.
type ChannelAffinityUsageCacheCounters struct {
	Hit                  int64 `json:"hit"`
	Total                int64 `json:"total"`
	WindowSeconds        int64 `json:"window_seconds"`
	PromptTokens         int64 `json:"prompt_tokens"`
	CompletionTokens     int64 `json:"completion_tokens"`
	TotalTokens          int64 `json:"total_tokens"`
	CachedTokens         int64 `json:"cached_tokens"`
	PromptCacheHitTokens int64 `json:"prompt_cache_hit_tokens"`
	LastSeenAt           int64 `json:"last_seen_at"` // Unix seconds
}
// channelAffinityUsageCacheStatsLocks stripes the read-modify-write in
// observeChannelAffinityUsageCache across 64 mutexes. The stripe is chosen by
// hashing the entry key (see channelAffinityUsageCacheStatsLock). These locks
// are process-local.
var channelAffinityUsageCacheStatsLocks [64]sync.Mutex
  547. func ObserveChannelAffinityUsageCacheFromContext(c *gin.Context, usage *dto.Usage) {
  548. statsCtx, ok := GetChannelAffinityStatsContext(c)
  549. if !ok {
  550. return
  551. }
  552. observeChannelAffinityUsageCache(statsCtx, usage)
  553. }
  554. func GetChannelAffinityUsageCacheStats(ruleName, usingGroup, keyFp string) ChannelAffinityUsageCacheStats {
  555. ruleName = strings.TrimSpace(ruleName)
  556. usingGroup = strings.TrimSpace(usingGroup)
  557. keyFp = strings.TrimSpace(keyFp)
  558. entryKey := channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp)
  559. if entryKey == "" {
  560. return ChannelAffinityUsageCacheStats{
  561. RuleName: ruleName,
  562. UsingGroup: usingGroup,
  563. KeyFingerprint: keyFp,
  564. }
  565. }
  566. cache := getChannelAffinityUsageCacheStatsCache()
  567. v, found, err := cache.Get(entryKey)
  568. if err != nil || !found {
  569. return ChannelAffinityUsageCacheStats{
  570. RuleName: ruleName,
  571. UsingGroup: usingGroup,
  572. KeyFingerprint: keyFp,
  573. }
  574. }
  575. return ChannelAffinityUsageCacheStats{
  576. RuleName: ruleName,
  577. UsingGroup: usingGroup,
  578. KeyFingerprint: keyFp,
  579. Hit: v.Hit,
  580. Total: v.Total,
  581. WindowSeconds: v.WindowSeconds,
  582. PromptTokens: v.PromptTokens,
  583. CompletionTokens: v.CompletionTokens,
  584. TotalTokens: v.TotalTokens,
  585. CachedTokens: v.CachedTokens,
  586. PromptCacheHitTokens: v.PromptCacheHitTokens,
  587. LastSeenAt: v.LastSeenAt,
  588. }
  589. }
  590. func observeChannelAffinityUsageCache(statsCtx ChannelAffinityStatsContext, usage *dto.Usage) {
  591. entryKey := channelAffinityUsageCacheEntryKey(statsCtx.RuleName, statsCtx.UsingGroup, statsCtx.KeyFingerprint)
  592. if entryKey == "" {
  593. return
  594. }
  595. windowSeconds := statsCtx.TTLSeconds
  596. if windowSeconds <= 0 {
  597. return
  598. }
  599. cache := getChannelAffinityUsageCacheStatsCache()
  600. ttl := time.Duration(windowSeconds) * time.Second
  601. lock := channelAffinityUsageCacheStatsLock(entryKey)
  602. lock.Lock()
  603. defer lock.Unlock()
  604. prev, found, err := cache.Get(entryKey)
  605. if err != nil {
  606. return
  607. }
  608. next := prev
  609. if !found {
  610. next = ChannelAffinityUsageCacheCounters{}
  611. }
  612. next.Total++
  613. hit, cachedTokens, promptCacheHitTokens := usageCacheSignals(usage)
  614. if hit {
  615. next.Hit++
  616. }
  617. next.WindowSeconds = windowSeconds
  618. next.LastSeenAt = time.Now().Unix()
  619. next.CachedTokens += cachedTokens
  620. next.PromptCacheHitTokens += promptCacheHitTokens
  621. next.PromptTokens += int64(usagePromptTokens(usage))
  622. next.CompletionTokens += int64(usageCompletionTokens(usage))
  623. next.TotalTokens += int64(usageTotalTokens(usage))
  624. _ = cache.SetWithTTL(entryKey, next, ttl)
  625. }
  626. func channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp string) string {
  627. ruleName = strings.TrimSpace(ruleName)
  628. usingGroup = strings.TrimSpace(usingGroup)
  629. keyFp = strings.TrimSpace(keyFp)
  630. if ruleName == "" || keyFp == "" {
  631. return ""
  632. }
  633. return ruleName + "\n" + usingGroup + "\n" + keyFp
  634. }
  635. func usageCacheSignals(usage *dto.Usage) (hit bool, cachedTokens int64, promptCacheHitTokens int64) {
  636. if usage == nil {
  637. return false, 0, 0
  638. }
  639. cached := int64(0)
  640. if usage.PromptTokensDetails.CachedTokens > 0 {
  641. cached = int64(usage.PromptTokensDetails.CachedTokens)
  642. } else if usage.InputTokensDetails != nil && usage.InputTokensDetails.CachedTokens > 0 {
  643. cached = int64(usage.InputTokensDetails.CachedTokens)
  644. }
  645. pcht := int64(0)
  646. if usage.PromptCacheHitTokens > 0 {
  647. pcht = int64(usage.PromptCacheHitTokens)
  648. }
  649. return cached > 0 || pcht > 0, cached, pcht
  650. }
  651. func usagePromptTokens(usage *dto.Usage) int {
  652. if usage == nil {
  653. return 0
  654. }
  655. if usage.PromptTokens > 0 {
  656. return usage.PromptTokens
  657. }
  658. return usage.InputTokens
  659. }
  660. func usageCompletionTokens(usage *dto.Usage) int {
  661. if usage == nil {
  662. return 0
  663. }
  664. if usage.CompletionTokens > 0 {
  665. return usage.CompletionTokens
  666. }
  667. return usage.OutputTokens
  668. }
  669. func usageTotalTokens(usage *dto.Usage) int {
  670. if usage == nil {
  671. return 0
  672. }
  673. if usage.TotalTokens > 0 {
  674. return usage.TotalTokens
  675. }
  676. pt := usagePromptTokens(usage)
  677. ct := usageCompletionTokens(usage)
  678. if pt > 0 || ct > 0 {
  679. return pt + ct
  680. }
  681. return 0
  682. }
  683. func getChannelAffinityUsageCacheStatsCache() *cachex.HybridCache[ChannelAffinityUsageCacheCounters] {
  684. channelAffinityUsageCacheStatsOnce.Do(func() {
  685. setting := operation_setting.GetChannelAffinitySetting()
  686. capacity := 100_000
  687. defaultTTLSeconds := 3600
  688. if setting != nil {
  689. if setting.MaxEntries > 0 {
  690. capacity = setting.MaxEntries
  691. }
  692. if setting.DefaultTTLSeconds > 0 {
  693. defaultTTLSeconds = setting.DefaultTTLSeconds
  694. }
  695. }
  696. channelAffinityUsageCacheStatsCache = cachex.NewHybridCache[ChannelAffinityUsageCacheCounters](cachex.HybridCacheConfig[ChannelAffinityUsageCacheCounters]{
  697. Namespace: cachex.Namespace(channelAffinityUsageCacheStatsNamespace),
  698. Redis: common.RDB,
  699. RedisEnabled: func() bool {
  700. return common.RedisEnabled && common.RDB != nil
  701. },
  702. RedisCodec: cachex.JSONCodec[ChannelAffinityUsageCacheCounters]{},
  703. Memory: func() *hot.HotCache[string, ChannelAffinityUsageCacheCounters] {
  704. return hot.NewHotCache[string, ChannelAffinityUsageCacheCounters](hot.LRU, capacity).
  705. WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
  706. WithJanitor().
  707. Build()
  708. },
  709. })
  710. })
  711. return channelAffinityUsageCacheStatsCache
  712. }
  713. func channelAffinityUsageCacheStatsLock(key string) *sync.Mutex {
  714. h := fnv.New32a()
  715. _, _ = h.Write([]byte(key))
  716. idx := h.Sum32() % uint32(len(channelAffinityUsageCacheStatsLocks))
  717. return &channelAffinityUsageCacheStatsLocks[idx]
  718. }