channel_affinity.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915
  1. package service
  2. import (
  3. "fmt"
  4. "hash/fnv"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/QuantumNous/new-api/common"
  11. "github.com/QuantumNous/new-api/dto"
  12. "github.com/QuantumNous/new-api/pkg/cachex"
  13. "github.com/QuantumNous/new-api/setting/operation_setting"
  14. "github.com/QuantumNous/new-api/types"
  15. "github.com/gin-gonic/gin"
  16. "github.com/samber/hot"
  17. "github.com/tidwall/gjson"
  18. )
  19. const (
  20. ginKeyChannelAffinityCacheKey = "channel_affinity_cache_key"
  21. ginKeyChannelAffinityTTLSeconds = "channel_affinity_ttl_seconds"
  22. ginKeyChannelAffinityMeta = "channel_affinity_meta"
  23. ginKeyChannelAffinityLogInfo = "channel_affinity_log_info"
  24. ginKeyChannelAffinitySkipRetry = "channel_affinity_skip_retry_on_failure"
  25. channelAffinityCacheNamespace = "new-api:channel_affinity:v1"
  26. channelAffinityUsageCacheStatsNamespace = "new-api:channel_affinity_usage_cache_stats:v1"
  27. )
  28. var (
  29. channelAffinityCacheOnce sync.Once
  30. channelAffinityCache *cachex.HybridCache[int]
  31. channelAffinityUsageCacheStatsOnce sync.Once
  32. channelAffinityUsageCacheStatsCache *cachex.HybridCache[ChannelAffinityUsageCacheCounters]
  33. channelAffinityRegexCache sync.Map // map[string]*regexp.Regexp
  34. )
  // channelAffinityMeta is the per-request result of matching an affinity rule.
  // GetPreferredChannelByAffinity fills it in and stores it on the gin.Context
  // under ginKeyChannelAffinityMeta; later steps read it back through
  // getChannelAffinityMeta.
  35. type channelAffinityMeta struct {
  36. CacheKey string // full cache key: channelAffinityCacheNamespace + ":" + key suffix
  37. TTLSeconds int // TTL for the cache entry, resolved from the rule and setting defaults
  38. RuleName string // name of the matched rule
  39. SkipRetry bool // copy of the rule's SkipRetryOnFailure flag
  40. ParamTemplate map[string]interface{} // copy of the rule's ParamOverrideTemplate
  41. KeySourceType string // trimmed Type of the key source that produced the affinity value
  42. KeySourceKey string // trimmed Key of that key source
  43. KeySourcePath string // trimmed Path of that key source
  44. KeyHint string // shortened affinity value for logs (buildChannelAffinityKeyHint)
  45. KeyFingerprint string // short hash of the affinity value (affinityFingerprint)
  46. UsingGroup string // group passed to GetPreferredChannelByAffinity
  47. ModelName string // requested model name
  48. RequestPath string // request URL path ("" when unavailable)
  49. }
  // ChannelAffinityStatsContext identifies the usage-stats entry for a request.
  // GetChannelAffinityStatsContext derives it from channelAffinityMeta, trimming
  // the strings, and only returns it when RuleName and KeyFingerprint are
  // non-empty and TTLSeconds is positive.
  50. type ChannelAffinityStatsContext struct {
  51. RuleName string
  52. UsingGroup string
  53. KeyFingerprint string
  54. TTLSeconds int64 // used as the stats window and as the stats entry TTL
  55. }
  // Cached-token rate modes recorded alongside usage statistics. An entry that
  // has observed more than one known mode is marked cacheTokenRateModeMixed
  // (see observeChannelAffinityUsageCache).
  const cacheTokenRateModeCachedOverPrompt = "cached_over_prompt"
  const cacheTokenRateModeCachedOverPromptPlusCached = "cached_over_prompt_plus_cached"
  const cacheTokenRateModeMixed = "mixed"
  // ChannelAffinityCacheStats is the summary returned by
  // GetChannelAffinityCacheStats.
  61. type ChannelAffinityCacheStats struct {
  62. Enabled bool `json:"enabled"` // setting.Enabled at the time of the call
  63. Total int `json:"total"` // number of keys listed from the affinity cache
  64. Unknown int `json:"unknown"` // listed keys that could not be attributed to a rule
  65. ByRuleName map[string]int `json:"by_rule_name"` // key count per attributable rule name
  66. CacheCapacity int `json:"cache_capacity"` // capacity reported by the cache
  67. CacheAlgo string `json:"cache_algo"` // algorithm name reported by the cache
  68. }
  69. func getChannelAffinityCache() *cachex.HybridCache[int] {
  70. channelAffinityCacheOnce.Do(func() {
  71. setting := operation_setting.GetChannelAffinitySetting()
  72. capacity := setting.MaxEntries
  73. if capacity <= 0 {
  74. capacity = 100_000
  75. }
  76. defaultTTLSeconds := setting.DefaultTTLSeconds
  77. if defaultTTLSeconds <= 0 {
  78. defaultTTLSeconds = 3600
  79. }
  80. channelAffinityCache = cachex.NewHybridCache[int](cachex.HybridCacheConfig[int]{
  81. Namespace: cachex.Namespace(channelAffinityCacheNamespace),
  82. Redis: common.RDB,
  83. RedisEnabled: func() bool {
  84. return common.RedisEnabled && common.RDB != nil
  85. },
  86. RedisCodec: cachex.IntCodec{},
  87. Memory: func() *hot.HotCache[string, int] {
  88. return hot.NewHotCache[string, int](hot.LRU, capacity).
  89. WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
  90. WithJanitor().
  91. Build()
  92. },
  93. })
  94. })
  95. return channelAffinityCache
  96. }
  97. func GetChannelAffinityCacheStats() ChannelAffinityCacheStats {
  98. setting := operation_setting.GetChannelAffinitySetting()
  99. if setting == nil {
  100. return ChannelAffinityCacheStats{
  101. Enabled: false,
  102. Total: 0,
  103. Unknown: 0,
  104. ByRuleName: map[string]int{},
  105. }
  106. }
  107. cache := getChannelAffinityCache()
  108. mainCap, _ := cache.Capacity()
  109. mainAlgo, _ := cache.Algorithm()
  110. rules := setting.Rules
  111. ruleByName := make(map[string]operation_setting.ChannelAffinityRule, len(rules))
  112. for _, r := range rules {
  113. name := strings.TrimSpace(r.Name)
  114. if name == "" {
  115. continue
  116. }
  117. if !r.IncludeRuleName {
  118. continue
  119. }
  120. ruleByName[name] = r
  121. }
  122. byRuleName := make(map[string]int, len(ruleByName))
  123. for name := range ruleByName {
  124. byRuleName[name] = 0
  125. }
  126. keys, err := cache.Keys()
  127. if err != nil {
  128. common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
  129. keys = nil
  130. }
  131. total := len(keys)
  132. unknown := 0
  133. for _, k := range keys {
  134. prefix := channelAffinityCacheNamespace + ":"
  135. if !strings.HasPrefix(k, prefix) {
  136. unknown++
  137. continue
  138. }
  139. rest := strings.TrimPrefix(k, prefix)
  140. parts := strings.Split(rest, ":")
  141. if len(parts) < 2 {
  142. unknown++
  143. continue
  144. }
  145. ruleName := parts[0]
  146. rule, ok := ruleByName[ruleName]
  147. if !ok {
  148. unknown++
  149. continue
  150. }
  151. if rule.IncludeUsingGroup {
  152. if len(parts) < 3 {
  153. unknown++
  154. continue
  155. }
  156. }
  157. byRuleName[ruleName]++
  158. }
  159. return ChannelAffinityCacheStats{
  160. Enabled: setting.Enabled,
  161. Total: total,
  162. Unknown: unknown,
  163. ByRuleName: byRuleName,
  164. CacheCapacity: mainCap,
  165. CacheAlgo: mainAlgo,
  166. }
  167. }
  168. func ClearChannelAffinityCacheAll() int {
  169. cache := getChannelAffinityCache()
  170. keys, err := cache.Keys()
  171. if err != nil {
  172. common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
  173. keys = nil
  174. }
  175. if len(keys) > 0 {
  176. if _, err := cache.DeleteMany(keys); err != nil {
  177. common.SysError(fmt.Sprintf("channel affinity cache delete many failed: err=%v", err))
  178. }
  179. }
  180. return len(keys)
  181. }
  182. func ClearChannelAffinityCacheByRuleName(ruleName string) (int, error) {
  183. ruleName = strings.TrimSpace(ruleName)
  184. if ruleName == "" {
  185. return 0, fmt.Errorf("rule_name 不能为空")
  186. }
  187. setting := operation_setting.GetChannelAffinitySetting()
  188. if setting == nil {
  189. return 0, fmt.Errorf("channel_affinity_setting 未初始化")
  190. }
  191. var matchedRule *operation_setting.ChannelAffinityRule
  192. for i := range setting.Rules {
  193. r := &setting.Rules[i]
  194. if strings.TrimSpace(r.Name) != ruleName {
  195. continue
  196. }
  197. matchedRule = r
  198. break
  199. }
  200. if matchedRule == nil {
  201. return 0, fmt.Errorf("未知规则名称")
  202. }
  203. if !matchedRule.IncludeRuleName {
  204. return 0, fmt.Errorf("该规则未启用 include_rule_name,无法按规则清空缓存")
  205. }
  206. cache := getChannelAffinityCache()
  207. deleted, err := cache.DeleteByPrefix(ruleName)
  208. if err != nil {
  209. return 0, err
  210. }
  211. return deleted, nil
  212. }
  213. func matchAnyRegexCached(patterns []string, s string) bool {
  214. if len(patterns) == 0 || s == "" {
  215. return false
  216. }
  217. for _, pattern := range patterns {
  218. if pattern == "" {
  219. continue
  220. }
  221. re, ok := channelAffinityRegexCache.Load(pattern)
  222. if !ok {
  223. compiled, err := regexp.Compile(pattern)
  224. if err != nil {
  225. continue
  226. }
  227. re = compiled
  228. channelAffinityRegexCache.Store(pattern, re)
  229. }
  230. if re.(*regexp.Regexp).MatchString(s) {
  231. return true
  232. }
  233. }
  234. return false
  235. }
  // matchAnyIncludeFold reports whether s contains any of patterns as a
  // case-insensitive substring. Each pattern is whitespace-trimmed first and
  // blank patterns are ignored; an empty s or pattern list never matches.
  func matchAnyIncludeFold(patterns []string, s string) bool {
  	if s == "" || len(patterns) == 0 {
  		return false
  	}
  	haystack := strings.ToLower(s)
  	for i := range patterns {
  		needle := strings.ToLower(strings.TrimSpace(patterns[i]))
  		if needle != "" && strings.Contains(haystack, needle) {
  			return true
  		}
  	}
  	return false
  }
  252. func extractChannelAffinityValue(c *gin.Context, src operation_setting.ChannelAffinityKeySource) string {
  253. switch src.Type {
  254. case "context_int":
  255. if src.Key == "" {
  256. return ""
  257. }
  258. v := c.GetInt(src.Key)
  259. if v <= 0 {
  260. return ""
  261. }
  262. return strconv.Itoa(v)
  263. case "context_string":
  264. if src.Key == "" {
  265. return ""
  266. }
  267. return strings.TrimSpace(c.GetString(src.Key))
  268. case "gjson":
  269. if src.Path == "" {
  270. return ""
  271. }
  272. storage, err := common.GetBodyStorage(c)
  273. if err != nil {
  274. return ""
  275. }
  276. body, err := storage.Bytes()
  277. if err != nil || len(body) == 0 {
  278. return ""
  279. }
  280. res := gjson.GetBytes(body, src.Path)
  281. if !res.Exists() {
  282. return ""
  283. }
  284. switch res.Type {
  285. case gjson.String, gjson.Number, gjson.True, gjson.False:
  286. return strings.TrimSpace(res.String())
  287. default:
  288. return strings.TrimSpace(res.Raw)
  289. }
  290. default:
  291. return ""
  292. }
  293. }
  294. func buildChannelAffinityCacheKeySuffix(rule operation_setting.ChannelAffinityRule, usingGroup string, affinityValue string) string {
  295. parts := make([]string, 0, 3)
  296. if rule.IncludeRuleName && rule.Name != "" {
  297. parts = append(parts, rule.Name)
  298. }
  299. if rule.IncludeUsingGroup && usingGroup != "" {
  300. parts = append(parts, usingGroup)
  301. }
  302. parts = append(parts, affinityValue)
  303. return strings.Join(parts, ":")
  304. }
  305. func setChannelAffinityContext(c *gin.Context, meta channelAffinityMeta) {
  306. c.Set(ginKeyChannelAffinityCacheKey, meta.CacheKey)
  307. c.Set(ginKeyChannelAffinityTTLSeconds, meta.TTLSeconds)
  308. c.Set(ginKeyChannelAffinityMeta, meta)
  309. }
  310. func getChannelAffinityContext(c *gin.Context) (string, int, bool) {
  311. keyAny, ok := c.Get(ginKeyChannelAffinityCacheKey)
  312. if !ok {
  313. return "", 0, false
  314. }
  315. key, ok := keyAny.(string)
  316. if !ok || key == "" {
  317. return "", 0, false
  318. }
  319. ttlAny, ok := c.Get(ginKeyChannelAffinityTTLSeconds)
  320. if !ok {
  321. return key, 0, true
  322. }
  323. ttlSeconds, _ := ttlAny.(int)
  324. return key, ttlSeconds, true
  325. }
  326. func getChannelAffinityMeta(c *gin.Context) (channelAffinityMeta, bool) {
  327. anyMeta, ok := c.Get(ginKeyChannelAffinityMeta)
  328. if !ok {
  329. return channelAffinityMeta{}, false
  330. }
  331. meta, ok := anyMeta.(channelAffinityMeta)
  332. if !ok {
  333. return channelAffinityMeta{}, false
  334. }
  335. return meta, true
  336. }
  337. func GetChannelAffinityStatsContext(c *gin.Context) (ChannelAffinityStatsContext, bool) {
  338. if c == nil {
  339. return ChannelAffinityStatsContext{}, false
  340. }
  341. meta, ok := getChannelAffinityMeta(c)
  342. if !ok {
  343. return ChannelAffinityStatsContext{}, false
  344. }
  345. ruleName := strings.TrimSpace(meta.RuleName)
  346. keyFp := strings.TrimSpace(meta.KeyFingerprint)
  347. usingGroup := strings.TrimSpace(meta.UsingGroup)
  348. if ruleName == "" || keyFp == "" {
  349. return ChannelAffinityStatsContext{}, false
  350. }
  351. ttlSeconds := int64(meta.TTLSeconds)
  352. if ttlSeconds <= 0 {
  353. return ChannelAffinityStatsContext{}, false
  354. }
  355. return ChannelAffinityStatsContext{
  356. RuleName: ruleName,
  357. UsingGroup: usingGroup,
  358. KeyFingerprint: keyFp,
  359. TTLSeconds: ttlSeconds,
  360. }, true
  361. }
  362. func affinityFingerprint(s string) string {
  363. if s == "" {
  364. return ""
  365. }
  366. hex := common.Sha1([]byte(s))
  367. if len(hex) >= 8 {
  368. return hex[:8]
  369. }
  370. return hex
  371. }
  // buildChannelAffinityKeyHint produces a short, log-friendly preview of an
  // affinity value.
  //
  // The value is trimmed and CR/LF are replaced with spaces. Values of at most
  // 12 characters are returned whole. Longer ones become their first 4 and last
  // 4 characters joined by "...".
  //
  // Truncation counts runes rather than bytes, so a multi-byte UTF-8 character
  // is never cut in half (which would emit invalid UTF-8 into logs).
  func buildChannelAffinityKeyHint(s string) string {
  	s = strings.TrimSpace(s)
  	if s == "" {
  		return ""
  	}
  	s = strings.ReplaceAll(s, "\n", " ")
  	s = strings.ReplaceAll(s, "\r", " ")
  	runes := []rune(s)
  	if len(runes) <= 12 {
  		return s
  	}
  	return string(runes[:4]) + "..." + string(runes[len(runes)-4:])
  }
  // cloneStringAnyMap returns a shallow copy of src. The result is always a
  // non-nil map, even for a nil or empty src; nested values are shared with src.
  func cloneStringAnyMap(src map[string]interface{}) map[string]interface{} {
  	dst := make(map[string]interface{}, len(src))
  	for key, value := range src {
  		dst[key] = value
  	}
  	return dst
  }
  394. func mergeChannelOverride(base map[string]interface{}, tpl map[string]interface{}) map[string]interface{} {
  395. if len(base) == 0 && len(tpl) == 0 {
  396. return map[string]interface{}{}
  397. }
  398. if len(tpl) == 0 {
  399. return base
  400. }
  401. out := cloneStringAnyMap(base)
  402. for k, v := range tpl {
  403. out[k] = v
  404. }
  405. return out
  406. }
  407. func appendChannelAffinityTemplateAdminInfo(c *gin.Context, meta channelAffinityMeta) {
  408. if c == nil {
  409. return
  410. }
  411. if len(meta.ParamTemplate) == 0 {
  412. return
  413. }
  414. templateInfo := map[string]interface{}{
  415. "applied": true,
  416. "rule_name": meta.RuleName,
  417. "param_override_keys": len(meta.ParamTemplate),
  418. }
  419. if anyInfo, ok := c.Get(ginKeyChannelAffinityLogInfo); ok {
  420. if info, ok := anyInfo.(map[string]interface{}); ok {
  421. info["override_template"] = templateInfo
  422. c.Set(ginKeyChannelAffinityLogInfo, info)
  423. return
  424. }
  425. }
  426. c.Set(ginKeyChannelAffinityLogInfo, map[string]interface{}{
  427. "reason": meta.RuleName,
  428. "rule_name": meta.RuleName,
  429. "using_group": meta.UsingGroup,
  430. "model": meta.ModelName,
  431. "request_path": meta.RequestPath,
  432. "key_source": meta.KeySourceType,
  433. "key_key": meta.KeySourceKey,
  434. "key_path": meta.KeySourcePath,
  435. "key_hint": meta.KeyHint,
  436. "key_fp": meta.KeyFingerprint,
  437. "override_template": templateInfo,
  438. })
  439. }
  440. // ApplyChannelAffinityOverrideTemplate merges per-rule channel override templates onto the selected channel override config.
  441. func ApplyChannelAffinityOverrideTemplate(c *gin.Context, paramOverride map[string]interface{}) (map[string]interface{}, bool) {
  442. if c == nil {
  443. return paramOverride, false
  444. }
  445. meta, ok := getChannelAffinityMeta(c)
  446. if !ok {
  447. return paramOverride, false
  448. }
  449. if len(meta.ParamTemplate) == 0 {
  450. return paramOverride, false
  451. }
  452. mergedParam := mergeChannelOverride(paramOverride, meta.ParamTemplate)
  453. appendChannelAffinityTemplateAdminInfo(c, meta)
  454. return mergedParam, true
  455. }
  456. func GetPreferredChannelByAffinity(c *gin.Context, modelName string, usingGroup string) (int, bool) {
  457. setting := operation_setting.GetChannelAffinitySetting()
  458. if setting == nil || !setting.Enabled {
  459. return 0, false
  460. }
  461. path := ""
  462. if c != nil && c.Request != nil && c.Request.URL != nil {
  463. path = c.Request.URL.Path
  464. }
  465. userAgent := ""
  466. if c != nil && c.Request != nil {
  467. userAgent = c.Request.UserAgent()
  468. }
  469. for _, rule := range setting.Rules {
  470. if !matchAnyRegexCached(rule.ModelRegex, modelName) {
  471. continue
  472. }
  473. if len(rule.PathRegex) > 0 && !matchAnyRegexCached(rule.PathRegex, path) {
  474. continue
  475. }
  476. if len(rule.UserAgentInclude) > 0 && !matchAnyIncludeFold(rule.UserAgentInclude, userAgent) {
  477. continue
  478. }
  479. var affinityValue string
  480. var usedSource operation_setting.ChannelAffinityKeySource
  481. for _, src := range rule.KeySources {
  482. affinityValue = extractChannelAffinityValue(c, src)
  483. if affinityValue != "" {
  484. usedSource = src
  485. break
  486. }
  487. }
  488. if affinityValue == "" {
  489. continue
  490. }
  491. if rule.ValueRegex != "" && !matchAnyRegexCached([]string{rule.ValueRegex}, affinityValue) {
  492. continue
  493. }
  494. ttlSeconds := rule.TTLSeconds
  495. if ttlSeconds <= 0 {
  496. ttlSeconds = setting.DefaultTTLSeconds
  497. }
  498. cacheKeySuffix := buildChannelAffinityCacheKeySuffix(rule, usingGroup, affinityValue)
  499. cacheKeyFull := channelAffinityCacheNamespace + ":" + cacheKeySuffix
  500. setChannelAffinityContext(c, channelAffinityMeta{
  501. CacheKey: cacheKeyFull,
  502. TTLSeconds: ttlSeconds,
  503. RuleName: rule.Name,
  504. SkipRetry: rule.SkipRetryOnFailure,
  505. ParamTemplate: cloneStringAnyMap(rule.ParamOverrideTemplate),
  506. KeySourceType: strings.TrimSpace(usedSource.Type),
  507. KeySourceKey: strings.TrimSpace(usedSource.Key),
  508. KeySourcePath: strings.TrimSpace(usedSource.Path),
  509. KeyHint: buildChannelAffinityKeyHint(affinityValue),
  510. KeyFingerprint: affinityFingerprint(affinityValue),
  511. UsingGroup: usingGroup,
  512. ModelName: modelName,
  513. RequestPath: path,
  514. })
  515. cache := getChannelAffinityCache()
  516. channelID, found, err := cache.Get(cacheKeySuffix)
  517. if err != nil {
  518. common.SysError(fmt.Sprintf("channel affinity cache get failed: key=%s, err=%v", cacheKeyFull, err))
  519. return 0, false
  520. }
  521. if found {
  522. return channelID, true
  523. }
  524. return 0, false
  525. }
  526. return 0, false
  527. }
  528. func ShouldSkipRetryAfterChannelAffinityFailure(c *gin.Context) bool {
  529. if c == nil {
  530. return false
  531. }
  532. v, ok := c.Get(ginKeyChannelAffinitySkipRetry)
  533. if !ok {
  534. return false
  535. }
  536. b, ok := v.(bool)
  537. if !ok {
  538. return false
  539. }
  540. return b
  541. }
  542. func MarkChannelAffinityUsed(c *gin.Context, selectedGroup string, channelID int) {
  543. if c == nil || channelID <= 0 {
  544. return
  545. }
  546. meta, ok := getChannelAffinityMeta(c)
  547. if !ok {
  548. return
  549. }
  550. c.Set(ginKeyChannelAffinitySkipRetry, meta.SkipRetry)
  551. info := map[string]interface{}{
  552. "reason": meta.RuleName,
  553. "rule_name": meta.RuleName,
  554. "using_group": meta.UsingGroup,
  555. "selected_group": selectedGroup,
  556. "model": meta.ModelName,
  557. "request_path": meta.RequestPath,
  558. "channel_id": channelID,
  559. "key_source": meta.KeySourceType,
  560. "key_key": meta.KeySourceKey,
  561. "key_path": meta.KeySourcePath,
  562. "key_hint": meta.KeyHint,
  563. "key_fp": meta.KeyFingerprint,
  564. }
  565. c.Set(ginKeyChannelAffinityLogInfo, info)
  566. }
  567. func AppendChannelAffinityAdminInfo(c *gin.Context, adminInfo map[string]interface{}) {
  568. if c == nil || adminInfo == nil {
  569. return
  570. }
  571. anyInfo, ok := c.Get(ginKeyChannelAffinityLogInfo)
  572. if !ok || anyInfo == nil {
  573. return
  574. }
  575. adminInfo["channel_affinity"] = anyInfo
  576. }
  577. func RecordChannelAffinity(c *gin.Context, channelID int) {
  578. if channelID <= 0 {
  579. return
  580. }
  581. setting := operation_setting.GetChannelAffinitySetting()
  582. if setting == nil || !setting.Enabled {
  583. return
  584. }
  585. if setting.SwitchOnSuccess && c != nil {
  586. if successChannelID := c.GetInt("channel_id"); successChannelID > 0 {
  587. channelID = successChannelID
  588. }
  589. }
  590. cacheKey, ttlSeconds, ok := getChannelAffinityContext(c)
  591. if !ok {
  592. return
  593. }
  594. if ttlSeconds <= 0 {
  595. ttlSeconds = setting.DefaultTTLSeconds
  596. }
  597. if ttlSeconds <= 0 {
  598. ttlSeconds = 3600
  599. }
  600. cache := getChannelAffinityCache()
  601. if err := cache.SetWithTTL(cacheKey, channelID, time.Duration(ttlSeconds)*time.Second); err != nil {
  602. common.SysError(fmt.Sprintf("channel affinity cache set failed: key=%s, err=%v", cacheKey, err))
  603. }
  604. }
  // ChannelAffinityUsageCacheStats is the usage-cache summary returned by
  // GetChannelAffinityUsageCacheStats: the identifying rule name, using group
  // and key fingerprint, plus the counters accumulated for them.
  605. type ChannelAffinityUsageCacheStats struct {
  606. RuleName string `json:"rule_name"`
  607. UsingGroup string `json:"using_group"`
  608. KeyFingerprint string `json:"key_fp"`
  609. CachedTokenRateMode string `json:"cached_token_rate_mode"`
  610. Hit int64 `json:"hit"`
  611. Total int64 `json:"total"`
  612. WindowSeconds int64 `json:"window_seconds"`
  613. PromptTokens int64 `json:"prompt_tokens"`
  614. CompletionTokens int64 `json:"completion_tokens"`
  615. TotalTokens int64 `json:"total_tokens"`
  616. CachedTokens int64 `json:"cached_tokens"`
  617. PromptCacheHitTokens int64 `json:"prompt_cache_hit_tokens"`
  618. LastSeenAt int64 `json:"last_seen_at"`
  619. }
  // ChannelAffinityUsageCacheCounters is the value kept in the usage stats cache
  // (JSON-encoded when stored in Redis) and updated by
  // observeChannelAffinityUsageCache.
  620. type ChannelAffinityUsageCacheCounters struct {
  621. CachedTokenRateMode string `json:"cached_token_rate_mode"` // a known mode, or "mixed" once different modes are seen
  622. Hit int64 `json:"hit"` // observations reporting cached or prompt-cache-hit tokens
  623. Total int64 `json:"total"` // all observations
  624. WindowSeconds int64 `json:"window_seconds"` // TTL seconds from the latest observation
  625. PromptTokens int64 `json:"prompt_tokens"` // summed prompt (or input) tokens
  626. CompletionTokens int64 `json:"completion_tokens"` // summed completion (or output) tokens
  627. TotalTokens int64 `json:"total_tokens"` // summed total tokens
  628. CachedTokens int64 `json:"cached_tokens"` // summed cached tokens
  629. PromptCacheHitTokens int64 `json:"prompt_cache_hit_tokens"` // summed prompt cache hit tokens
  630. LastSeenAt int64 `json:"last_seen_at"` // Unix seconds of the latest observation
  631. }
  // channelAffinityUsageCacheStatsLocks holds striped in-process mutexes that
  // serialize the read-modify-write of usage stats entries. A key's stripe is
  // chosen by channelAffinityUsageCacheStatsLock.
  632. var channelAffinityUsageCacheStatsLocks [64]sync.Mutex
  633. // ObserveChannelAffinityUsageCacheByRelayFormat records usage cache stats with a stable rate mode derived from relay format.
  634. func ObserveChannelAffinityUsageCacheByRelayFormat(c *gin.Context, usage *dto.Usage, relayFormat types.RelayFormat) {
  635. ObserveChannelAffinityUsageCacheFromContext(c, usage, cachedTokenRateModeByRelayFormat(relayFormat))
  636. }
  637. func ObserveChannelAffinityUsageCacheFromContext(c *gin.Context, usage *dto.Usage, cachedTokenRateMode string) {
  638. statsCtx, ok := GetChannelAffinityStatsContext(c)
  639. if !ok {
  640. return
  641. }
  642. observeChannelAffinityUsageCache(statsCtx, usage, cachedTokenRateMode)
  643. }
  644. func GetChannelAffinityUsageCacheStats(ruleName, usingGroup, keyFp string) ChannelAffinityUsageCacheStats {
  645. ruleName = strings.TrimSpace(ruleName)
  646. usingGroup = strings.TrimSpace(usingGroup)
  647. keyFp = strings.TrimSpace(keyFp)
  648. entryKey := channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp)
  649. if entryKey == "" {
  650. return ChannelAffinityUsageCacheStats{
  651. RuleName: ruleName,
  652. UsingGroup: usingGroup,
  653. KeyFingerprint: keyFp,
  654. }
  655. }
  656. cache := getChannelAffinityUsageCacheStatsCache()
  657. v, found, err := cache.Get(entryKey)
  658. if err != nil || !found {
  659. return ChannelAffinityUsageCacheStats{
  660. RuleName: ruleName,
  661. UsingGroup: usingGroup,
  662. KeyFingerprint: keyFp,
  663. }
  664. }
  665. return ChannelAffinityUsageCacheStats{
  666. CachedTokenRateMode: v.CachedTokenRateMode,
  667. RuleName: ruleName,
  668. UsingGroup: usingGroup,
  669. KeyFingerprint: keyFp,
  670. Hit: v.Hit,
  671. Total: v.Total,
  672. WindowSeconds: v.WindowSeconds,
  673. PromptTokens: v.PromptTokens,
  674. CompletionTokens: v.CompletionTokens,
  675. TotalTokens: v.TotalTokens,
  676. CachedTokens: v.CachedTokens,
  677. PromptCacheHitTokens: v.PromptCacheHitTokens,
  678. LastSeenAt: v.LastSeenAt,
  679. }
  680. }
  681. func observeChannelAffinityUsageCache(statsCtx ChannelAffinityStatsContext, usage *dto.Usage, cachedTokenRateMode string) {
  682. entryKey := channelAffinityUsageCacheEntryKey(statsCtx.RuleName, statsCtx.UsingGroup, statsCtx.KeyFingerprint)
  683. if entryKey == "" {
  684. return
  685. }
  686. windowSeconds := statsCtx.TTLSeconds
  687. if windowSeconds <= 0 {
  688. return
  689. }
  690. cache := getChannelAffinityUsageCacheStatsCache()
  691. ttl := time.Duration(windowSeconds) * time.Second
  692. lock := channelAffinityUsageCacheStatsLock(entryKey)
  693. lock.Lock()
  694. defer lock.Unlock()
  695. prev, found, err := cache.Get(entryKey)
  696. if err != nil {
  697. return
  698. }
  699. next := prev
  700. if !found {
  701. next = ChannelAffinityUsageCacheCounters{}
  702. }
  703. currentMode := normalizeCachedTokenRateMode(cachedTokenRateMode)
  704. if currentMode != "" {
  705. if next.CachedTokenRateMode == "" {
  706. next.CachedTokenRateMode = currentMode
  707. } else if next.CachedTokenRateMode != currentMode && next.CachedTokenRateMode != cacheTokenRateModeMixed {
  708. next.CachedTokenRateMode = cacheTokenRateModeMixed
  709. }
  710. }
  711. next.Total++
  712. hit, cachedTokens, promptCacheHitTokens := usageCacheSignals(usage)
  713. if hit {
  714. next.Hit++
  715. }
  716. next.WindowSeconds = windowSeconds
  717. next.LastSeenAt = time.Now().Unix()
  718. next.CachedTokens += cachedTokens
  719. next.PromptCacheHitTokens += promptCacheHitTokens
  720. next.PromptTokens += int64(usagePromptTokens(usage))
  721. next.CompletionTokens += int64(usageCompletionTokens(usage))
  722. next.TotalTokens += int64(usageTotalTokens(usage))
  723. _ = cache.SetWithTTL(entryKey, next, ttl)
  724. }
  725. func normalizeCachedTokenRateMode(mode string) string {
  726. switch mode {
  727. case cacheTokenRateModeCachedOverPrompt:
  728. return cacheTokenRateModeCachedOverPrompt
  729. case cacheTokenRateModeCachedOverPromptPlusCached:
  730. return cacheTokenRateModeCachedOverPromptPlusCached
  731. case cacheTokenRateModeMixed:
  732. return cacheTokenRateModeMixed
  733. default:
  734. return ""
  735. }
  736. }
  737. func cachedTokenRateModeByRelayFormat(relayFormat types.RelayFormat) string {
  738. switch relayFormat {
  739. case types.RelayFormatOpenAI, types.RelayFormatOpenAIResponses, types.RelayFormatOpenAIResponsesCompaction:
  740. return cacheTokenRateModeCachedOverPrompt
  741. case types.RelayFormatClaude:
  742. return cacheTokenRateModeCachedOverPromptPlusCached
  743. default:
  744. return ""
  745. }
  746. }
  // channelAffinityUsageCacheEntryKey builds the usage stats cache key by
  // joining the whitespace-trimmed rule name, using group and key fingerprint
  // with "\n". It returns "" when the rule name or fingerprint is blank; the
  // using group may be empty.
  func channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp string) string {
  	rule := strings.TrimSpace(ruleName)
  	fp := strings.TrimSpace(keyFp)
  	if rule == "" || fp == "" {
  		return ""
  	}
  	return strings.Join([]string{rule, strings.TrimSpace(usingGroup), fp}, "\n")
  }
  756. func usageCacheSignals(usage *dto.Usage) (hit bool, cachedTokens int64, promptCacheHitTokens int64) {
  757. if usage == nil {
  758. return false, 0, 0
  759. }
  760. cached := int64(0)
  761. if usage.PromptTokensDetails.CachedTokens > 0 {
  762. cached = int64(usage.PromptTokensDetails.CachedTokens)
  763. } else if usage.InputTokensDetails != nil && usage.InputTokensDetails.CachedTokens > 0 {
  764. cached = int64(usage.InputTokensDetails.CachedTokens)
  765. }
  766. pcht := int64(0)
  767. if usage.PromptCacheHitTokens > 0 {
  768. pcht = int64(usage.PromptCacheHitTokens)
  769. }
  770. return cached > 0 || pcht > 0, cached, pcht
  771. }
  772. func usagePromptTokens(usage *dto.Usage) int {
  773. if usage == nil {
  774. return 0
  775. }
  776. if usage.PromptTokens > 0 {
  777. return usage.PromptTokens
  778. }
  779. return usage.InputTokens
  780. }
  781. func usageCompletionTokens(usage *dto.Usage) int {
  782. if usage == nil {
  783. return 0
  784. }
  785. if usage.CompletionTokens > 0 {
  786. return usage.CompletionTokens
  787. }
  788. return usage.OutputTokens
  789. }
  790. func usageTotalTokens(usage *dto.Usage) int {
  791. if usage == nil {
  792. return 0
  793. }
  794. if usage.TotalTokens > 0 {
  795. return usage.TotalTokens
  796. }
  797. pt := usagePromptTokens(usage)
  798. ct := usageCompletionTokens(usage)
  799. if pt > 0 || ct > 0 {
  800. return pt + ct
  801. }
  802. return 0
  803. }
  804. func getChannelAffinityUsageCacheStatsCache() *cachex.HybridCache[ChannelAffinityUsageCacheCounters] {
  805. channelAffinityUsageCacheStatsOnce.Do(func() {
  806. setting := operation_setting.GetChannelAffinitySetting()
  807. capacity := 100_000
  808. defaultTTLSeconds := 3600
  809. if setting != nil {
  810. if setting.MaxEntries > 0 {
  811. capacity = setting.MaxEntries
  812. }
  813. if setting.DefaultTTLSeconds > 0 {
  814. defaultTTLSeconds = setting.DefaultTTLSeconds
  815. }
  816. }
  817. channelAffinityUsageCacheStatsCache = cachex.NewHybridCache[ChannelAffinityUsageCacheCounters](cachex.HybridCacheConfig[ChannelAffinityUsageCacheCounters]{
  818. Namespace: cachex.Namespace(channelAffinityUsageCacheStatsNamespace),
  819. Redis: common.RDB,
  820. RedisEnabled: func() bool {
  821. return common.RedisEnabled && common.RDB != nil
  822. },
  823. RedisCodec: cachex.JSONCodec[ChannelAffinityUsageCacheCounters]{},
  824. Memory: func() *hot.HotCache[string, ChannelAffinityUsageCacheCounters] {
  825. return hot.NewHotCache[string, ChannelAffinityUsageCacheCounters](hot.LRU, capacity).
  826. WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
  827. WithJanitor().
  828. Build()
  829. },
  830. })
  831. })
  832. return channelAffinityUsageCacheStatsCache
  833. }
  834. func channelAffinityUsageCacheStatsLock(key string) *sync.Mutex {
  835. h := fnv.New32a()
  836. _, _ = h.Write([]byte(key))
  837. idx := h.Sum32() % uint32(len(channelAffinityUsageCacheStatsLocks))
  838. return &channelAffinityUsageCacheStatsLocks[idx]
  839. }