channel_affinity.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950
  1. package service
  2. import (
  3. "fmt"
  4. "hash/fnv"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/QuantumNous/new-api/common"
  11. "github.com/QuantumNous/new-api/dto"
  12. "github.com/QuantumNous/new-api/pkg/cachex"
  13. "github.com/QuantumNous/new-api/setting/operation_setting"
  14. "github.com/QuantumNous/new-api/types"
  15. "github.com/gin-gonic/gin"
  16. "github.com/samber/hot"
  17. "github.com/tidwall/gjson"
  18. )
// Keys under which per-request channel affinity state is stored on the
// gin.Context, followed by the namespaces of the two hybrid caches used by
// this file (affinity key -> channel ID, and per-key usage counters).
const (
	ginKeyChannelAffinityCacheKey   = "channel_affinity_cache_key"
	ginKeyChannelAffinityTTLSeconds = "channel_affinity_ttl_seconds"
	ginKeyChannelAffinityMeta       = "channel_affinity_meta"
	ginKeyChannelAffinityLogInfo    = "channel_affinity_log_info"
	ginKeyChannelAffinitySkipRetry  = "channel_affinity_skip_retry_on_failure"

	channelAffinityCacheNamespace           = "new-api:channel_affinity:v1"
	channelAffinityUsageCacheStatsNamespace = "new-api:channel_affinity_usage_cache_stats:v1"
)

// Lazily-initialised caches (each guarded by its sync.Once) plus a memo of
// compiled regular expressions keyed by pattern text.
var (
	channelAffinityCacheOnce sync.Once
	channelAffinityCache     *cachex.HybridCache[int]

	channelAffinityUsageCacheStatsOnce  sync.Once
	channelAffinityUsageCacheStatsCache *cachex.HybridCache[ChannelAffinityUsageCacheCounters]

	channelAffinityRegexCache sync.Map // map[string]*regexp.Regexp
)
// channelAffinityMeta is everything GetPreferredChannelByAffinity resolved for
// the rule that matched the current request. It is stored on the gin.Context
// by setChannelAffinityContext. CacheKey is the namespaced affinity key;
// TTLSeconds is the TTL chosen for the rule; SkipRetry mirrors the rule's
// SkipRetryOnFailure; ParamTemplate is a shallow copy of the rule's
// ParamOverrideTemplate; the KeySource* fields describe which key source
// produced the value; KeyHint/KeyFingerprint are a shortened preview and a
// short SHA-1 prefix of that value; UsingGroup, ModelName and RequestPath
// record the request context used for matching.
type channelAffinityMeta struct {
	CacheKey       string
	TTLSeconds     int
	RuleName       string
	SkipRetry      bool
	ParamTemplate  map[string]interface{}
	KeySourceType  string
	KeySourceKey   string
	KeySourcePath  string
	KeyHint        string
	KeyFingerprint string
	UsingGroup     string
	ModelName      string
	RequestPath    string
}

// ChannelAffinityStatsContext identifies the usage-stats entry for a request:
// the trimmed rule name, using group and key fingerprint, plus the TTL (in
// seconds) used as the stats window.
type ChannelAffinityStatsContext struct {
	RuleName       string
	UsingGroup     string
	KeyFingerprint string
	TTLSeconds     int64
}
// Cached-token rate modes recorded with usage stats. The names suggest the
// denominator used when turning cached tokens into a hit rate — TODO confirm
// against the consumer. cachedTokenRateModeByRelayFormat picks one per relay
// format, and "mixed" marks an entry that has seen more than one mode.
const (
	cacheTokenRateModeCachedOverPrompt           = "cached_over_prompt"
	cacheTokenRateModeCachedOverPromptPlusCached = "cached_over_prompt_plus_cached"
	cacheTokenRateModeMixed                      = "mixed"
)

// ChannelAffinityCacheStats is the summary returned by
// GetChannelAffinityCacheStats: total affinity cache keys, how many could not
// be attributed to a rule, per-rule counts, and the cache's capacity and
// eviction algorithm.
type ChannelAffinityCacheStats struct {
	Enabled       bool           `json:"enabled"`
	Total         int            `json:"total"`
	Unknown       int            `json:"unknown"`
	ByRuleName    map[string]int `json:"by_rule_name"`
	CacheCapacity int            `json:"cache_capacity"`
	CacheAlgo     string         `json:"cache_algo"`
}
  69. func getChannelAffinityCache() *cachex.HybridCache[int] {
  70. channelAffinityCacheOnce.Do(func() {
  71. setting := operation_setting.GetChannelAffinitySetting()
  72. capacity := setting.MaxEntries
  73. if capacity <= 0 {
  74. capacity = 100_000
  75. }
  76. defaultTTLSeconds := setting.DefaultTTLSeconds
  77. if defaultTTLSeconds <= 0 {
  78. defaultTTLSeconds = 3600
  79. }
  80. channelAffinityCache = cachex.NewHybridCache[int](cachex.HybridCacheConfig[int]{
  81. Namespace: cachex.Namespace(channelAffinityCacheNamespace),
  82. Redis: common.RDB,
  83. RedisEnabled: func() bool {
  84. return common.RedisEnabled && common.RDB != nil
  85. },
  86. RedisCodec: cachex.IntCodec{},
  87. Memory: func() *hot.HotCache[string, int] {
  88. return hot.NewHotCache[string, int](hot.LRU, capacity).
  89. WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
  90. WithJanitor().
  91. Build()
  92. },
  93. })
  94. })
  95. return channelAffinityCache
  96. }
  97. func GetChannelAffinityCacheStats() ChannelAffinityCacheStats {
  98. setting := operation_setting.GetChannelAffinitySetting()
  99. if setting == nil {
  100. return ChannelAffinityCacheStats{
  101. Enabled: false,
  102. Total: 0,
  103. Unknown: 0,
  104. ByRuleName: map[string]int{},
  105. }
  106. }
  107. cache := getChannelAffinityCache()
  108. mainCap, _ := cache.Capacity()
  109. mainAlgo, _ := cache.Algorithm()
  110. rules := setting.Rules
  111. ruleByName := make(map[string]operation_setting.ChannelAffinityRule, len(rules))
  112. for _, r := range rules {
  113. name := strings.TrimSpace(r.Name)
  114. if name == "" {
  115. continue
  116. }
  117. if !r.IncludeRuleName {
  118. continue
  119. }
  120. ruleByName[name] = r
  121. }
  122. byRuleName := make(map[string]int, len(ruleByName))
  123. for name := range ruleByName {
  124. byRuleName[name] = 0
  125. }
  126. keys, err := cache.Keys()
  127. if err != nil {
  128. common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
  129. keys = nil
  130. }
  131. total := len(keys)
  132. unknown := 0
  133. for _, k := range keys {
  134. prefix := channelAffinityCacheNamespace + ":"
  135. if !strings.HasPrefix(k, prefix) {
  136. unknown++
  137. continue
  138. }
  139. rest := strings.TrimPrefix(k, prefix)
  140. parts := strings.Split(rest, ":")
  141. if len(parts) < 2 {
  142. unknown++
  143. continue
  144. }
  145. ruleName := parts[0]
  146. rule, ok := ruleByName[ruleName]
  147. if !ok {
  148. unknown++
  149. continue
  150. }
  151. if rule.IncludeUsingGroup {
  152. if len(parts) < 3 {
  153. unknown++
  154. continue
  155. }
  156. }
  157. byRuleName[ruleName]++
  158. }
  159. return ChannelAffinityCacheStats{
  160. Enabled: setting.Enabled,
  161. Total: total,
  162. Unknown: unknown,
  163. ByRuleName: byRuleName,
  164. CacheCapacity: mainCap,
  165. CacheAlgo: mainAlgo,
  166. }
  167. }
  168. func ClearChannelAffinityCacheAll() int {
  169. cache := getChannelAffinityCache()
  170. keys, err := cache.Keys()
  171. if err != nil {
  172. common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
  173. keys = nil
  174. }
  175. if len(keys) > 0 {
  176. if _, err := cache.DeleteMany(keys); err != nil {
  177. common.SysError(fmt.Sprintf("channel affinity cache delete many failed: err=%v", err))
  178. }
  179. }
  180. return len(keys)
  181. }
  182. func ClearChannelAffinityCacheByRuleName(ruleName string) (int, error) {
  183. ruleName = strings.TrimSpace(ruleName)
  184. if ruleName == "" {
  185. return 0, fmt.Errorf("rule_name 不能为空")
  186. }
  187. setting := operation_setting.GetChannelAffinitySetting()
  188. if setting == nil {
  189. return 0, fmt.Errorf("channel_affinity_setting 未初始化")
  190. }
  191. var matchedRule *operation_setting.ChannelAffinityRule
  192. for i := range setting.Rules {
  193. r := &setting.Rules[i]
  194. if strings.TrimSpace(r.Name) != ruleName {
  195. continue
  196. }
  197. matchedRule = r
  198. break
  199. }
  200. if matchedRule == nil {
  201. return 0, fmt.Errorf("未知规则名称")
  202. }
  203. if !matchedRule.IncludeRuleName {
  204. return 0, fmt.Errorf("该规则未启用 include_rule_name,无法按规则清空缓存")
  205. }
  206. cache := getChannelAffinityCache()
  207. deleted, err := cache.DeleteByPrefix(ruleName)
  208. if err != nil {
  209. return 0, err
  210. }
  211. return deleted, nil
  212. }
  213. func matchAnyRegexCached(patterns []string, s string) bool {
  214. if len(patterns) == 0 || s == "" {
  215. return false
  216. }
  217. for _, pattern := range patterns {
  218. if pattern == "" {
  219. continue
  220. }
  221. re, ok := channelAffinityRegexCache.Load(pattern)
  222. if !ok {
  223. compiled, err := regexp.Compile(pattern)
  224. if err != nil {
  225. continue
  226. }
  227. re = compiled
  228. channelAffinityRegexCache.Store(pattern, re)
  229. }
  230. if re.(*regexp.Regexp).MatchString(s) {
  231. return true
  232. }
  233. }
  234. return false
  235. }
  236. func matchAnyIncludeFold(patterns []string, s string) bool {
  237. if len(patterns) == 0 || s == "" {
  238. return false
  239. }
  240. sLower := strings.ToLower(s)
  241. for _, p := range patterns {
  242. p = strings.TrimSpace(p)
  243. if p == "" {
  244. continue
  245. }
  246. if strings.Contains(sLower, strings.ToLower(p)) {
  247. return true
  248. }
  249. }
  250. return false
  251. }
  252. func extractChannelAffinityValue(c *gin.Context, src operation_setting.ChannelAffinityKeySource) string {
  253. switch src.Type {
  254. case "context_int":
  255. if src.Key == "" {
  256. return ""
  257. }
  258. v := c.GetInt(src.Key)
  259. if v <= 0 {
  260. return ""
  261. }
  262. return strconv.Itoa(v)
  263. case "context_string":
  264. if src.Key == "" {
  265. return ""
  266. }
  267. return strings.TrimSpace(c.GetString(src.Key))
  268. case "gjson":
  269. if src.Path == "" {
  270. return ""
  271. }
  272. storage, err := common.GetBodyStorage(c)
  273. if err != nil {
  274. return ""
  275. }
  276. body, err := storage.Bytes()
  277. if err != nil || len(body) == 0 {
  278. return ""
  279. }
  280. res := gjson.GetBytes(body, src.Path)
  281. if !res.Exists() {
  282. return ""
  283. }
  284. switch res.Type {
  285. case gjson.String, gjson.Number, gjson.True, gjson.False:
  286. return strings.TrimSpace(res.String())
  287. default:
  288. return strings.TrimSpace(res.Raw)
  289. }
  290. default:
  291. return ""
  292. }
  293. }
  294. func buildChannelAffinityCacheKeySuffix(rule operation_setting.ChannelAffinityRule, usingGroup string, affinityValue string) string {
  295. parts := make([]string, 0, 3)
  296. if rule.IncludeRuleName && rule.Name != "" {
  297. parts = append(parts, rule.Name)
  298. }
  299. if rule.IncludeUsingGroup && usingGroup != "" {
  300. parts = append(parts, usingGroup)
  301. }
  302. parts = append(parts, affinityValue)
  303. return strings.Join(parts, ":")
  304. }
  305. func setChannelAffinityContext(c *gin.Context, meta channelAffinityMeta) {
  306. c.Set(ginKeyChannelAffinityCacheKey, meta.CacheKey)
  307. c.Set(ginKeyChannelAffinityTTLSeconds, meta.TTLSeconds)
  308. c.Set(ginKeyChannelAffinityMeta, meta)
  309. }
  310. func getChannelAffinityContext(c *gin.Context) (string, int, bool) {
  311. keyAny, ok := c.Get(ginKeyChannelAffinityCacheKey)
  312. if !ok {
  313. return "", 0, false
  314. }
  315. key, ok := keyAny.(string)
  316. if !ok || key == "" {
  317. return "", 0, false
  318. }
  319. ttlAny, ok := c.Get(ginKeyChannelAffinityTTLSeconds)
  320. if !ok {
  321. return key, 0, true
  322. }
  323. ttlSeconds, _ := ttlAny.(int)
  324. return key, ttlSeconds, true
  325. }
  326. func getChannelAffinityMeta(c *gin.Context) (channelAffinityMeta, bool) {
  327. anyMeta, ok := c.Get(ginKeyChannelAffinityMeta)
  328. if !ok {
  329. return channelAffinityMeta{}, false
  330. }
  331. meta, ok := anyMeta.(channelAffinityMeta)
  332. if !ok {
  333. return channelAffinityMeta{}, false
  334. }
  335. return meta, true
  336. }
  337. func GetChannelAffinityStatsContext(c *gin.Context) (ChannelAffinityStatsContext, bool) {
  338. if c == nil {
  339. return ChannelAffinityStatsContext{}, false
  340. }
  341. meta, ok := getChannelAffinityMeta(c)
  342. if !ok {
  343. return ChannelAffinityStatsContext{}, false
  344. }
  345. ruleName := strings.TrimSpace(meta.RuleName)
  346. keyFp := strings.TrimSpace(meta.KeyFingerprint)
  347. usingGroup := strings.TrimSpace(meta.UsingGroup)
  348. if ruleName == "" || keyFp == "" {
  349. return ChannelAffinityStatsContext{}, false
  350. }
  351. ttlSeconds := int64(meta.TTLSeconds)
  352. if ttlSeconds <= 0 {
  353. return ChannelAffinityStatsContext{}, false
  354. }
  355. return ChannelAffinityStatsContext{
  356. RuleName: ruleName,
  357. UsingGroup: usingGroup,
  358. KeyFingerprint: keyFp,
  359. TTLSeconds: ttlSeconds,
  360. }, true
  361. }
  362. func affinityFingerprint(s string) string {
  363. if s == "" {
  364. return ""
  365. }
  366. hex := common.Sha1([]byte(s))
  367. if len(hex) >= 8 {
  368. return hex[:8]
  369. }
  370. return hex
  371. }
  372. func buildChannelAffinityKeyHint(s string) string {
  373. s = strings.TrimSpace(s)
  374. if s == "" {
  375. return ""
  376. }
  377. s = strings.ReplaceAll(s, "\n", " ")
  378. s = strings.ReplaceAll(s, "\r", " ")
  379. if len(s) <= 12 {
  380. return s
  381. }
  382. return s[:4] + "..." + s[len(s)-4:]
  383. }
  384. func cloneStringAnyMap(src map[string]interface{}) map[string]interface{} {
  385. if len(src) == 0 {
  386. return map[string]interface{}{}
  387. }
  388. dst := make(map[string]interface{}, len(src))
  389. for k, v := range src {
  390. dst[k] = v
  391. }
  392. return dst
  393. }
  394. func mergeChannelOverride(base map[string]interface{}, tpl map[string]interface{}) map[string]interface{} {
  395. if len(base) == 0 && len(tpl) == 0 {
  396. return map[string]interface{}{}
  397. }
  398. if len(tpl) == 0 {
  399. return base
  400. }
  401. out := cloneStringAnyMap(base)
  402. for k, v := range tpl {
  403. if strings.EqualFold(strings.TrimSpace(k), "operations") {
  404. baseOps, hasBaseOps := extractParamOperations(out[k])
  405. tplOps, hasTplOps := extractParamOperations(v)
  406. if hasTplOps {
  407. if hasBaseOps {
  408. out[k] = append(tplOps, baseOps...)
  409. } else {
  410. out[k] = tplOps
  411. }
  412. continue
  413. }
  414. }
  415. if _, exists := out[k]; exists {
  416. continue
  417. }
  418. out[k] = v
  419. }
  420. return out
  421. }
  422. func extractParamOperations(value interface{}) ([]interface{}, bool) {
  423. switch ops := value.(type) {
  424. case []interface{}:
  425. if len(ops) == 0 {
  426. return []interface{}{}, true
  427. }
  428. cloned := make([]interface{}, 0, len(ops))
  429. cloned = append(cloned, ops...)
  430. return cloned, true
  431. case []map[string]interface{}:
  432. cloned := make([]interface{}, 0, len(ops))
  433. for _, op := range ops {
  434. cloned = append(cloned, op)
  435. }
  436. return cloned, true
  437. default:
  438. return nil, false
  439. }
  440. }
  441. func appendChannelAffinityTemplateAdminInfo(c *gin.Context, meta channelAffinityMeta) {
  442. if c == nil {
  443. return
  444. }
  445. if len(meta.ParamTemplate) == 0 {
  446. return
  447. }
  448. templateInfo := map[string]interface{}{
  449. "applied": true,
  450. "rule_name": meta.RuleName,
  451. "param_override_keys": len(meta.ParamTemplate),
  452. }
  453. if anyInfo, ok := c.Get(ginKeyChannelAffinityLogInfo); ok {
  454. if info, ok := anyInfo.(map[string]interface{}); ok {
  455. info["override_template"] = templateInfo
  456. c.Set(ginKeyChannelAffinityLogInfo, info)
  457. return
  458. }
  459. }
  460. c.Set(ginKeyChannelAffinityLogInfo, map[string]interface{}{
  461. "reason": meta.RuleName,
  462. "rule_name": meta.RuleName,
  463. "using_group": meta.UsingGroup,
  464. "model": meta.ModelName,
  465. "request_path": meta.RequestPath,
  466. "key_source": meta.KeySourceType,
  467. "key_key": meta.KeySourceKey,
  468. "key_path": meta.KeySourcePath,
  469. "key_hint": meta.KeyHint,
  470. "key_fp": meta.KeyFingerprint,
  471. "override_template": templateInfo,
  472. })
  473. }
  474. // ApplyChannelAffinityOverrideTemplate merges per-rule channel override templates onto the selected channel override config.
  475. func ApplyChannelAffinityOverrideTemplate(c *gin.Context, paramOverride map[string]interface{}) (map[string]interface{}, bool) {
  476. if c == nil {
  477. return paramOverride, false
  478. }
  479. meta, ok := getChannelAffinityMeta(c)
  480. if !ok {
  481. return paramOverride, false
  482. }
  483. if len(meta.ParamTemplate) == 0 {
  484. return paramOverride, false
  485. }
  486. mergedParam := mergeChannelOverride(paramOverride, meta.ParamTemplate)
  487. appendChannelAffinityTemplateAdminInfo(c, meta)
  488. return mergedParam, true
  489. }
  490. func GetPreferredChannelByAffinity(c *gin.Context, modelName string, usingGroup string) (int, bool) {
  491. setting := operation_setting.GetChannelAffinitySetting()
  492. if setting == nil || !setting.Enabled {
  493. return 0, false
  494. }
  495. path := ""
  496. if c != nil && c.Request != nil && c.Request.URL != nil {
  497. path = c.Request.URL.Path
  498. }
  499. userAgent := ""
  500. if c != nil && c.Request != nil {
  501. userAgent = c.Request.UserAgent()
  502. }
  503. for _, rule := range setting.Rules {
  504. if !matchAnyRegexCached(rule.ModelRegex, modelName) {
  505. continue
  506. }
  507. if len(rule.PathRegex) > 0 && !matchAnyRegexCached(rule.PathRegex, path) {
  508. continue
  509. }
  510. if len(rule.UserAgentInclude) > 0 && !matchAnyIncludeFold(rule.UserAgentInclude, userAgent) {
  511. continue
  512. }
  513. var affinityValue string
  514. var usedSource operation_setting.ChannelAffinityKeySource
  515. for _, src := range rule.KeySources {
  516. affinityValue = extractChannelAffinityValue(c, src)
  517. if affinityValue != "" {
  518. usedSource = src
  519. break
  520. }
  521. }
  522. if affinityValue == "" {
  523. continue
  524. }
  525. if rule.ValueRegex != "" && !matchAnyRegexCached([]string{rule.ValueRegex}, affinityValue) {
  526. continue
  527. }
  528. ttlSeconds := rule.TTLSeconds
  529. if ttlSeconds <= 0 {
  530. ttlSeconds = setting.DefaultTTLSeconds
  531. }
  532. cacheKeySuffix := buildChannelAffinityCacheKeySuffix(rule, usingGroup, affinityValue)
  533. cacheKeyFull := channelAffinityCacheNamespace + ":" + cacheKeySuffix
  534. setChannelAffinityContext(c, channelAffinityMeta{
  535. CacheKey: cacheKeyFull,
  536. TTLSeconds: ttlSeconds,
  537. RuleName: rule.Name,
  538. SkipRetry: rule.SkipRetryOnFailure,
  539. ParamTemplate: cloneStringAnyMap(rule.ParamOverrideTemplate),
  540. KeySourceType: strings.TrimSpace(usedSource.Type),
  541. KeySourceKey: strings.TrimSpace(usedSource.Key),
  542. KeySourcePath: strings.TrimSpace(usedSource.Path),
  543. KeyHint: buildChannelAffinityKeyHint(affinityValue),
  544. KeyFingerprint: affinityFingerprint(affinityValue),
  545. UsingGroup: usingGroup,
  546. ModelName: modelName,
  547. RequestPath: path,
  548. })
  549. cache := getChannelAffinityCache()
  550. channelID, found, err := cache.Get(cacheKeySuffix)
  551. if err != nil {
  552. common.SysError(fmt.Sprintf("channel affinity cache get failed: key=%s, err=%v", cacheKeyFull, err))
  553. return 0, false
  554. }
  555. if found {
  556. return channelID, true
  557. }
  558. return 0, false
  559. }
  560. return 0, false
  561. }
  562. func ShouldSkipRetryAfterChannelAffinityFailure(c *gin.Context) bool {
  563. if c == nil {
  564. return false
  565. }
  566. v, ok := c.Get(ginKeyChannelAffinitySkipRetry)
  567. if !ok {
  568. return false
  569. }
  570. b, ok := v.(bool)
  571. if !ok {
  572. return false
  573. }
  574. return b
  575. }
  576. func MarkChannelAffinityUsed(c *gin.Context, selectedGroup string, channelID int) {
  577. if c == nil || channelID <= 0 {
  578. return
  579. }
  580. meta, ok := getChannelAffinityMeta(c)
  581. if !ok {
  582. return
  583. }
  584. c.Set(ginKeyChannelAffinitySkipRetry, meta.SkipRetry)
  585. info := map[string]interface{}{
  586. "reason": meta.RuleName,
  587. "rule_name": meta.RuleName,
  588. "using_group": meta.UsingGroup,
  589. "selected_group": selectedGroup,
  590. "model": meta.ModelName,
  591. "request_path": meta.RequestPath,
  592. "channel_id": channelID,
  593. "key_source": meta.KeySourceType,
  594. "key_key": meta.KeySourceKey,
  595. "key_path": meta.KeySourcePath,
  596. "key_hint": meta.KeyHint,
  597. "key_fp": meta.KeyFingerprint,
  598. }
  599. c.Set(ginKeyChannelAffinityLogInfo, info)
  600. }
  601. func AppendChannelAffinityAdminInfo(c *gin.Context, adminInfo map[string]interface{}) {
  602. if c == nil || adminInfo == nil {
  603. return
  604. }
  605. anyInfo, ok := c.Get(ginKeyChannelAffinityLogInfo)
  606. if !ok || anyInfo == nil {
  607. return
  608. }
  609. adminInfo["channel_affinity"] = anyInfo
  610. }
  611. func RecordChannelAffinity(c *gin.Context, channelID int) {
  612. if channelID <= 0 {
  613. return
  614. }
  615. setting := operation_setting.GetChannelAffinitySetting()
  616. if setting == nil || !setting.Enabled {
  617. return
  618. }
  619. if setting.SwitchOnSuccess && c != nil {
  620. if successChannelID := c.GetInt("channel_id"); successChannelID > 0 {
  621. channelID = successChannelID
  622. }
  623. }
  624. cacheKey, ttlSeconds, ok := getChannelAffinityContext(c)
  625. if !ok {
  626. return
  627. }
  628. if ttlSeconds <= 0 {
  629. ttlSeconds = setting.DefaultTTLSeconds
  630. }
  631. if ttlSeconds <= 0 {
  632. ttlSeconds = 3600
  633. }
  634. cache := getChannelAffinityCache()
  635. if err := cache.SetWithTTL(cacheKey, channelID, time.Duration(ttlSeconds)*time.Second); err != nil {
  636. common.SysError(fmt.Sprintf("channel affinity cache set failed: key=%s, err=%v", cacheKey, err))
  637. }
  638. }
// ChannelAffinityUsageCacheStats is returned by
// GetChannelAffinityUsageCacheStats: the identifying (rule, using group, key
// fingerprint) triple plus the counters accumulated for it. Counter fields are
// zero when nothing has been observed.
type ChannelAffinityUsageCacheStats struct {
	RuleName             string `json:"rule_name"`
	UsingGroup           string `json:"using_group"`
	KeyFingerprint       string `json:"key_fp"`
	CachedTokenRateMode  string `json:"cached_token_rate_mode"`
	Hit                  int64  `json:"hit"`
	Total                int64  `json:"total"`
	WindowSeconds        int64  `json:"window_seconds"`
	PromptTokens         int64  `json:"prompt_tokens"`
	CompletionTokens     int64  `json:"completion_tokens"`
	TotalTokens          int64  `json:"total_tokens"`
	CachedTokens         int64  `json:"cached_tokens"`
	PromptCacheHitTokens int64  `json:"prompt_cache_hit_tokens"`
	LastSeenAt           int64  `json:"last_seen_at"`
}

// ChannelAffinityUsageCacheCounters is the value kept in the usage-stats cache
// (its Redis codec is JSONCodec) and updated by
// observeChannelAffinityUsageCache. The fields are:
//   - Total: number of observations.
//   - Hit: observations whose usage reported cached tokens or
//     prompt-cache-hit tokens.
//   - Token fields: running sums.
//   - WindowSeconds and LastSeenAt: the TTL and Unix time of the latest
//     observation.
type ChannelAffinityUsageCacheCounters struct {
	CachedTokenRateMode  string `json:"cached_token_rate_mode"`
	Hit                  int64  `json:"hit"`
	Total                int64  `json:"total"`
	WindowSeconds        int64  `json:"window_seconds"`
	PromptTokens         int64  `json:"prompt_tokens"`
	CompletionTokens     int64  `json:"completion_tokens"`
	TotalTokens          int64  `json:"total_tokens"`
	CachedTokens         int64  `json:"cached_tokens"`
	PromptCacheHitTokens int64  `json:"prompt_cache_hit_tokens"`
	LastSeenAt           int64  `json:"last_seen_at"`
}

// channelAffinityUsageCacheStatsLocks is a fixed set of mutexes. Each one
// serialises the read-modify-write of a usage-stats entry; the entry is mapped
// to a mutex by the FNV-32a hash of its key (channelAffinityUsageCacheStatsLock).
// These are in-process locks only.
var channelAffinityUsageCacheStatsLocks [64]sync.Mutex
  667. // ObserveChannelAffinityUsageCacheByRelayFormat records usage cache stats with a stable rate mode derived from relay format.
  668. func ObserveChannelAffinityUsageCacheByRelayFormat(c *gin.Context, usage *dto.Usage, relayFormat types.RelayFormat) {
  669. ObserveChannelAffinityUsageCacheFromContext(c, usage, cachedTokenRateModeByRelayFormat(relayFormat))
  670. }
  671. func ObserveChannelAffinityUsageCacheFromContext(c *gin.Context, usage *dto.Usage, cachedTokenRateMode string) {
  672. statsCtx, ok := GetChannelAffinityStatsContext(c)
  673. if !ok {
  674. return
  675. }
  676. observeChannelAffinityUsageCache(statsCtx, usage, cachedTokenRateMode)
  677. }
  678. func GetChannelAffinityUsageCacheStats(ruleName, usingGroup, keyFp string) ChannelAffinityUsageCacheStats {
  679. ruleName = strings.TrimSpace(ruleName)
  680. usingGroup = strings.TrimSpace(usingGroup)
  681. keyFp = strings.TrimSpace(keyFp)
  682. entryKey := channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp)
  683. if entryKey == "" {
  684. return ChannelAffinityUsageCacheStats{
  685. RuleName: ruleName,
  686. UsingGroup: usingGroup,
  687. KeyFingerprint: keyFp,
  688. }
  689. }
  690. cache := getChannelAffinityUsageCacheStatsCache()
  691. v, found, err := cache.Get(entryKey)
  692. if err != nil || !found {
  693. return ChannelAffinityUsageCacheStats{
  694. RuleName: ruleName,
  695. UsingGroup: usingGroup,
  696. KeyFingerprint: keyFp,
  697. }
  698. }
  699. return ChannelAffinityUsageCacheStats{
  700. CachedTokenRateMode: v.CachedTokenRateMode,
  701. RuleName: ruleName,
  702. UsingGroup: usingGroup,
  703. KeyFingerprint: keyFp,
  704. Hit: v.Hit,
  705. Total: v.Total,
  706. WindowSeconds: v.WindowSeconds,
  707. PromptTokens: v.PromptTokens,
  708. CompletionTokens: v.CompletionTokens,
  709. TotalTokens: v.TotalTokens,
  710. CachedTokens: v.CachedTokens,
  711. PromptCacheHitTokens: v.PromptCacheHitTokens,
  712. LastSeenAt: v.LastSeenAt,
  713. }
  714. }
  715. func observeChannelAffinityUsageCache(statsCtx ChannelAffinityStatsContext, usage *dto.Usage, cachedTokenRateMode string) {
  716. entryKey := channelAffinityUsageCacheEntryKey(statsCtx.RuleName, statsCtx.UsingGroup, statsCtx.KeyFingerprint)
  717. if entryKey == "" {
  718. return
  719. }
  720. windowSeconds := statsCtx.TTLSeconds
  721. if windowSeconds <= 0 {
  722. return
  723. }
  724. cache := getChannelAffinityUsageCacheStatsCache()
  725. ttl := time.Duration(windowSeconds) * time.Second
  726. lock := channelAffinityUsageCacheStatsLock(entryKey)
  727. lock.Lock()
  728. defer lock.Unlock()
  729. prev, found, err := cache.Get(entryKey)
  730. if err != nil {
  731. return
  732. }
  733. next := prev
  734. if !found {
  735. next = ChannelAffinityUsageCacheCounters{}
  736. }
  737. currentMode := normalizeCachedTokenRateMode(cachedTokenRateMode)
  738. if currentMode != "" {
  739. if next.CachedTokenRateMode == "" {
  740. next.CachedTokenRateMode = currentMode
  741. } else if next.CachedTokenRateMode != currentMode && next.CachedTokenRateMode != cacheTokenRateModeMixed {
  742. next.CachedTokenRateMode = cacheTokenRateModeMixed
  743. }
  744. }
  745. next.Total++
  746. hit, cachedTokens, promptCacheHitTokens := usageCacheSignals(usage)
  747. if hit {
  748. next.Hit++
  749. }
  750. next.WindowSeconds = windowSeconds
  751. next.LastSeenAt = time.Now().Unix()
  752. next.CachedTokens += cachedTokens
  753. next.PromptCacheHitTokens += promptCacheHitTokens
  754. next.PromptTokens += int64(usagePromptTokens(usage))
  755. next.CompletionTokens += int64(usageCompletionTokens(usage))
  756. next.TotalTokens += int64(usageTotalTokens(usage))
  757. _ = cache.SetWithTTL(entryKey, next, ttl)
  758. }
  759. func normalizeCachedTokenRateMode(mode string) string {
  760. switch mode {
  761. case cacheTokenRateModeCachedOverPrompt:
  762. return cacheTokenRateModeCachedOverPrompt
  763. case cacheTokenRateModeCachedOverPromptPlusCached:
  764. return cacheTokenRateModeCachedOverPromptPlusCached
  765. case cacheTokenRateModeMixed:
  766. return cacheTokenRateModeMixed
  767. default:
  768. return ""
  769. }
  770. }
  771. func cachedTokenRateModeByRelayFormat(relayFormat types.RelayFormat) string {
  772. switch relayFormat {
  773. case types.RelayFormatOpenAI, types.RelayFormatOpenAIResponses, types.RelayFormatOpenAIResponsesCompaction:
  774. return cacheTokenRateModeCachedOverPrompt
  775. case types.RelayFormatClaude:
  776. return cacheTokenRateModeCachedOverPromptPlusCached
  777. default:
  778. return ""
  779. }
  780. }
  781. func channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp string) string {
  782. ruleName = strings.TrimSpace(ruleName)
  783. usingGroup = strings.TrimSpace(usingGroup)
  784. keyFp = strings.TrimSpace(keyFp)
  785. if ruleName == "" || keyFp == "" {
  786. return ""
  787. }
  788. return ruleName + "\n" + usingGroup + "\n" + keyFp
  789. }
  790. func usageCacheSignals(usage *dto.Usage) (hit bool, cachedTokens int64, promptCacheHitTokens int64) {
  791. if usage == nil {
  792. return false, 0, 0
  793. }
  794. cached := int64(0)
  795. if usage.PromptTokensDetails.CachedTokens > 0 {
  796. cached = int64(usage.PromptTokensDetails.CachedTokens)
  797. } else if usage.InputTokensDetails != nil && usage.InputTokensDetails.CachedTokens > 0 {
  798. cached = int64(usage.InputTokensDetails.CachedTokens)
  799. }
  800. pcht := int64(0)
  801. if usage.PromptCacheHitTokens > 0 {
  802. pcht = int64(usage.PromptCacheHitTokens)
  803. }
  804. return cached > 0 || pcht > 0, cached, pcht
  805. }
  806. func usagePromptTokens(usage *dto.Usage) int {
  807. if usage == nil {
  808. return 0
  809. }
  810. if usage.PromptTokens > 0 {
  811. return usage.PromptTokens
  812. }
  813. return usage.InputTokens
  814. }
  815. func usageCompletionTokens(usage *dto.Usage) int {
  816. if usage == nil {
  817. return 0
  818. }
  819. if usage.CompletionTokens > 0 {
  820. return usage.CompletionTokens
  821. }
  822. return usage.OutputTokens
  823. }
  824. func usageTotalTokens(usage *dto.Usage) int {
  825. if usage == nil {
  826. return 0
  827. }
  828. if usage.TotalTokens > 0 {
  829. return usage.TotalTokens
  830. }
  831. pt := usagePromptTokens(usage)
  832. ct := usageCompletionTokens(usage)
  833. if pt > 0 || ct > 0 {
  834. return pt + ct
  835. }
  836. return 0
  837. }
  838. func getChannelAffinityUsageCacheStatsCache() *cachex.HybridCache[ChannelAffinityUsageCacheCounters] {
  839. channelAffinityUsageCacheStatsOnce.Do(func() {
  840. setting := operation_setting.GetChannelAffinitySetting()
  841. capacity := 100_000
  842. defaultTTLSeconds := 3600
  843. if setting != nil {
  844. if setting.MaxEntries > 0 {
  845. capacity = setting.MaxEntries
  846. }
  847. if setting.DefaultTTLSeconds > 0 {
  848. defaultTTLSeconds = setting.DefaultTTLSeconds
  849. }
  850. }
  851. channelAffinityUsageCacheStatsCache = cachex.NewHybridCache[ChannelAffinityUsageCacheCounters](cachex.HybridCacheConfig[ChannelAffinityUsageCacheCounters]{
  852. Namespace: cachex.Namespace(channelAffinityUsageCacheStatsNamespace),
  853. Redis: common.RDB,
  854. RedisEnabled: func() bool {
  855. return common.RedisEnabled && common.RDB != nil
  856. },
  857. RedisCodec: cachex.JSONCodec[ChannelAffinityUsageCacheCounters]{},
  858. Memory: func() *hot.HotCache[string, ChannelAffinityUsageCacheCounters] {
  859. return hot.NewHotCache[string, ChannelAffinityUsageCacheCounters](hot.LRU, capacity).
  860. WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
  861. WithJanitor().
  862. Build()
  863. },
  864. })
  865. })
  866. return channelAffinityUsageCacheStatsCache
  867. }
  868. func channelAffinityUsageCacheStatsLock(key string) *sync.Mutex {
  869. h := fnv.New32a()
  870. _, _ = h.Write([]byte(key))
  871. idx := h.Sum32() % uint32(len(channelAffinityUsageCacheStatsLocks))
  872. return &channelAffinityUsageCacheStatsLocks[idx]
  873. }