channel_affinity.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835
  1. package service
  2. import (
  3. "fmt"
  4. "hash/fnv"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/QuantumNous/new-api/common"
  11. "github.com/QuantumNous/new-api/dto"
  12. "github.com/QuantumNous/new-api/pkg/cachex"
  13. "github.com/QuantumNous/new-api/setting/operation_setting"
  14. "github.com/QuantumNous/new-api/types"
  15. "github.com/gin-gonic/gin"
  16. "github.com/samber/hot"
  17. "github.com/tidwall/gjson"
  18. )
  // Keys under which per-request channel-affinity state is stored on the
  // gin.Context, plus the cache namespaces used by this file.
  const (
  	// Full (namespace-prefixed) affinity cache key selected for the request.
  	ginKeyChannelAffinityCacheKey = "channel_affinity_cache_key"
  	// TTL in seconds (int) to use when recording the affinity entry.
  	ginKeyChannelAffinityTTLSeconds = "channel_affinity_ttl_seconds"
  	// channelAffinityMeta describing the matched rule and key source.
  	ginKeyChannelAffinityMeta = "channel_affinity_meta"
  	// Log payload (map) later attached to admin info.
  	ginKeyChannelAffinityLogInfo = "channel_affinity_log_info"
  	// bool copied from the matched rule's SkipRetryOnFailure flag.
  	ginKeyChannelAffinitySkipRetry = "channel_affinity_skip_retry_on_failure"
  	// Namespace of the affinity-key -> channel ID cache.
  	channelAffinityCacheNamespace = "new-api:channel_affinity:v1"
  	// Namespace of the per-bucket usage statistics cache.
  	channelAffinityUsageCacheStatsNamespace = "new-api:channel_affinity_usage_cache_stats:v1"
  )
  // Lazily initialized, process-wide caches.
  var (
  	channelAffinityCacheOnce sync.Once
  	// Affinity cache key -> channel ID last recorded for it.
  	channelAffinityCache *cachex.HybridCache[int]
  	channelAffinityUsageCacheStatsOnce sync.Once
  	// Usage-stats entry key -> accumulated counters.
  	channelAffinityUsageCacheStatsCache *cachex.HybridCache[ChannelAffinityUsageCacheCounters]
  	channelAffinityRegexCache sync.Map // map[string]*regexp.Regexp, compiled patterns keyed by their source text
  )
  // channelAffinityMeta captures which affinity rule matched the current
  // request and how its key was derived. It is stored on the gin.Context by
  // setChannelAffinityContext.
  type channelAffinityMeta struct {
  	CacheKey string // full, namespace-prefixed cache key
  	TTLSeconds int // TTL for the affinity entry (rule TTL or setting default)
  	RuleName string
  	SkipRetry bool // the rule's SkipRetryOnFailure flag
  	KeySourceType string // Type of the key source that produced the value
  	KeySourceKey string
  	KeySourcePath string
  	KeyHint string // abbreviated affinity value, for logs
  	KeyFingerprint string // short SHA-1 prefix of the affinity value
  	UsingGroup string
  	ModelName string
  	RequestPath string
  }
  // ChannelAffinityStatsContext identifies the usage-statistics bucket of a
  // request (rule, group, key fingerprint) and the window length in seconds
  // used as the TTL of that bucket.
  type ChannelAffinityStatsContext struct {
  	RuleName string
  	UsingGroup string
  	KeyFingerprint string
  	TTLSeconds int64
  }
  // Cached-token rate modes recorded with usage statistics. The names suggest
  // the denominator of a cache-hit rate (prompt vs. prompt+cached tokens);
  // NOTE(review): the consumer computing the rate is not visible here.
  // "mixed" marks a bucket that observed more than one mode.
  const (
  	cacheTokenRateModeCachedOverPrompt = "cached_over_prompt"
  	cacheTokenRateModeCachedOverPromptPlusCached = "cached_over_prompt_plus_cached"
  	cacheTokenRateModeMixed = "mixed"
  )
  // ChannelAffinityCacheStats is the affinity cache summary returned by
  // GetChannelAffinityCacheStats.
  type ChannelAffinityCacheStats struct {
  	Enabled bool `json:"enabled"`
  	Total int `json:"total"` // number of keys listed from the cache
  	Unknown int `json:"unknown"` // keys that could not be attributed to a rule
  	ByRuleName map[string]int `json:"by_rule_name"`
  	CacheCapacity int `json:"cache_capacity"`
  	CacheAlgo string `json:"cache_algo"`
  }
  68. func getChannelAffinityCache() *cachex.HybridCache[int] {
  69. channelAffinityCacheOnce.Do(func() {
  70. setting := operation_setting.GetChannelAffinitySetting()
  71. capacity := setting.MaxEntries
  72. if capacity <= 0 {
  73. capacity = 100_000
  74. }
  75. defaultTTLSeconds := setting.DefaultTTLSeconds
  76. if defaultTTLSeconds <= 0 {
  77. defaultTTLSeconds = 3600
  78. }
  79. channelAffinityCache = cachex.NewHybridCache[int](cachex.HybridCacheConfig[int]{
  80. Namespace: cachex.Namespace(channelAffinityCacheNamespace),
  81. Redis: common.RDB,
  82. RedisEnabled: func() bool {
  83. return common.RedisEnabled && common.RDB != nil
  84. },
  85. RedisCodec: cachex.IntCodec{},
  86. Memory: func() *hot.HotCache[string, int] {
  87. return hot.NewHotCache[string, int](hot.LRU, capacity).
  88. WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
  89. WithJanitor().
  90. Build()
  91. },
  92. })
  93. })
  94. return channelAffinityCache
  95. }
  96. func GetChannelAffinityCacheStats() ChannelAffinityCacheStats {
  97. setting := operation_setting.GetChannelAffinitySetting()
  98. if setting == nil {
  99. return ChannelAffinityCacheStats{
  100. Enabled: false,
  101. Total: 0,
  102. Unknown: 0,
  103. ByRuleName: map[string]int{},
  104. }
  105. }
  106. cache := getChannelAffinityCache()
  107. mainCap, _ := cache.Capacity()
  108. mainAlgo, _ := cache.Algorithm()
  109. rules := setting.Rules
  110. ruleByName := make(map[string]operation_setting.ChannelAffinityRule, len(rules))
  111. for _, r := range rules {
  112. name := strings.TrimSpace(r.Name)
  113. if name == "" {
  114. continue
  115. }
  116. if !r.IncludeRuleName {
  117. continue
  118. }
  119. ruleByName[name] = r
  120. }
  121. byRuleName := make(map[string]int, len(ruleByName))
  122. for name := range ruleByName {
  123. byRuleName[name] = 0
  124. }
  125. keys, err := cache.Keys()
  126. if err != nil {
  127. common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
  128. keys = nil
  129. }
  130. total := len(keys)
  131. unknown := 0
  132. for _, k := range keys {
  133. prefix := channelAffinityCacheNamespace + ":"
  134. if !strings.HasPrefix(k, prefix) {
  135. unknown++
  136. continue
  137. }
  138. rest := strings.TrimPrefix(k, prefix)
  139. parts := strings.Split(rest, ":")
  140. if len(parts) < 2 {
  141. unknown++
  142. continue
  143. }
  144. ruleName := parts[0]
  145. rule, ok := ruleByName[ruleName]
  146. if !ok {
  147. unknown++
  148. continue
  149. }
  150. if rule.IncludeUsingGroup {
  151. if len(parts) < 3 {
  152. unknown++
  153. continue
  154. }
  155. }
  156. byRuleName[ruleName]++
  157. }
  158. return ChannelAffinityCacheStats{
  159. Enabled: setting.Enabled,
  160. Total: total,
  161. Unknown: unknown,
  162. ByRuleName: byRuleName,
  163. CacheCapacity: mainCap,
  164. CacheAlgo: mainAlgo,
  165. }
  166. }
  167. func ClearChannelAffinityCacheAll() int {
  168. cache := getChannelAffinityCache()
  169. keys, err := cache.Keys()
  170. if err != nil {
  171. common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
  172. keys = nil
  173. }
  174. if len(keys) > 0 {
  175. if _, err := cache.DeleteMany(keys); err != nil {
  176. common.SysError(fmt.Sprintf("channel affinity cache delete many failed: err=%v", err))
  177. }
  178. }
  179. return len(keys)
  180. }
  181. func ClearChannelAffinityCacheByRuleName(ruleName string) (int, error) {
  182. ruleName = strings.TrimSpace(ruleName)
  183. if ruleName == "" {
  184. return 0, fmt.Errorf("rule_name 不能为空")
  185. }
  186. setting := operation_setting.GetChannelAffinitySetting()
  187. if setting == nil {
  188. return 0, fmt.Errorf("channel_affinity_setting 未初始化")
  189. }
  190. var matchedRule *operation_setting.ChannelAffinityRule
  191. for i := range setting.Rules {
  192. r := &setting.Rules[i]
  193. if strings.TrimSpace(r.Name) != ruleName {
  194. continue
  195. }
  196. matchedRule = r
  197. break
  198. }
  199. if matchedRule == nil {
  200. return 0, fmt.Errorf("未知规则名称")
  201. }
  202. if !matchedRule.IncludeRuleName {
  203. return 0, fmt.Errorf("该规则未启用 include_rule_name,无法按规则清空缓存")
  204. }
  205. cache := getChannelAffinityCache()
  206. deleted, err := cache.DeleteByPrefix(ruleName)
  207. if err != nil {
  208. return 0, err
  209. }
  210. return deleted, nil
  211. }
  212. func matchAnyRegexCached(patterns []string, s string) bool {
  213. if len(patterns) == 0 || s == "" {
  214. return false
  215. }
  216. for _, pattern := range patterns {
  217. if pattern == "" {
  218. continue
  219. }
  220. re, ok := channelAffinityRegexCache.Load(pattern)
  221. if !ok {
  222. compiled, err := regexp.Compile(pattern)
  223. if err != nil {
  224. continue
  225. }
  226. re = compiled
  227. channelAffinityRegexCache.Store(pattern, re)
  228. }
  229. if re.(*regexp.Regexp).MatchString(s) {
  230. return true
  231. }
  232. }
  233. return false
  234. }
  // matchAnyIncludeFold reports whether s contains, case-insensitively, at
  // least one of the patterns. Patterns are trimmed first and blank ones are
  // ignored. An empty s never matches.
  func matchAnyIncludeFold(patterns []string, s string) bool {
  	if s == "" {
  		return false
  	}
  	haystack := strings.ToLower(s)
  	for i := range patterns {
  		needle := strings.ToLower(strings.TrimSpace(patterns[i]))
  		if needle != "" && strings.Contains(haystack, needle) {
  			return true
  		}
  	}
  	return false
  }
  251. func extractChannelAffinityValue(c *gin.Context, src operation_setting.ChannelAffinityKeySource) string {
  252. switch src.Type {
  253. case "context_int":
  254. if src.Key == "" {
  255. return ""
  256. }
  257. v := c.GetInt(src.Key)
  258. if v <= 0 {
  259. return ""
  260. }
  261. return strconv.Itoa(v)
  262. case "context_string":
  263. if src.Key == "" {
  264. return ""
  265. }
  266. return strings.TrimSpace(c.GetString(src.Key))
  267. case "gjson":
  268. if src.Path == "" {
  269. return ""
  270. }
  271. storage, err := common.GetBodyStorage(c)
  272. if err != nil {
  273. return ""
  274. }
  275. body, err := storage.Bytes()
  276. if err != nil || len(body) == 0 {
  277. return ""
  278. }
  279. res := gjson.GetBytes(body, src.Path)
  280. if !res.Exists() {
  281. return ""
  282. }
  283. switch res.Type {
  284. case gjson.String, gjson.Number, gjson.True, gjson.False:
  285. return strings.TrimSpace(res.String())
  286. default:
  287. return strings.TrimSpace(res.Raw)
  288. }
  289. default:
  290. return ""
  291. }
  292. }
  293. func buildChannelAffinityCacheKeySuffix(rule operation_setting.ChannelAffinityRule, usingGroup string, affinityValue string) string {
  294. parts := make([]string, 0, 3)
  295. if rule.IncludeRuleName && rule.Name != "" {
  296. parts = append(parts, rule.Name)
  297. }
  298. if rule.IncludeUsingGroup && usingGroup != "" {
  299. parts = append(parts, usingGroup)
  300. }
  301. parts = append(parts, affinityValue)
  302. return strings.Join(parts, ":")
  303. }
  304. func setChannelAffinityContext(c *gin.Context, meta channelAffinityMeta) {
  305. c.Set(ginKeyChannelAffinityCacheKey, meta.CacheKey)
  306. c.Set(ginKeyChannelAffinityTTLSeconds, meta.TTLSeconds)
  307. c.Set(ginKeyChannelAffinityMeta, meta)
  308. }
  309. func getChannelAffinityContext(c *gin.Context) (string, int, bool) {
  310. keyAny, ok := c.Get(ginKeyChannelAffinityCacheKey)
  311. if !ok {
  312. return "", 0, false
  313. }
  314. key, ok := keyAny.(string)
  315. if !ok || key == "" {
  316. return "", 0, false
  317. }
  318. ttlAny, ok := c.Get(ginKeyChannelAffinityTTLSeconds)
  319. if !ok {
  320. return key, 0, true
  321. }
  322. ttlSeconds, _ := ttlAny.(int)
  323. return key, ttlSeconds, true
  324. }
  325. func getChannelAffinityMeta(c *gin.Context) (channelAffinityMeta, bool) {
  326. anyMeta, ok := c.Get(ginKeyChannelAffinityMeta)
  327. if !ok {
  328. return channelAffinityMeta{}, false
  329. }
  330. meta, ok := anyMeta.(channelAffinityMeta)
  331. if !ok {
  332. return channelAffinityMeta{}, false
  333. }
  334. return meta, true
  335. }
  336. func GetChannelAffinityStatsContext(c *gin.Context) (ChannelAffinityStatsContext, bool) {
  337. if c == nil {
  338. return ChannelAffinityStatsContext{}, false
  339. }
  340. meta, ok := getChannelAffinityMeta(c)
  341. if !ok {
  342. return ChannelAffinityStatsContext{}, false
  343. }
  344. ruleName := strings.TrimSpace(meta.RuleName)
  345. keyFp := strings.TrimSpace(meta.KeyFingerprint)
  346. usingGroup := strings.TrimSpace(meta.UsingGroup)
  347. if ruleName == "" || keyFp == "" {
  348. return ChannelAffinityStatsContext{}, false
  349. }
  350. ttlSeconds := int64(meta.TTLSeconds)
  351. if ttlSeconds <= 0 {
  352. return ChannelAffinityStatsContext{}, false
  353. }
  354. return ChannelAffinityStatsContext{
  355. RuleName: ruleName,
  356. UsingGroup: usingGroup,
  357. KeyFingerprint: keyFp,
  358. TTLSeconds: ttlSeconds,
  359. }, true
  360. }
  361. func affinityFingerprint(s string) string {
  362. if s == "" {
  363. return ""
  364. }
  365. hex := common.Sha1([]byte(s))
  366. if len(hex) >= 8 {
  367. return hex[:8]
  368. }
  369. return hex
  370. }
  // buildChannelAffinityKeyHint returns a short preview of an affinity value
  // for logs. Surrounding whitespace is trimmed and CR/LF become spaces.
  // Values longer than 12 characters are shortened to their first and last 4
  // characters joined by "...".
  //
  // Lengths are counted in runes, not bytes. Slicing by bytes could cut a
  // multi-byte UTF-8 character in half and emit invalid UTF-8 into logs.
  func buildChannelAffinityKeyHint(s string) string {
  	const (
  		maxFullLen = 12
  		edgeLen    = 4
  	)
  	s = strings.TrimSpace(s)
  	if s == "" {
  		return ""
  	}
  	s = strings.ReplaceAll(s, "\n", " ")
  	s = strings.ReplaceAll(s, "\r", " ")
  	runes := []rune(s)
  	if len(runes) <= maxFullLen {
  		return s
  	}
  	return string(runes[:edgeLen]) + "..." + string(runes[len(runes)-edgeLen:])
  }
  383. func GetPreferredChannelByAffinity(c *gin.Context, modelName string, usingGroup string) (int, bool) {
  384. setting := operation_setting.GetChannelAffinitySetting()
  385. if setting == nil || !setting.Enabled {
  386. return 0, false
  387. }
  388. path := ""
  389. if c != nil && c.Request != nil && c.Request.URL != nil {
  390. path = c.Request.URL.Path
  391. }
  392. userAgent := ""
  393. if c != nil && c.Request != nil {
  394. userAgent = c.Request.UserAgent()
  395. }
  396. for _, rule := range setting.Rules {
  397. if !matchAnyRegexCached(rule.ModelRegex, modelName) {
  398. continue
  399. }
  400. if len(rule.PathRegex) > 0 && !matchAnyRegexCached(rule.PathRegex, path) {
  401. continue
  402. }
  403. if len(rule.UserAgentInclude) > 0 && !matchAnyIncludeFold(rule.UserAgentInclude, userAgent) {
  404. continue
  405. }
  406. var affinityValue string
  407. var usedSource operation_setting.ChannelAffinityKeySource
  408. for _, src := range rule.KeySources {
  409. affinityValue = extractChannelAffinityValue(c, src)
  410. if affinityValue != "" {
  411. usedSource = src
  412. break
  413. }
  414. }
  415. if affinityValue == "" {
  416. continue
  417. }
  418. if rule.ValueRegex != "" && !matchAnyRegexCached([]string{rule.ValueRegex}, affinityValue) {
  419. continue
  420. }
  421. ttlSeconds := rule.TTLSeconds
  422. if ttlSeconds <= 0 {
  423. ttlSeconds = setting.DefaultTTLSeconds
  424. }
  425. cacheKeySuffix := buildChannelAffinityCacheKeySuffix(rule, usingGroup, affinityValue)
  426. cacheKeyFull := channelAffinityCacheNamespace + ":" + cacheKeySuffix
  427. setChannelAffinityContext(c, channelAffinityMeta{
  428. CacheKey: cacheKeyFull,
  429. TTLSeconds: ttlSeconds,
  430. RuleName: rule.Name,
  431. SkipRetry: rule.SkipRetryOnFailure,
  432. KeySourceType: strings.TrimSpace(usedSource.Type),
  433. KeySourceKey: strings.TrimSpace(usedSource.Key),
  434. KeySourcePath: strings.TrimSpace(usedSource.Path),
  435. KeyHint: buildChannelAffinityKeyHint(affinityValue),
  436. KeyFingerprint: affinityFingerprint(affinityValue),
  437. UsingGroup: usingGroup,
  438. ModelName: modelName,
  439. RequestPath: path,
  440. })
  441. cache := getChannelAffinityCache()
  442. channelID, found, err := cache.Get(cacheKeySuffix)
  443. if err != nil {
  444. common.SysError(fmt.Sprintf("channel affinity cache get failed: key=%s, err=%v", cacheKeyFull, err))
  445. return 0, false
  446. }
  447. if found {
  448. return channelID, true
  449. }
  450. return 0, false
  451. }
  452. return 0, false
  453. }
  454. func ShouldSkipRetryAfterChannelAffinityFailure(c *gin.Context) bool {
  455. if c == nil {
  456. return false
  457. }
  458. v, ok := c.Get(ginKeyChannelAffinitySkipRetry)
  459. if !ok {
  460. return false
  461. }
  462. b, ok := v.(bool)
  463. if !ok {
  464. return false
  465. }
  466. return b
  467. }
  468. func MarkChannelAffinityUsed(c *gin.Context, selectedGroup string, channelID int) {
  469. if c == nil || channelID <= 0 {
  470. return
  471. }
  472. meta, ok := getChannelAffinityMeta(c)
  473. if !ok {
  474. return
  475. }
  476. c.Set(ginKeyChannelAffinitySkipRetry, meta.SkipRetry)
  477. info := map[string]interface{}{
  478. "reason": meta.RuleName,
  479. "rule_name": meta.RuleName,
  480. "using_group": meta.UsingGroup,
  481. "selected_group": selectedGroup,
  482. "model": meta.ModelName,
  483. "request_path": meta.RequestPath,
  484. "channel_id": channelID,
  485. "key_source": meta.KeySourceType,
  486. "key_key": meta.KeySourceKey,
  487. "key_path": meta.KeySourcePath,
  488. "key_hint": meta.KeyHint,
  489. "key_fp": meta.KeyFingerprint,
  490. }
  491. c.Set(ginKeyChannelAffinityLogInfo, info)
  492. }
  493. func AppendChannelAffinityAdminInfo(c *gin.Context, adminInfo map[string]interface{}) {
  494. if c == nil || adminInfo == nil {
  495. return
  496. }
  497. anyInfo, ok := c.Get(ginKeyChannelAffinityLogInfo)
  498. if !ok || anyInfo == nil {
  499. return
  500. }
  501. adminInfo["channel_affinity"] = anyInfo
  502. }
  503. func RecordChannelAffinity(c *gin.Context, channelID int) {
  504. if channelID <= 0 {
  505. return
  506. }
  507. setting := operation_setting.GetChannelAffinitySetting()
  508. if setting == nil || !setting.Enabled {
  509. return
  510. }
  511. if setting.SwitchOnSuccess && c != nil {
  512. if successChannelID := c.GetInt("channel_id"); successChannelID > 0 {
  513. channelID = successChannelID
  514. }
  515. }
  516. cacheKey, ttlSeconds, ok := getChannelAffinityContext(c)
  517. if !ok {
  518. return
  519. }
  520. if ttlSeconds <= 0 {
  521. ttlSeconds = setting.DefaultTTLSeconds
  522. }
  523. if ttlSeconds <= 0 {
  524. ttlSeconds = 3600
  525. }
  526. cache := getChannelAffinityCache()
  527. if err := cache.SetWithTTL(cacheKey, channelID, time.Duration(ttlSeconds)*time.Second); err != nil {
  528. common.SysError(fmt.Sprintf("channel affinity cache set failed: key=%s, err=%v", cacheKey, err))
  529. }
  530. }
  // ChannelAffinityUsageCacheStats is the usage summary for one
  // (rule, group, key fingerprint) bucket, as returned by
  // GetChannelAffinityUsageCacheStats.
  type ChannelAffinityUsageCacheStats struct {
  	RuleName string `json:"rule_name"`
  	UsingGroup string `json:"using_group"`
  	KeyFingerprint string `json:"key_fp"`
  	CachedTokenRateMode string `json:"cached_token_rate_mode"`
  	Hit int64 `json:"hit"`
  	Total int64 `json:"total"`
  	WindowSeconds int64 `json:"window_seconds"`
  	PromptTokens int64 `json:"prompt_tokens"`
  	CompletionTokens int64 `json:"completion_tokens"`
  	TotalTokens int64 `json:"total_tokens"`
  	CachedTokens int64 `json:"cached_tokens"`
  	PromptCacheHitTokens int64 `json:"prompt_cache_hit_tokens"`
  	LastSeenAt int64 `json:"last_seen_at"`
  }
  // ChannelAffinityUsageCacheCounters is the value stored per bucket in the
  // usage-stats cache. observeChannelAffinityUsageCache accumulates into it.
  type ChannelAffinityUsageCacheCounters struct {
  	CachedTokenRateMode string `json:"cached_token_rate_mode"` // a cacheTokenRateMode* value, or empty
  	Hit int64 `json:"hit"` // observations reporting cached or prompt-cache-hit tokens
  	Total int64 `json:"total"` // all observations
  	WindowSeconds int64 `json:"window_seconds"` // TTL applied at the latest observation
  	PromptTokens int64 `json:"prompt_tokens"`
  	CompletionTokens int64 `json:"completion_tokens"`
  	TotalTokens int64 `json:"total_tokens"`
  	CachedTokens int64 `json:"cached_tokens"`
  	PromptCacheHitTokens int64 `json:"prompt_cache_hit_tokens"`
  	LastSeenAt int64 `json:"last_seen_at"` // Unix seconds of the latest observation
  }
  // Striped in-process locks that serialize the read-modify-write update of a
  // bucket's counters. A key is mapped to a stripe by
  // channelAffinityUsageCacheStatsLock.
  var channelAffinityUsageCacheStatsLocks [64]sync.Mutex
  559. // ObserveChannelAffinityUsageCacheByRelayFormat records usage cache stats with a stable rate mode derived from relay format.
  560. func ObserveChannelAffinityUsageCacheByRelayFormat(c *gin.Context, usage *dto.Usage, relayFormat types.RelayFormat) {
  561. ObserveChannelAffinityUsageCacheFromContext(c, usage, cachedTokenRateModeByRelayFormat(relayFormat))
  562. }
  563. func ObserveChannelAffinityUsageCacheFromContext(c *gin.Context, usage *dto.Usage, cachedTokenRateMode string) {
  564. statsCtx, ok := GetChannelAffinityStatsContext(c)
  565. if !ok {
  566. return
  567. }
  568. observeChannelAffinityUsageCache(statsCtx, usage, cachedTokenRateMode)
  569. }
  570. func GetChannelAffinityUsageCacheStats(ruleName, usingGroup, keyFp string) ChannelAffinityUsageCacheStats {
  571. ruleName = strings.TrimSpace(ruleName)
  572. usingGroup = strings.TrimSpace(usingGroup)
  573. keyFp = strings.TrimSpace(keyFp)
  574. entryKey := channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp)
  575. if entryKey == "" {
  576. return ChannelAffinityUsageCacheStats{
  577. RuleName: ruleName,
  578. UsingGroup: usingGroup,
  579. KeyFingerprint: keyFp,
  580. }
  581. }
  582. cache := getChannelAffinityUsageCacheStatsCache()
  583. v, found, err := cache.Get(entryKey)
  584. if err != nil || !found {
  585. return ChannelAffinityUsageCacheStats{
  586. RuleName: ruleName,
  587. UsingGroup: usingGroup,
  588. KeyFingerprint: keyFp,
  589. }
  590. }
  591. return ChannelAffinityUsageCacheStats{
  592. CachedTokenRateMode: v.CachedTokenRateMode,
  593. RuleName: ruleName,
  594. UsingGroup: usingGroup,
  595. KeyFingerprint: keyFp,
  596. Hit: v.Hit,
  597. Total: v.Total,
  598. WindowSeconds: v.WindowSeconds,
  599. PromptTokens: v.PromptTokens,
  600. CompletionTokens: v.CompletionTokens,
  601. TotalTokens: v.TotalTokens,
  602. CachedTokens: v.CachedTokens,
  603. PromptCacheHitTokens: v.PromptCacheHitTokens,
  604. LastSeenAt: v.LastSeenAt,
  605. }
  606. }
  607. func observeChannelAffinityUsageCache(statsCtx ChannelAffinityStatsContext, usage *dto.Usage, cachedTokenRateMode string) {
  608. entryKey := channelAffinityUsageCacheEntryKey(statsCtx.RuleName, statsCtx.UsingGroup, statsCtx.KeyFingerprint)
  609. if entryKey == "" {
  610. return
  611. }
  612. windowSeconds := statsCtx.TTLSeconds
  613. if windowSeconds <= 0 {
  614. return
  615. }
  616. cache := getChannelAffinityUsageCacheStatsCache()
  617. ttl := time.Duration(windowSeconds) * time.Second
  618. lock := channelAffinityUsageCacheStatsLock(entryKey)
  619. lock.Lock()
  620. defer lock.Unlock()
  621. prev, found, err := cache.Get(entryKey)
  622. if err != nil {
  623. return
  624. }
  625. next := prev
  626. if !found {
  627. next = ChannelAffinityUsageCacheCounters{}
  628. }
  629. currentMode := normalizeCachedTokenRateMode(cachedTokenRateMode)
  630. if currentMode != "" {
  631. if next.CachedTokenRateMode == "" {
  632. next.CachedTokenRateMode = currentMode
  633. } else if next.CachedTokenRateMode != currentMode && next.CachedTokenRateMode != cacheTokenRateModeMixed {
  634. next.CachedTokenRateMode = cacheTokenRateModeMixed
  635. }
  636. }
  637. next.Total++
  638. hit, cachedTokens, promptCacheHitTokens := usageCacheSignals(usage)
  639. if hit {
  640. next.Hit++
  641. }
  642. next.WindowSeconds = windowSeconds
  643. next.LastSeenAt = time.Now().Unix()
  644. next.CachedTokens += cachedTokens
  645. next.PromptCacheHitTokens += promptCacheHitTokens
  646. next.PromptTokens += int64(usagePromptTokens(usage))
  647. next.CompletionTokens += int64(usageCompletionTokens(usage))
  648. next.TotalTokens += int64(usageTotalTokens(usage))
  649. _ = cache.SetWithTTL(entryKey, next, ttl)
  650. }
  651. func normalizeCachedTokenRateMode(mode string) string {
  652. switch mode {
  653. case cacheTokenRateModeCachedOverPrompt:
  654. return cacheTokenRateModeCachedOverPrompt
  655. case cacheTokenRateModeCachedOverPromptPlusCached:
  656. return cacheTokenRateModeCachedOverPromptPlusCached
  657. case cacheTokenRateModeMixed:
  658. return cacheTokenRateModeMixed
  659. default:
  660. return ""
  661. }
  662. }
  663. func cachedTokenRateModeByRelayFormat(relayFormat types.RelayFormat) string {
  664. switch relayFormat {
  665. case types.RelayFormatOpenAI, types.RelayFormatOpenAIResponses, types.RelayFormatOpenAIResponsesCompaction:
  666. return cacheTokenRateModeCachedOverPrompt
  667. case types.RelayFormatClaude:
  668. return cacheTokenRateModeCachedOverPromptPlusCached
  669. default:
  670. return ""
  671. }
  672. }
  // channelAffinityUsageCacheEntryKey joins the trimmed rule name, group and
  // key fingerprint with newlines to form a usage-stats bucket key. It returns
  // "" when the rule name or fingerprint is blank. The group may be empty.
  func channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp string) string {
  	rule, group, fp := strings.TrimSpace(ruleName), strings.TrimSpace(usingGroup), strings.TrimSpace(keyFp)
  	if rule == "" || fp == "" {
  		return ""
  	}
  	return strings.Join([]string{rule, group, fp}, "\n")
  }
  682. func usageCacheSignals(usage *dto.Usage) (hit bool, cachedTokens int64, promptCacheHitTokens int64) {
  683. if usage == nil {
  684. return false, 0, 0
  685. }
  686. cached := int64(0)
  687. if usage.PromptTokensDetails.CachedTokens > 0 {
  688. cached = int64(usage.PromptTokensDetails.CachedTokens)
  689. } else if usage.InputTokensDetails != nil && usage.InputTokensDetails.CachedTokens > 0 {
  690. cached = int64(usage.InputTokensDetails.CachedTokens)
  691. }
  692. pcht := int64(0)
  693. if usage.PromptCacheHitTokens > 0 {
  694. pcht = int64(usage.PromptCacheHitTokens)
  695. }
  696. return cached > 0 || pcht > 0, cached, pcht
  697. }
  698. func usagePromptTokens(usage *dto.Usage) int {
  699. if usage == nil {
  700. return 0
  701. }
  702. if usage.PromptTokens > 0 {
  703. return usage.PromptTokens
  704. }
  705. return usage.InputTokens
  706. }
  707. func usageCompletionTokens(usage *dto.Usage) int {
  708. if usage == nil {
  709. return 0
  710. }
  711. if usage.CompletionTokens > 0 {
  712. return usage.CompletionTokens
  713. }
  714. return usage.OutputTokens
  715. }
  716. func usageTotalTokens(usage *dto.Usage) int {
  717. if usage == nil {
  718. return 0
  719. }
  720. if usage.TotalTokens > 0 {
  721. return usage.TotalTokens
  722. }
  723. pt := usagePromptTokens(usage)
  724. ct := usageCompletionTokens(usage)
  725. if pt > 0 || ct > 0 {
  726. return pt + ct
  727. }
  728. return 0
  729. }
  730. func getChannelAffinityUsageCacheStatsCache() *cachex.HybridCache[ChannelAffinityUsageCacheCounters] {
  731. channelAffinityUsageCacheStatsOnce.Do(func() {
  732. setting := operation_setting.GetChannelAffinitySetting()
  733. capacity := 100_000
  734. defaultTTLSeconds := 3600
  735. if setting != nil {
  736. if setting.MaxEntries > 0 {
  737. capacity = setting.MaxEntries
  738. }
  739. if setting.DefaultTTLSeconds > 0 {
  740. defaultTTLSeconds = setting.DefaultTTLSeconds
  741. }
  742. }
  743. channelAffinityUsageCacheStatsCache = cachex.NewHybridCache[ChannelAffinityUsageCacheCounters](cachex.HybridCacheConfig[ChannelAffinityUsageCacheCounters]{
  744. Namespace: cachex.Namespace(channelAffinityUsageCacheStatsNamespace),
  745. Redis: common.RDB,
  746. RedisEnabled: func() bool {
  747. return common.RedisEnabled && common.RDB != nil
  748. },
  749. RedisCodec: cachex.JSONCodec[ChannelAffinityUsageCacheCounters]{},
  750. Memory: func() *hot.HotCache[string, ChannelAffinityUsageCacheCounters] {
  751. return hot.NewHotCache[string, ChannelAffinityUsageCacheCounters](hot.LRU, capacity).
  752. WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
  753. WithJanitor().
  754. Build()
  755. },
  756. })
  757. })
  758. return channelAffinityUsageCacheStatsCache
  759. }
  760. func channelAffinityUsageCacheStatsLock(key string) *sync.Mutex {
  761. h := fnv.New32a()
  762. _, _ = h.Write([]byte(key))
  763. idx := h.Sum32() % uint32(len(channelAffinityUsageCacheStatsLocks))
  764. return &channelAffinityUsageCacheStatsLocks[idx]
  765. }